1 /*****************************************************************************
2 
3 Copyright (c) 2000, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2015, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file row/row0mysql.cc
22 Interface between Innobase row operations and MySQL.
23 Contains also create table and other data dictionary operations.
24 
25 Created 9/17/2000 Heikki Tuuri
26 *******************************************************/
27 
28 #include "univ.i"
29 #include <debug_sync.h>
30 #include <gstream.h>
31 #include <spatial.h>
32 
33 #include "row0mysql.h"
34 #include "btr0sea.h"
35 #include "dict0boot.h"
36 #include "dict0crea.h"
37 #include "dict0dict.h"
38 #include "dict0load.h"
39 #include "dict0priv.h"
40 #include "dict0stats.h"
41 #include "dict0stats_bg.h"
42 #include "dict0defrag_bg.h"
43 #include "btr0defragment.h"
44 #include "fil0fil.h"
45 #include "fil0crypt.h"
46 #include "fsp0file.h"
47 #include "fts0fts.h"
48 #include "fts0types.h"
49 #include "ibuf0ibuf.h"
50 #include "lock0lock.h"
51 #include "log0log.h"
52 #include "pars0pars.h"
53 #include "que0que.h"
54 #include "rem0cmp.h"
55 #include "row0import.h"
56 #include "row0ins.h"
57 #include "row0row.h"
58 #include "row0sel.h"
59 #include "row0upd.h"
60 #include "trx0purge.h"
61 #include "trx0rec.h"
62 #include "trx0roll.h"
63 #include "trx0undo.h"
64 #include "srv0start.h"
65 #include "row0ext.h"
66 #include "srv0start.h"
67 
68 #include <algorithm>
69 #include <deque>
70 #include <vector>
71 
72 #ifdef WITH_WSREP
73 #include "mysql/service_wsrep.h"
74 #include "wsrep.h"
75 #include "log.h"
76 #include "wsrep_mysqld.h"
77 #endif
78 
79 /** Provide optional 4.x backwards compatibility for 5.0 and above */
80 ibool	row_rollback_on_timeout	= FALSE;
81 
82 /** Chain node of the list of tables to drop in the background. */
83 struct row_mysql_drop_t{
84 	table_id_t			table_id;	/*!< table id */
85 	UT_LIST_NODE_T(row_mysql_drop_t)row_mysql_drop_list;
86 							/*!< list chain node */
87 };
88 
89 /** @brief List of tables we should drop in background.
90 
91 ALTER TABLE in MySQL requires that the table handler can drop the
92 table in background when there are no queries to it any
93 more.  Protected by row_drop_list_mutex. */
94 static UT_LIST_BASE_NODE_T(row_mysql_drop_t)	row_mysql_drop_list;
95 
96 /** Mutex protecting the background table drop list. */
97 static ib_mutex_t row_drop_list_mutex;
98 
99 /** Flag: has row_mysql_drop_list been initialized? */
100 static bool row_mysql_drop_list_inited;
101 
102 #ifdef UNIV_DEBUG
103 /** Wait for the background drop list to become empty. */
104 void
row_wait_for_background_drop_list_empty()105 row_wait_for_background_drop_list_empty()
106 {
107 	bool	empty = false;
108 	while (!empty) {
109 		mutex_enter(&row_drop_list_mutex);
110 		empty = (UT_LIST_GET_LEN(row_mysql_drop_list) == 0);
111 		mutex_exit(&row_drop_list_mutex);
112 		os_thread_sleep(100000);
113 	}
114 }
115 #endif /* UNIV_DEBUG */
116 
117 /*******************************************************************//**
118 Delays an INSERT, DELETE or UPDATE operation if the purge is lagging. */
119 static
120 void
row_mysql_delay_if_needed(void)121 row_mysql_delay_if_needed(void)
122 /*===========================*/
123 {
124 	if (srv_dml_needed_delay) {
125 		os_thread_sleep(srv_dml_needed_delay);
126 	}
127 }
128 
129 /*******************************************************************//**
130 Frees the blob heap in prebuilt when no longer needed. */
131 void
row_mysql_prebuilt_free_blob_heap(row_prebuilt_t * prebuilt)132 row_mysql_prebuilt_free_blob_heap(
133 /*==============================*/
134 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct of a
135 					ha_innobase:: table handle */
136 {
137 	DBUG_ENTER("row_mysql_prebuilt_free_blob_heap");
138 
139 	DBUG_PRINT("row_mysql_prebuilt_free_blob_heap",
140 		   ("blob_heap freeing: %p", prebuilt->blob_heap));
141 
142 	mem_heap_free(prebuilt->blob_heap);
143 	prebuilt->blob_heap = NULL;
144 	DBUG_VOID_RETURN;
145 }
146 
147 /*******************************************************************//**
148 Stores a >= 5.0.3 format true VARCHAR length to dest, in the MySQL row
149 format.
150 @return pointer to the data, we skip the 1 or 2 bytes at the start
151 that are used to store the len */
152 byte*
row_mysql_store_true_var_len(byte * dest,ulint len,ulint lenlen)153 row_mysql_store_true_var_len(
154 /*=========================*/
155 	byte*	dest,	/*!< in: where to store */
156 	ulint	len,	/*!< in: length, must fit in two bytes */
157 	ulint	lenlen)	/*!< in: storage length of len: either 1 or 2 bytes */
158 {
159 	if (lenlen == 2) {
160 		ut_a(len < 256 * 256);
161 
162 		mach_write_to_2_little_endian(dest, len);
163 
164 		return(dest + 2);
165 	}
166 
167 	ut_a(lenlen == 1);
168 	ut_a(len < 256);
169 
170 	mach_write_to_1(dest, len);
171 
172 	return(dest + 1);
173 }
174 
175 /*******************************************************************//**
176 Reads a >= 5.0.3 format true VARCHAR length, in the MySQL row format, and
177 returns a pointer to the data.
178 @return pointer to the data, we skip the 1 or 2 bytes at the start
179 that are used to store the len */
180 const byte*
row_mysql_read_true_varchar(ulint * len,const byte * field,ulint lenlen)181 row_mysql_read_true_varchar(
182 /*========================*/
183 	ulint*		len,	/*!< out: variable-length field length */
184 	const byte*	field,	/*!< in: field in the MySQL format */
185 	ulint		lenlen)	/*!< in: storage length of len: either 1
186 				or 2 bytes */
187 {
188 	if (lenlen == 2) {
189 		*len = mach_read_from_2_little_endian(field);
190 
191 		return(field + 2);
192 	}
193 
194 	ut_a(lenlen == 1);
195 
196 	*len = mach_read_from_1(field);
197 
198 	return(field + 1);
199 }
200 
201 /*******************************************************************//**
202 Stores a reference to a BLOB in the MySQL format. */
203 void
row_mysql_store_blob_ref(byte * dest,ulint col_len,const void * data,ulint len)204 row_mysql_store_blob_ref(
205 /*=====================*/
206 	byte*		dest,	/*!< in: where to store */
207 	ulint		col_len,/*!< in: dest buffer size: determines into
208 				how many bytes the BLOB length is stored,
209 				the space for the length may vary from 1
210 				to 4 bytes */
211 	const void*	data,	/*!< in: BLOB data; if the value to store
212 				is SQL NULL this should be NULL pointer */
213 	ulint		len)	/*!< in: BLOB length; if the value to store
214 				is SQL NULL this should be 0; remember
215 				also to set the NULL bit in the MySQL record
216 				header! */
217 {
218 	/* MySQL might assume the field is set to zero except the length and
219 	the pointer fields */
220 
221 	memset(dest, '\0', col_len);
222 
223 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
224 	and after that 8 bytes reserved for the pointer to the data.
225 	In 32-bit architectures we only use the first 4 bytes of the pointer
226 	slot. */
227 
228 	ut_a(col_len - 8 > 1 || len < 256);
229 	ut_a(col_len - 8 > 2 || len < 256 * 256);
230 	ut_a(col_len - 8 > 3 || len < 256 * 256 * 256);
231 
232 	mach_write_to_n_little_endian(dest, col_len - 8, len);
233 
234 	memcpy(dest + col_len - 8, &data, sizeof data);
235 }
236 
237 /*******************************************************************//**
238 Reads a reference to a BLOB in the MySQL format.
239 @return pointer to BLOB data */
240 const byte*
row_mysql_read_blob_ref(ulint * len,const byte * ref,ulint col_len)241 row_mysql_read_blob_ref(
242 /*====================*/
243 	ulint*		len,		/*!< out: BLOB length */
244 	const byte*	ref,		/*!< in: BLOB reference in the
245 					MySQL format */
246 	ulint		col_len)	/*!< in: BLOB reference length
247 					(not BLOB length) */
248 {
249 	byte*	data;
250 
251 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
252 
253 	memcpy(&data, ref + col_len - 8, sizeof data);
254 
255 	return(data);
256 }
257 
258 /*******************************************************************//**
259 Converting InnoDB geometry data format to MySQL data format. */
260 void
row_mysql_store_geometry(byte * dest,ulint dest_len,const byte * src,ulint src_len)261 row_mysql_store_geometry(
262 /*=====================*/
263 	byte*		dest,		/*!< in/out: where to store */
264 	ulint		dest_len,	/*!< in: dest buffer size: determines
265 					into how many bytes the GEOMETRY length
266 					is stored, the space for the length
267 					may vary from 1 to 4 bytes */
268 	const byte*	src,		/*!< in: GEOMETRY data; if the value to
269 					store is SQL NULL this should be NULL
270 					pointer */
271 	ulint		src_len)	/*!< in: GEOMETRY length; if the value
272 					to store is SQL NULL this should be 0;
273 					remember also to set the NULL bit in
274 					the MySQL record header! */
275 {
276 	/* MySQL might assume the field is set to zero except the length and
277 	the pointer fields */
278 	MEM_CHECK_DEFINED(src, src_len);
279 
280 	memset(dest, '\0', dest_len);
281 
282 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
283 	and after that 8 bytes reserved for the pointer to the data.
284 	In 32-bit architectures we only use the first 4 bytes of the pointer
285 	slot. */
286 
287 	ut_ad(dest_len - 8 > 1 || src_len < 1<<8);
288 	ut_ad(dest_len - 8 > 2 || src_len < 1<<16);
289 	ut_ad(dest_len - 8 > 3 || src_len < 1<<24);
290 
291 	mach_write_to_n_little_endian(dest, dest_len - 8, src_len);
292 
293 	memcpy(dest + dest_len - 8, &src, sizeof src);
294 }
295 
296 /*******************************************************************//**
297 Read geometry data in the MySQL format.
298 @return pointer to geometry data */
299 static
300 const byte*
row_mysql_read_geometry(ulint * len,const byte * ref,ulint col_len)301 row_mysql_read_geometry(
302 /*====================*/
303 	ulint*		len,		/*!< out: data length */
304 	const byte*	ref,		/*!< in: geometry data in the
305 					MySQL format */
306 	ulint		col_len)	/*!< in: MySQL format length */
307 {
308 	byte*		data;
309 	ut_ad(col_len > 8);
310 
311 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
312 
313 	memcpy(&data, ref + col_len - 8, sizeof data);
314 
315 	return(data);
316 }
317 
318 /**************************************************************//**
319 Pad a column with spaces. */
320 void
row_mysql_pad_col(ulint mbminlen,byte * pad,ulint len)321 row_mysql_pad_col(
322 /*==============*/
323 	ulint	mbminlen,	/*!< in: minimum size of a character,
324 				in bytes */
325 	byte*	pad,		/*!< out: padded buffer */
326 	ulint	len)		/*!< in: number of bytes to pad */
327 {
328 	const byte*	pad_end;
329 
330 	switch (UNIV_EXPECT(mbminlen, 1)) {
331 	default:
332 		ut_error;
333 	case 1:
334 		/* space=0x20 */
335 		memset(pad, 0x20, len);
336 		break;
337 	case 2:
338 		/* space=0x0020 */
339 		pad_end = pad + len;
340 		ut_a(!(len % 2));
341 		while (pad < pad_end) {
342 			*pad++ = 0x00;
343 			*pad++ = 0x20;
344 		};
345 		break;
346 	case 4:
347 		/* space=0x00000020 */
348 		pad_end = pad + len;
349 		ut_a(!(len % 4));
350 		while (pad < pad_end) {
351 			*pad++ = 0x00;
352 			*pad++ = 0x00;
353 			*pad++ = 0x00;
354 			*pad++ = 0x20;
355 		}
356 		break;
357 	}
358 }
359 
360 /**************************************************************//**
361 Stores a non-SQL-NULL field given in the MySQL format in the InnoDB format.
362 The counterpart of this function is row_sel_field_store_in_mysql_format() in
363 row0sel.cc.
364 @return up to which byte we used buf in the conversion */
365 byte*
row_mysql_store_col_in_innobase_format(dfield_t * dfield,byte * buf,ibool row_format_col,const byte * mysql_data,ulint col_len,ulint comp)366 row_mysql_store_col_in_innobase_format(
367 /*===================================*/
368 	dfield_t*	dfield,		/*!< in/out: dfield where dtype
369 					information must be already set when
370 					this function is called! */
371 	byte*		buf,		/*!< in/out: buffer for a converted
372 					integer value; this must be at least
373 					col_len long then! NOTE that dfield
374 					may also get a pointer to 'buf',
375 					therefore do not discard this as long
376 					as dfield is used! */
377 	ibool		row_format_col,	/*!< TRUE if the mysql_data is from
378 					a MySQL row, FALSE if from a MySQL
379 					key value;
380 					in MySQL, a true VARCHAR storage
381 					format differs in a row and in a
382 					key value: in a key value the length
383 					is always stored in 2 bytes! */
384 	const byte*	mysql_data,	/*!< in: MySQL column value, not
385 					SQL NULL; NOTE that dfield may also
386 					get a pointer to mysql_data,
387 					therefore do not discard this as long
388 					as dfield is used! */
389 	ulint		col_len,	/*!< in: MySQL column length; NOTE that
390 					this is the storage length of the
391 					column in the MySQL format row, not
392 					necessarily the length of the actual
393 					payload data; if the column is a true
394 					VARCHAR then this is irrelevant */
395 	ulint		comp)		/*!< in: nonzero=compact format */
396 {
397 	const byte*	ptr	= mysql_data;
398 	const dtype_t*	dtype;
399 	ulint		type;
400 	ulint		lenlen;
401 
402 	dtype = dfield_get_type(dfield);
403 
404 	type = dtype->mtype;
405 
406 	if (type == DATA_INT) {
407 		/* Store integer data in Innobase in a big-endian format,
408 		sign bit negated if the data is a signed integer. In MySQL,
409 		integers are stored in a little-endian format. */
410 
411 		byte*	p = buf + col_len;
412 
413 		for (;;) {
414 			p--;
415 			*p = *mysql_data;
416 			if (p == buf) {
417 				break;
418 			}
419 			mysql_data++;
420 		}
421 
422 		if (!(dtype->prtype & DATA_UNSIGNED)) {
423 
424 			*buf ^= 128;
425 		}
426 
427 		ptr = buf;
428 		buf += col_len;
429 	} else if ((type == DATA_VARCHAR
430 		    || type == DATA_VARMYSQL
431 		    || type == DATA_BINARY)) {
432 
433 		if (dtype_get_mysql_type(dtype) == DATA_MYSQL_TRUE_VARCHAR) {
434 			/* The length of the actual data is stored to 1 or 2
435 			bytes at the start of the field */
436 
437 			if (row_format_col) {
438 				if (dtype->prtype & DATA_LONG_TRUE_VARCHAR) {
439 					lenlen = 2;
440 				} else {
441 					lenlen = 1;
442 				}
443 			} else {
444 				/* In a MySQL key value, lenlen is always 2 */
445 				lenlen = 2;
446 			}
447 
448 			ptr = row_mysql_read_true_varchar(&col_len, mysql_data,
449 							  lenlen);
450 		} else {
451 			/* Remove trailing spaces from old style VARCHAR
452 			columns. */
453 
454 			/* Handle Unicode strings differently. */
455 			ulint	mbminlen	= dtype_get_mbminlen(dtype);
456 
457 			ptr = mysql_data;
458 
459 			switch (mbminlen) {
460 			default:
461 				ut_error;
462 			case 4:
463 				/* space=0x00000020 */
464 				/* Trim "half-chars", just in case. */
465 				col_len &= ~3U;
466 
467 				while (col_len >= 4
468 				       && ptr[col_len - 4] == 0x00
469 				       && ptr[col_len - 3] == 0x00
470 				       && ptr[col_len - 2] == 0x00
471 				       && ptr[col_len - 1] == 0x20) {
472 					col_len -= 4;
473 				}
474 				break;
475 			case 2:
476 				/* space=0x0020 */
477 				/* Trim "half-chars", just in case. */
478 				col_len &= ~1U;
479 
480 				while (col_len >= 2 && ptr[col_len - 2] == 0x00
481 				       && ptr[col_len - 1] == 0x20) {
482 					col_len -= 2;
483 				}
484 				break;
485 			case 1:
486 				/* space=0x20 */
487 				while (col_len > 0
488 				       && ptr[col_len - 1] == 0x20) {
489 					col_len--;
490 				}
491 			}
492 		}
493 	} else if (comp && type == DATA_MYSQL
494 		   && dtype_get_mbminlen(dtype) == 1
495 		   && dtype_get_mbmaxlen(dtype) > 1) {
496 		/* In some cases we strip trailing spaces from UTF-8 and other
497 		multibyte charsets, from FIXED-length CHAR columns, to save
498 		space. UTF-8 would otherwise normally use 3 * the string length
499 		bytes to store an ASCII string! */
500 
501 		/* We assume that this CHAR field is encoded in a
502 		variable-length character set where spaces have
503 		1:1 correspondence to 0x20 bytes, such as UTF-8.
504 
505 		Consider a CHAR(n) field, a field of n characters.
506 		It will contain between n * mbminlen and n * mbmaxlen bytes.
507 		We will try to truncate it to n bytes by stripping
508 		space padding.	If the field contains single-byte
509 		characters only, it will be truncated to n characters.
510 		Consider a CHAR(5) field containing the string
511 		".a   " where "." denotes a 3-byte character represented
512 		by the bytes "$%&". After our stripping, the string will
513 		be stored as "$%&a " (5 bytes). The string
514 		".abc " will be stored as "$%&abc" (6 bytes).
515 
516 		The space padding will be restored in row0sel.cc, function
517 		row_sel_field_store_in_mysql_format(). */
518 
519 		ulint		n_chars;
520 
521 		ut_a(!(dtype_get_len(dtype) % dtype_get_mbmaxlen(dtype)));
522 
523 		n_chars = dtype_get_len(dtype) / dtype_get_mbmaxlen(dtype);
524 
525 		/* Strip space padding. */
526 		while (col_len > n_chars && ptr[col_len - 1] == 0x20) {
527 			col_len--;
528 		}
529 	} else if (!row_format_col) {
530 		/* if mysql data is from a MySQL key value
531 		since the length is always stored in 2 bytes,
532 		we need do nothing here. */
533 	} else if (type == DATA_BLOB) {
534 
535 		ptr = row_mysql_read_blob_ref(&col_len, mysql_data, col_len);
536 	} else if (DATA_GEOMETRY_MTYPE(type)) {
537 		ptr = row_mysql_read_geometry(&col_len, mysql_data, col_len);
538 	}
539 
540 	dfield_set_data(dfield, ptr, col_len);
541 
542 	return(buf);
543 }
544 
545 /**************************************************************//**
546 Convert a row in the MySQL format to a row in the Innobase format. Note that
547 the function to convert a MySQL format key value to an InnoDB dtuple is
548 row_sel_convert_mysql_key_to_innobase() in row0sel.cc. */
549 static
550 void
row_mysql_convert_row_to_innobase(dtuple_t * row,row_prebuilt_t * prebuilt,const byte * mysql_rec,mem_heap_t ** blob_heap)551 row_mysql_convert_row_to_innobase(
552 /*==============================*/
553 	dtuple_t*	row,		/*!< in/out: Innobase row where the
554 					field type information is already
555 					copied there! */
556 	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct where template
557 					must be of type ROW_MYSQL_WHOLE_ROW */
558 	const byte*	mysql_rec,	/*!< in: row in the MySQL format;
559 					NOTE: do not discard as long as
560 					row is used, as row may contain
561 					pointers to this record! */
562 	mem_heap_t**	blob_heap)	/*!< in: FIX_ME, remove this after
563 					server fixes its issue */
564 {
565 	const mysql_row_templ_t*templ;
566 	dfield_t*		dfield;
567 	ulint			i;
568 	ulint			n_col = 0;
569 	ulint			n_v_col = 0;
570 
571 	ut_ad(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
572 	ut_ad(prebuilt->mysql_template);
573 
574 	for (i = 0; i < prebuilt->n_template; i++) {
575 
576 		templ = prebuilt->mysql_template + i;
577 
578 		if (templ->is_virtual) {
579 			ut_ad(n_v_col < dtuple_get_n_v_fields(row));
580 			dfield = dtuple_get_nth_v_field(row, n_v_col);
581 			n_v_col++;
582 		} else {
583 			dfield = dtuple_get_nth_field(row, n_col);
584 			n_col++;
585 		}
586 
587 		if (templ->mysql_null_bit_mask != 0) {
588 			/* Column may be SQL NULL */
589 
590 			if (mysql_rec[templ->mysql_null_byte_offset]
591 			    & (byte) (templ->mysql_null_bit_mask)) {
592 
593 				/* It is SQL NULL */
594 
595 				dfield_set_null(dfield);
596 
597 				goto next_column;
598 			}
599 		}
600 
601 		row_mysql_store_col_in_innobase_format(
602 			dfield,
603 			prebuilt->ins_upd_rec_buff + templ->mysql_col_offset,
604 			TRUE, /* MySQL row format data */
605 			mysql_rec + templ->mysql_col_offset,
606 			templ->mysql_col_len,
607 			dict_table_is_comp(prebuilt->table));
608 
609 		/* server has issue regarding handling BLOB virtual fields,
610 		and we need to duplicate it with our own memory here */
611 		if (templ->is_virtual
612 		    && DATA_LARGE_MTYPE(dfield_get_type(dfield)->mtype)) {
613 			if (*blob_heap == NULL) {
614 				*blob_heap = mem_heap_create(dfield->len);
615 			}
616 			dfield_dup(dfield, *blob_heap);
617 		}
618 next_column:
619 		;
620 	}
621 
622 	/* If there is a FTS doc id column and it is not user supplied (
623 	generated by server) then assign it a new doc id. */
624 	if (!prebuilt->table->fts) {
625 		return;
626 	}
627 
628 	ut_a(prebuilt->table->fts->doc_col != ULINT_UNDEFINED);
629 
630 	doc_id_t	doc_id;
631 
632 	if (!DICT_TF2_FLAG_IS_SET(prebuilt->table, DICT_TF2_FTS_HAS_DOC_ID)) {
633 		if (prebuilt->table->fts->cache->first_doc_id
634 		    == FTS_NULL_DOC_ID) {
635 			fts_get_next_doc_id(prebuilt->table, &doc_id);
636 		}
637 		return;
638 	}
639 
640 	dfield_t*	fts_doc_id = dtuple_get_nth_field(
641 		row, prebuilt->table->fts->doc_col);
642 
643 	if (fts_get_next_doc_id(prebuilt->table, &doc_id) == DB_SUCCESS) {
644 		ut_a(doc_id != FTS_NULL_DOC_ID);
645 		ut_ad(sizeof(doc_id) == fts_doc_id->type.len);
646 		dfield_set_data(fts_doc_id, prebuilt->ins_upd_rec_buff
647 				+ prebuilt->mysql_row_len, 8);
648 		fts_write_doc_id(fts_doc_id->data, doc_id);
649 	} else {
650 		dfield_set_null(fts_doc_id);
651 	}
652 }
653 
654 /****************************************************************//**
655 Handles user errors and lock waits detected by the database engine.
656 @return true if it was a lock wait and we should continue running the
657 query thread and in that case the thr is ALREADY in the running state. */
658 bool
row_mysql_handle_errors(dberr_t * new_err,trx_t * trx,que_thr_t * thr,trx_savept_t * savept)659 row_mysql_handle_errors(
660 /*====================*/
661 	dberr_t*	new_err,/*!< out: possible new error encountered in
662 				lock wait, or if no new error, the value
663 				of trx->error_state at the entry of this
664 				function */
665 	trx_t*		trx,	/*!< in: transaction */
666 	que_thr_t*	thr,	/*!< in: query thread, or NULL */
667 	trx_savept_t*	savept)	/*!< in: savepoint, or NULL */
668 {
669 	dberr_t	err;
670 
671 	DBUG_ENTER("row_mysql_handle_errors");
672 	DEBUG_SYNC_C("row_mysql_handle_errors");
673 
674 handle_new_error:
675 	err = trx->error_state;
676 
677 	ut_a(err != DB_SUCCESS);
678 
679 	trx->error_state = DB_SUCCESS;
680 
681 	DBUG_LOG("trx", "handle error: " << ut_strerr(err)
682 		 << ";id=" << ib::hex(trx->id) << ", " << trx);
683 
684 	switch (err) {
685 	case DB_LOCK_WAIT_TIMEOUT:
686 		if (row_rollback_on_timeout) {
687 			goto rollback;
688 		}
689 		/* fall through */
690 	case DB_DUPLICATE_KEY:
691 	case DB_FOREIGN_DUPLICATE_KEY:
692 	case DB_TOO_BIG_RECORD:
693 	case DB_UNDO_RECORD_TOO_BIG:
694 	case DB_ROW_IS_REFERENCED:
695 	case DB_NO_REFERENCED_ROW:
696 	case DB_CANNOT_ADD_CONSTRAINT:
697 	case DB_TOO_MANY_CONCURRENT_TRXS:
698 	case DB_OUT_OF_FILE_SPACE:
699 	case DB_READ_ONLY:
700 	case DB_FTS_INVALID_DOCID:
701 	case DB_INTERRUPTED:
702 	case DB_CANT_CREATE_GEOMETRY_OBJECT:
703 	case DB_TABLE_NOT_FOUND:
704 	case DB_DECRYPTION_FAILED:
705 	case DB_COMPUTE_VALUE_FAILED:
706 	rollback_to_savept:
707 		DBUG_EXECUTE_IF("row_mysql_crash_if_error", {
708 					log_buffer_flush_to_disk();
709 					DBUG_SUICIDE(); });
710 		if (savept) {
711 			/* Roll back the latest, possibly incomplete insertion
712 			or update */
713 
714 			trx->rollback(savept);
715 		}
716 		/* MySQL will roll back the latest SQL statement */
717 		break;
718 	case DB_LOCK_WAIT:
719 		lock_wait_suspend_thread(thr);
720 
721 		if (trx->error_state != DB_SUCCESS) {
722 			que_thr_stop_for_mysql(thr);
723 
724 			goto handle_new_error;
725 		}
726 
727 		*new_err = err;
728 
729 		DBUG_RETURN(true);
730 
731 	case DB_DEADLOCK:
732 	case DB_LOCK_TABLE_FULL:
733 	rollback:
734 		/* Roll back the whole transaction; this resolution was added
735 		to version 3.23.43 */
736 
737 		trx->rollback();
738 		break;
739 
740 	case DB_MUST_GET_MORE_FILE_SPACE:
741 		ib::fatal() << "The database cannot continue operation because"
742 			" of lack of space. You must add a new data file"
743 			" to my.cnf and restart the database.";
744 		break;
745 
746 	case DB_CORRUPTION:
747 	case DB_PAGE_CORRUPTED:
748 		ib::error() << "We detected index corruption in an InnoDB type"
749 			" table. You have to dump + drop + reimport the"
750 			" table or, in a case of widespread corruption,"
751 			" dump all InnoDB tables and recreate the whole"
752 			" tablespace. If the mysqld server crashes after"
753 			" the startup or when you dump the tables. "
754 			<< FORCE_RECOVERY_MSG;
755 		goto rollback_to_savept;
756 	case DB_FOREIGN_EXCEED_MAX_CASCADE:
757 		ib::error() << "Cannot delete/update rows with cascading"
758 			" foreign key constraints that exceed max depth of "
759 			<< FK_MAX_CASCADE_DEL << ". Please drop excessive"
760 			" foreign constraints and try again";
761 		goto rollback_to_savept;
762 	case DB_UNSUPPORTED:
763 		ib::error() << "Cannot delete/update rows with cascading"
764 			" foreign key constraints in timestamp-based temporal"
765 			" table. Please drop excessive"
766 			" foreign constraints and try again";
767 		goto rollback_to_savept;
768 	default:
769 		ib::fatal() << "Unknown error " << err;
770 	}
771 
772 	if (trx->error_state != DB_SUCCESS) {
773 		*new_err = trx->error_state;
774 	} else {
775 		*new_err = err;
776 	}
777 
778 	trx->error_state = DB_SUCCESS;
779 
780 	DBUG_RETURN(false);
781 }
782 
783 /********************************************************************//**
784 Create a prebuilt struct for a MySQL table handle.
785 @return own: a prebuilt struct */
786 row_prebuilt_t*
row_create_prebuilt(dict_table_t * table,ulint mysql_row_len)787 row_create_prebuilt(
788 /*================*/
789 	dict_table_t*	table,		/*!< in: Innobase table handle */
790 	ulint		mysql_row_len)	/*!< in: length in bytes of a row in
791 					the MySQL format */
792 {
793 	DBUG_ENTER("row_create_prebuilt");
794 
795 	row_prebuilt_t*	prebuilt;
796 	mem_heap_t*	heap;
797 	dict_index_t*	clust_index;
798 	dict_index_t*	temp_index;
799 	dtuple_t*	ref;
800 	ulint		ref_len;
801 	uint		srch_key_len = 0;
802 	ulint		search_tuple_n_fields;
803 
804 	search_tuple_n_fields = 2 * (dict_table_get_n_cols(table)
805 				     + dict_table_get_n_v_cols(table));
806 
807 	clust_index = dict_table_get_first_index(table);
808 
809 	/* Make sure that search_tuple is long enough for clustered index */
810 	ut_a(2 * unsigned(table->n_cols) >= unsigned(clust_index->n_fields)
811 	     - clust_index->table->n_dropped());
812 
813 	ref_len = dict_index_get_n_unique(clust_index);
814 
815 
816         /* Maximum size of the buffer needed for conversion of INTs from
817 	little endian format to big endian format in an index. An index
818 	can have maximum 16 columns (MAX_REF_PARTS) in it. Therfore
819 	Max size for PK: 16 * 8 bytes (BIGINT's size) = 128 bytes
820 	Max size Secondary index: 16 * 8 bytes + PK = 256 bytes. */
821 #define MAX_SRCH_KEY_VAL_BUFFER         2* (8 * MAX_REF_PARTS)
822 
823 #define PREBUILT_HEAP_INITIAL_SIZE	\
824 	( \
825 	sizeof(*prebuilt) \
826 	/* allocd in this function */ \
827 	+ DTUPLE_EST_ALLOC(search_tuple_n_fields) \
828 	+ DTUPLE_EST_ALLOC(ref_len) \
829 	/* allocd in row_prebuild_sel_graph() */ \
830 	+ sizeof(sel_node_t) \
831 	+ sizeof(que_fork_t) \
832 	+ sizeof(que_thr_t) \
833 	/* allocd in row_get_prebuilt_update_vector() */ \
834 	+ sizeof(upd_node_t) \
835 	+ sizeof(upd_t) \
836 	+ sizeof(upd_field_t) \
837 	  * dict_table_get_n_cols(table) \
838 	+ sizeof(que_fork_t) \
839 	+ sizeof(que_thr_t) \
840 	/* allocd in row_get_prebuilt_insert_row() */ \
841 	+ sizeof(ins_node_t) \
842 	/* mysql_row_len could be huge and we are not \
843 	sure if this prebuilt instance is going to be \
844 	used in inserts */ \
845 	+ (mysql_row_len < 256 ? mysql_row_len : 0) \
846 	+ DTUPLE_EST_ALLOC(dict_table_get_n_cols(table) \
847 			   + dict_table_get_n_v_cols(table)) \
848 	+ sizeof(que_fork_t) \
849 	+ sizeof(que_thr_t) \
850 	+ sizeof(*prebuilt->pcur) \
851 	+ sizeof(*prebuilt->clust_pcur) \
852 	)
853 
854 	/* Calculate size of key buffer used to store search key in
855 	InnoDB format. MySQL stores INTs in little endian format and
856 	InnoDB stores INTs in big endian format with the sign bit
857 	flipped. All other field types are stored/compared the same
858 	in MySQL and InnoDB, so we must create a buffer containing
859 	the INT key parts in InnoDB format.We need two such buffers
860 	since both start and end keys are used in records_in_range(). */
861 
862 	for (temp_index = dict_table_get_first_index(table); temp_index;
863 	     temp_index = dict_table_get_next_index(temp_index)) {
864 		DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
865 			ut_a(temp_index->n_user_defined_cols
866 						== MAX_REF_PARTS););
867 		uint temp_len = 0;
868 		for (uint i = 0; i < temp_index->n_uniq; i++) {
869 			ulint type = temp_index->fields[i].col->mtype;
870 			if (type == DATA_INT) {
871 				temp_len +=
872 					temp_index->fields[i].fixed_len;
873 			}
874 		}
875 		srch_key_len = std::max(srch_key_len,temp_len);
876 	}
877 
878 	ut_a(srch_key_len <= MAX_SRCH_KEY_VAL_BUFFER);
879 
880 	DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
881 		ut_a(srch_key_len == MAX_SRCH_KEY_VAL_BUFFER););
882 
883 	/* We allocate enough space for the objects that are likely to
884 	be created later in order to minimize the number of malloc()
885 	calls */
886 	heap = mem_heap_create(PREBUILT_HEAP_INITIAL_SIZE + 2 * srch_key_len);
887 
888 	prebuilt = static_cast<row_prebuilt_t*>(
889 		mem_heap_zalloc(heap, sizeof(*prebuilt)));
890 
891 	prebuilt->magic_n = ROW_PREBUILT_ALLOCATED;
892 	prebuilt->magic_n2 = ROW_PREBUILT_ALLOCATED;
893 
894 	prebuilt->table = table;
895 
896 	prebuilt->sql_stat_start = TRUE;
897 	prebuilt->heap = heap;
898 
899 	prebuilt->srch_key_val_len = srch_key_len;
900 	if (prebuilt->srch_key_val_len) {
901 		prebuilt->srch_key_val1 = static_cast<byte*>(
902 			mem_heap_alloc(prebuilt->heap,
903 				       2 * prebuilt->srch_key_val_len));
904 		prebuilt->srch_key_val2 = prebuilt->srch_key_val1 +
905 						prebuilt->srch_key_val_len;
906 	} else {
907 		prebuilt->srch_key_val1 = NULL;
908 		prebuilt->srch_key_val2 = NULL;
909 	}
910 
911 	prebuilt->pcur = static_cast<btr_pcur_t*>(
912 				mem_heap_zalloc(prebuilt->heap,
913 					       sizeof(btr_pcur_t)));
914 	prebuilt->clust_pcur = static_cast<btr_pcur_t*>(
915 					mem_heap_zalloc(prebuilt->heap,
916 						       sizeof(btr_pcur_t)));
917 	btr_pcur_reset(prebuilt->pcur);
918 	btr_pcur_reset(prebuilt->clust_pcur);
919 
920 	prebuilt->select_lock_type = LOCK_NONE;
921 	prebuilt->stored_select_lock_type = LOCK_NONE_UNSET;
922 
923 	prebuilt->search_tuple = dtuple_create(heap, search_tuple_n_fields);
924 
925 	ref = dtuple_create(heap, ref_len);
926 
927 	dict_index_copy_types(ref, clust_index, ref_len);
928 
929 	prebuilt->clust_ref = ref;
930 
931 	prebuilt->autoinc_error = DB_SUCCESS;
932 	prebuilt->autoinc_offset = 0;
933 
934 	/* Default to 1, we will set the actual value later in
935 	ha_innobase::get_auto_increment(). */
936 	prebuilt->autoinc_increment = 1;
937 
938 	prebuilt->autoinc_last_value = 0;
939 
940 	/* During UPDATE and DELETE we need the doc id. */
941 	prebuilt->fts_doc_id = 0;
942 
943 	prebuilt->mysql_row_len = mysql_row_len;
944 
945 	prebuilt->fts_doc_id_in_read_set = 0;
946 	prebuilt->blob_heap = NULL;
947 
948 	DBUG_RETURN(prebuilt);
949 }
950 
951 /********************************************************************//**
952 Free a prebuilt struct for a MySQL table handle. */
953 void
row_prebuilt_free(row_prebuilt_t * prebuilt,ibool dict_locked)954 row_prebuilt_free(
955 /*==============*/
956 	row_prebuilt_t*	prebuilt,	/*!< in, own: prebuilt struct */
957 	ibool		dict_locked)	/*!< in: TRUE=data dictionary locked */
958 {
959 	DBUG_ENTER("row_prebuilt_free");
960 
961 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
962 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
963 
964 	prebuilt->magic_n = ROW_PREBUILT_FREED;
965 	prebuilt->magic_n2 = ROW_PREBUILT_FREED;
966 
967 	btr_pcur_reset(prebuilt->pcur);
968 	btr_pcur_reset(prebuilt->clust_pcur);
969 
970 	ut_free(prebuilt->mysql_template);
971 
972 	if (prebuilt->ins_graph) {
973 		que_graph_free_recursive(prebuilt->ins_graph);
974 	}
975 
976 	if (prebuilt->sel_graph) {
977 		que_graph_free_recursive(prebuilt->sel_graph);
978 	}
979 
980 	if (prebuilt->upd_graph) {
981 		que_graph_free_recursive(prebuilt->upd_graph);
982 	}
983 
984 	if (prebuilt->blob_heap) {
985 		row_mysql_prebuilt_free_blob_heap(prebuilt);
986 	}
987 
988 	if (prebuilt->old_vers_heap) {
989 		mem_heap_free(prebuilt->old_vers_heap);
990 	}
991 
992 	if (prebuilt->fetch_cache[0] != NULL) {
993 		byte*	base = prebuilt->fetch_cache[0] - 4;
994 		byte*	ptr = base;
995 
996 		for (ulint i = 0; i < MYSQL_FETCH_CACHE_SIZE; i++) {
997 			ulint	magic1 = mach_read_from_4(ptr);
998 			ut_a(magic1 == ROW_PREBUILT_FETCH_MAGIC_N);
999 			ptr += 4;
1000 
1001 			byte*	row = ptr;
1002 			ut_a(row == prebuilt->fetch_cache[i]);
1003 			ptr += prebuilt->mysql_row_len;
1004 
1005 			ulint	magic2 = mach_read_from_4(ptr);
1006 			ut_a(magic2 == ROW_PREBUILT_FETCH_MAGIC_N);
1007 			ptr += 4;
1008 		}
1009 
1010 		ut_free(base);
1011 	}
1012 
1013 	if (prebuilt->rtr_info) {
1014 		rtr_clean_rtr_info(prebuilt->rtr_info, true);
1015 	}
1016 	if (prebuilt->table) {
1017 		dict_table_close(prebuilt->table, dict_locked, FALSE);
1018 	}
1019 
1020 	mem_heap_free(prebuilt->heap);
1021 
1022 	DBUG_VOID_RETURN;
1023 }
1024 
1025 /*********************************************************************//**
1026 Updates the transaction pointers in query graphs stored in the prebuilt
1027 struct. */
1028 void
row_update_prebuilt_trx(row_prebuilt_t * prebuilt,trx_t * trx)1029 row_update_prebuilt_trx(
1030 /*====================*/
1031 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt struct
1032 					in MySQL handle */
1033 	trx_t*		trx)		/*!< in: transaction handle */
1034 {
1035 	ut_a(trx->magic_n == TRX_MAGIC_N);
1036 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1037 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1038 
1039 	prebuilt->trx = trx;
1040 
1041 	if (prebuilt->ins_graph) {
1042 		prebuilt->ins_graph->trx = trx;
1043 	}
1044 
1045 	if (prebuilt->upd_graph) {
1046 		prebuilt->upd_graph->trx = trx;
1047 	}
1048 
1049 	if (prebuilt->sel_graph) {
1050 		prebuilt->sel_graph->trx = trx;
1051 	}
1052 }
1053 
1054 /*********************************************************************//**
1055 Gets pointer to a prebuilt dtuple used in insertions. If the insert graph
1056 has not yet been built in the prebuilt struct, then this function first
1057 builds it.
1058 @return prebuilt dtuple; the column type information is also set in it */
1059 static
1060 dtuple_t*
row_get_prebuilt_insert_row(row_prebuilt_t * prebuilt)1061 row_get_prebuilt_insert_row(
1062 /*========================*/
1063 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1064 					handle */
1065 {
1066 	dict_table_t*		table	= prebuilt->table;
1067 
1068 	ut_ad(prebuilt && table && prebuilt->trx);
1069 
1070 	if (prebuilt->ins_node != 0) {
1071 
1072 		/* Check if indexes have been dropped or added and we
1073 		may need to rebuild the row insert template. */
1074 
1075 		if (prebuilt->trx_id == table->def_trx_id
1076 		    && prebuilt->ins_node->entry_list.size()
1077 		    == UT_LIST_GET_LEN(table->indexes)) {
1078 
1079 			return(prebuilt->ins_node->row);
1080 		}
1081 
1082 		ut_ad(prebuilt->trx_id < table->def_trx_id);
1083 
1084 		que_graph_free_recursive(prebuilt->ins_graph);
1085 
1086 		prebuilt->ins_graph = 0;
1087 	}
1088 
1089 	/* Create an insert node and query graph to the prebuilt struct */
1090 
1091 	ins_node_t*		node;
1092 
1093 	node = ins_node_create(INS_DIRECT, table, prebuilt->heap);
1094 
1095 	prebuilt->ins_node = node;
1096 
1097 	if (prebuilt->ins_upd_rec_buff == 0) {
1098 		prebuilt->ins_upd_rec_buff = static_cast<byte*>(
1099 			mem_heap_alloc(
1100 				prebuilt->heap,
1101 				DICT_TF2_FLAG_IS_SET(prebuilt->table,
1102 						     DICT_TF2_FTS_HAS_DOC_ID)
1103 				? prebuilt->mysql_row_len + 8/* FTS_DOC_ID */
1104 				: prebuilt->mysql_row_len));
1105 	}
1106 
1107 	dtuple_t*	row;
1108 
1109 	row = dtuple_create_with_vcol(
1110 			prebuilt->heap, dict_table_get_n_cols(table),
1111 			dict_table_get_n_v_cols(table));
1112 
1113 	dict_table_copy_types(row, table);
1114 
1115 	ins_node_set_new_row(node, row);
1116 
1117 	prebuilt->ins_graph = static_cast<que_fork_t*>(
1118 		que_node_get_parent(
1119 			pars_complete_graph_for_exec(
1120 				node,
1121 				prebuilt->trx, prebuilt->heap, prebuilt)));
1122 
1123 	prebuilt->ins_graph->state = QUE_FORK_ACTIVE;
1124 
1125 	prebuilt->trx_id = table->def_trx_id;
1126 
1127 	return(prebuilt->ins_node->row);
1128 }
1129 
1130 /*********************************************************************//**
1131 Sets an AUTO_INC type lock on the table mentioned in prebuilt. The
1132 AUTO_INC lock gives exclusive access to the auto-inc counter of the
1133 table. The lock is reserved only for the duration of an SQL statement.
1134 It is not compatible with another AUTO_INC or exclusive lock on the
1135 table.
1136 @return error code or DB_SUCCESS */
1137 dberr_t
row_lock_table_autoinc_for_mysql(row_prebuilt_t * prebuilt)1138 row_lock_table_autoinc_for_mysql(
1139 /*=============================*/
1140 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in the MySQL
1141 					table handle */
1142 {
1143 	trx_t*			trx	= prebuilt->trx;
1144 	ins_node_t*		node	= prebuilt->ins_node;
1145 	const dict_table_t*	table	= prebuilt->table;
1146 	que_thr_t*		thr;
1147 	dberr_t			err;
1148 	ibool			was_lock_wait;
1149 
1150 	/* If we already hold an AUTOINC lock on the table then do nothing.
1151 	Note: We peek at the value of the current owner without acquiring
1152 	the lock mutex. */
1153 	if (trx == table->autoinc_trx) {
1154 
1155 		return(DB_SUCCESS);
1156 	}
1157 
1158 	trx->op_info = "setting auto-inc lock";
1159 
1160 	row_get_prebuilt_insert_row(prebuilt);
1161 	node = prebuilt->ins_node;
1162 
1163 	/* We use the insert query graph as the dummy graph needed
1164 	in the lock module call */
1165 
1166 	thr = que_fork_get_first_thr(prebuilt->ins_graph);
1167 
1168 	thr->start_running();
1169 
1170 run_again:
1171 	thr->run_node = node;
1172 	thr->prev_node = node;
1173 
1174 	/* It may be that the current session has not yet started
1175 	its transaction, or it has been committed: */
1176 
1177 	trx_start_if_not_started_xa(trx, true);
1178 
1179 	err = lock_table(0, prebuilt->table, LOCK_AUTO_INC, thr);
1180 
1181 	trx->error_state = err;
1182 
1183 	if (err != DB_SUCCESS) {
1184 		que_thr_stop_for_mysql(thr);
1185 
1186 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1187 
1188 		if (was_lock_wait) {
1189 			goto run_again;
1190 		}
1191 
1192 		trx->op_info = "";
1193 
1194 		return(err);
1195 	}
1196 
1197 	thr->stop_no_error();
1198 
1199 	trx->op_info = "";
1200 
1201 	return(err);
1202 }
1203 
1204 /** Lock a table.
1205 @param[in,out]	prebuilt	table handle
1206 @return error code or DB_SUCCESS */
1207 dberr_t
row_lock_table(row_prebuilt_t * prebuilt)1208 row_lock_table(row_prebuilt_t* prebuilt)
1209 {
1210 	trx_t*		trx		= prebuilt->trx;
1211 	que_thr_t*	thr;
1212 	dberr_t		err;
1213 	ibool		was_lock_wait;
1214 
1215 	trx->op_info = "setting table lock";
1216 
1217 	if (prebuilt->sel_graph == NULL) {
1218 		/* Build a dummy select query graph */
1219 		row_prebuild_sel_graph(prebuilt);
1220 	}
1221 
1222 	/* We use the select query graph as the dummy graph needed
1223 	in the lock module call */
1224 
1225 	thr = que_fork_get_first_thr(prebuilt->sel_graph);
1226 
1227 	thr->start_running();
1228 
1229 run_again:
1230 	thr->run_node = thr;
1231 	thr->prev_node = thr->common.parent;
1232 
1233 	/* It may be that the current session has not yet started
1234 	its transaction, or it has been committed: */
1235 
1236 	trx_start_if_not_started_xa(trx, false);
1237 
1238 	err = lock_table(0, prebuilt->table,
1239 			 static_cast<enum lock_mode>(
1240 				 prebuilt->select_lock_type),
1241 			 thr);
1242 
1243 	trx->error_state = err;
1244 
1245 	if (err != DB_SUCCESS) {
1246 		que_thr_stop_for_mysql(thr);
1247 
1248 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1249 
1250 		if (was_lock_wait) {
1251 			goto run_again;
1252 		}
1253 
1254 		trx->op_info = "";
1255 
1256 		return(err);
1257 	}
1258 
1259 	thr->stop_no_error();
1260 
1261 	trx->op_info = "";
1262 
1263 	return(err);
1264 }
1265 
1266 /** Determine is tablespace encrypted but decryption failed, is table corrupted
1267 or is tablespace .ibd file missing.
1268 @param[in]	table		Table
1269 @param[in]	trx		Transaction
1270 @param[in]	push_warning	true if we should push warning to user
1271 @retval	DB_DECRYPTION_FAILED	table is encrypted but decryption failed
1272 @retval	DB_CORRUPTION		table is corrupted
1273 @retval	DB_TABLESPACE_NOT_FOUND	tablespace .ibd file not found */
1274 static
1275 dberr_t
row_mysql_get_table_status(const dict_table_t * table,trx_t * trx,bool push_warning=true)1276 row_mysql_get_table_status(
1277 	const dict_table_t*	table,
1278 	trx_t*			trx,
1279 	bool 			push_warning = true)
1280 {
1281 	dberr_t err;
1282 	if (const fil_space_t* space = table->space) {
1283 		if (space->crypt_data && space->crypt_data->is_encrypted()) {
1284 			// maybe we cannot access the table due to failing
1285 			// to decrypt
1286 			if (push_warning) {
1287 				ib_push_warning(trx, DB_DECRYPTION_FAILED,
1288 					"Table %s in tablespace %lu encrypted."
1289 					"However key management plugin or used key_id is not found or"
1290 					" used encryption algorithm or method does not match.",
1291 					table->name.m_name, table->space);
1292 			}
1293 
1294 			err = DB_DECRYPTION_FAILED;
1295 		} else {
1296 			if (push_warning) {
1297 				ib_push_warning(trx, DB_CORRUPTION,
1298 					"Table %s in tablespace %lu corrupted.",
1299 					table->name.m_name, table->space);
1300 			}
1301 
1302 			err = DB_CORRUPTION;
1303 		}
1304 	} else {
1305 		ib::error() << ".ibd file is missing for table "
1306 			<< table->name;
1307 		err = DB_TABLESPACE_NOT_FOUND;
1308 	}
1309 
1310 	return(err);
1311 }
1312 
1313 /** Does an insert for MySQL.
1314 @param[in]	mysql_rec	row in the MySQL format
1315 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
1316 @return error code or DB_SUCCESS */
1317 dberr_t
row_insert_for_mysql(const byte * mysql_rec,row_prebuilt_t * prebuilt,ins_mode_t ins_mode)1318 row_insert_for_mysql(
1319 	const byte*	mysql_rec,
1320 	row_prebuilt_t*	prebuilt,
1321 	ins_mode_t	ins_mode)
1322 {
1323 	trx_savept_t	savept;
1324 	que_thr_t*	thr;
1325 	dberr_t		err;
1326 	ibool		was_lock_wait;
1327 	trx_t*		trx		= prebuilt->trx;
1328 	ins_node_t*	node		= prebuilt->ins_node;
1329 	dict_table_t*	table		= prebuilt->table;
1330 
1331 	/* FIX_ME: This blob heap is used to compensate an issue in server
1332 	for virtual column blob handling */
1333 	mem_heap_t*	blob_heap = NULL;
1334 
1335 	ut_ad(trx);
1336 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1337 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1338 
1339 	if (!prebuilt->table->space) {
1340 
1341 		ib::error() << "The table " << prebuilt->table->name
1342 			<< " doesn't have a corresponding tablespace, it was"
1343 			" discarded.";
1344 
1345 		return(DB_TABLESPACE_DELETED);
1346 
1347 	} else if (!prebuilt->table->is_readable()) {
1348 		return(row_mysql_get_table_status(prebuilt->table, trx, true));
1349 	} else if (high_level_read_only) {
1350 		return(DB_READ_ONLY);
1351 	}
1352 
1353 	DBUG_EXECUTE_IF("mark_table_corrupted", {
1354 		/* Mark the table corrupted for the clustered index */
1355 		dict_index_t*	index = dict_table_get_first_index(table);
1356 		ut_ad(dict_index_is_clust(index));
1357 		dict_set_corrupted(index, trx, "INSERT TABLE"); });
1358 
1359 	if (dict_table_is_corrupted(table)) {
1360 
1361 		ib::error() << "Table " << table->name << " is corrupt.";
1362 		return(DB_TABLE_CORRUPT);
1363 	}
1364 
1365 	trx->op_info = "inserting";
1366 
1367 	row_mysql_delay_if_needed();
1368 
1369 	if (!table->no_rollback()) {
1370 		trx_start_if_not_started_xa(trx, true);
1371 	}
1372 
1373 	row_get_prebuilt_insert_row(prebuilt);
1374 	node = prebuilt->ins_node;
1375 
1376 	row_mysql_convert_row_to_innobase(node->row, prebuilt, mysql_rec,
1377 					  &blob_heap);
1378 
1379 	if (ins_mode != ROW_INS_NORMAL) {
1380           node->vers_update_end(prebuilt, ins_mode == ROW_INS_HISTORICAL);
1381         }
1382 
1383 	savept = trx_savept_take(trx);
1384 
1385 	thr = que_fork_get_first_thr(prebuilt->ins_graph);
1386 
1387 	if (prebuilt->sql_stat_start) {
1388 		node->state = INS_NODE_SET_IX_LOCK;
1389 		prebuilt->sql_stat_start = FALSE;
1390 	} else {
1391 		node->state = INS_NODE_ALLOC_ROW_ID;
1392 	}
1393 
1394 	thr->start_running();
1395 
1396 run_again:
1397 	thr->run_node = node;
1398 	thr->prev_node = node;
1399 
1400 	row_ins_step(thr);
1401 
1402 	DEBUG_SYNC_C("ib_after_row_insert_step");
1403 
1404 	err = trx->error_state;
1405 
1406 	if (err != DB_SUCCESS) {
1407 error_exit:
1408 		que_thr_stop_for_mysql(thr);
1409 
1410 		/* FIXME: What's this ? */
1411 		thr->lock_state = QUE_THR_LOCK_ROW;
1412 
1413 		was_lock_wait = row_mysql_handle_errors(
1414 			&err, trx, thr, &savept);
1415 
1416 		thr->lock_state = QUE_THR_LOCK_NOLOCK;
1417 
1418 		if (was_lock_wait) {
1419 			ut_ad(node->state == INS_NODE_INSERT_ENTRIES
1420 			      || node->state == INS_NODE_ALLOC_ROW_ID);
1421 			goto run_again;
1422 		}
1423 
1424 		trx->op_info = "";
1425 
1426 		if (blob_heap != NULL) {
1427 			mem_heap_free(blob_heap);
1428 		}
1429 
1430 		return(err);
1431 	}
1432 
1433 	if (dict_table_has_fts_index(table)) {
1434 		doc_id_t	doc_id;
1435 
1436 		/* Extract the doc id from the hidden FTS column */
1437 		doc_id = fts_get_doc_id_from_row(table, node->row);
1438 
1439 		if (doc_id <= 0) {
1440 			ib::error() << "FTS_DOC_ID must be larger than 0 for table "
1441 				    << table->name;
1442 			err = DB_FTS_INVALID_DOCID;
1443 			trx->error_state = DB_FTS_INVALID_DOCID;
1444 			goto error_exit;
1445 		}
1446 
1447 		if (!DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
1448 			doc_id_t	next_doc_id
1449 				= table->fts->cache->next_doc_id;
1450 
1451 			if (doc_id < next_doc_id) {
1452 				ib::error() << "FTS_DOC_ID must be larger than "
1453 					<< next_doc_id - 1 << " for table "
1454 					<< table->name;
1455 
1456 				err = DB_FTS_INVALID_DOCID;
1457 				trx->error_state = DB_FTS_INVALID_DOCID;
1458 				goto error_exit;
1459 			}
1460 		}
1461 
1462 		if (table->skip_alter_undo) {
1463 			if (trx->fts_trx == NULL) {
1464 				trx->fts_trx = fts_trx_create(trx);
1465 			}
1466 
1467 			fts_trx_table_t ftt;
1468 			ftt.table = table;
1469 			ftt.fts_trx = trx->fts_trx;
1470 
1471 			fts_add_doc_from_tuple(&ftt, doc_id, node->row);
1472 		} else {
1473 			/* Pass NULL for the columns affected, since an INSERT affects
1474 			all FTS indexes. */
1475 			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
1476 		}
1477 	}
1478 
1479 	thr->stop_no_error();
1480 
1481 	if (table->is_system_db) {
1482 		srv_stats.n_system_rows_inserted.inc(size_t(trx->id));
1483 	} else {
1484 		srv_stats.n_rows_inserted.inc(size_t(trx->id));
1485 	}
1486 
1487 	/* Not protected by dict_sys.mutex for performance
1488 	reasons, we would rather get garbage in stat_n_rows (which is
1489 	just an estimate anyway) than protecting the following code
1490 	with a latch. */
1491 	dict_table_n_rows_inc(table);
1492 
1493 	if (prebuilt->clust_index_was_generated) {
1494 		/* set row id to prebuilt */
1495 		memcpy(prebuilt->row_id, node->sys_buf, DATA_ROW_ID_LEN);
1496 	}
1497 
1498 	dict_stats_update_if_needed(table, *trx);
1499 	trx->op_info = "";
1500 
1501 	if (blob_heap != NULL) {
1502 		mem_heap_free(blob_heap);
1503 	}
1504 
1505 	return(err);
1506 }
1507 
1508 /*********************************************************************//**
1509 Builds a dummy query graph used in selects. */
1510 void
row_prebuild_sel_graph(row_prebuilt_t * prebuilt)1511 row_prebuild_sel_graph(
1512 /*===================*/
1513 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1514 					handle */
1515 {
1516 	sel_node_t*	node;
1517 
1518 	ut_ad(prebuilt && prebuilt->trx);
1519 
1520 	if (prebuilt->sel_graph == NULL) {
1521 
1522 		node = sel_node_create(prebuilt->heap);
1523 
1524 		prebuilt->sel_graph = static_cast<que_fork_t*>(
1525 			que_node_get_parent(
1526 				pars_complete_graph_for_exec(
1527 					static_cast<sel_node_t*>(node),
1528 					prebuilt->trx, prebuilt->heap,
1529 					prebuilt)));
1530 
1531 		prebuilt->sel_graph->state = QUE_FORK_ACTIVE;
1532 	}
1533 }
1534 
1535 /*********************************************************************//**
1536 Creates an query graph node of 'update' type to be used in the MySQL
1537 interface.
1538 @return own: update node */
1539 upd_node_t*
row_create_update_node_for_mysql(dict_table_t * table,mem_heap_t * heap)1540 row_create_update_node_for_mysql(
1541 /*=============================*/
1542 	dict_table_t*	table,	/*!< in: table to update */
1543 	mem_heap_t*	heap)	/*!< in: mem heap from which allocated */
1544 {
1545 	upd_node_t*	node;
1546 
1547 	DBUG_ENTER("row_create_update_node_for_mysql");
1548 
1549 	node = upd_node_create(heap);
1550 
1551 	node->in_mysql_interface = true;
1552 	node->is_delete = NO_DELETE;
1553 	node->searched_update = FALSE;
1554 	node->select = NULL;
1555 	node->pcur = btr_pcur_create_for_mysql();
1556 
1557 	DBUG_PRINT("info", ("node: %p, pcur: %p", node, node->pcur));
1558 
1559 	node->table = table;
1560 
1561 	node->update = upd_create(dict_table_get_n_cols(table)
1562 				  + dict_table_get_n_v_cols(table), heap);
1563 
1564 	node->update_n_fields = dict_table_get_n_cols(table);
1565 
1566 	UT_LIST_INIT(node->columns, &sym_node_t::col_var_list);
1567 
1568 	node->has_clust_rec_x_lock = TRUE;
1569 	node->cmpl_info = 0;
1570 
1571 	node->table_sym = NULL;
1572 	node->col_assign_list = NULL;
1573 
1574 	DBUG_RETURN(node);
1575 }
1576 
1577 /*********************************************************************//**
1578 Gets pointer to a prebuilt update vector used in updates. If the update
1579 graph has not yet been built in the prebuilt struct, then this function
1580 first builds it.
1581 @return prebuilt update vector */
1582 upd_t*
row_get_prebuilt_update_vector(row_prebuilt_t * prebuilt)1583 row_get_prebuilt_update_vector(
1584 /*===========================*/
1585 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1586 					handle */
1587 {
1588 	if (prebuilt->upd_node == NULL) {
1589 
1590 		/* Not called before for this handle: create an update node
1591 		and query graph to the prebuilt struct */
1592 
1593 		prebuilt->upd_node = row_create_update_node_for_mysql(
1594 			prebuilt->table, prebuilt->heap);
1595 
1596 		prebuilt->upd_graph = static_cast<que_fork_t*>(
1597 			que_node_get_parent(
1598 				pars_complete_graph_for_exec(
1599 					prebuilt->upd_node,
1600 					prebuilt->trx, prebuilt->heap,
1601 					prebuilt)));
1602 
1603 		prebuilt->upd_graph->state = QUE_FORK_ACTIVE;
1604 	}
1605 
1606 	return(prebuilt->upd_node->update);
1607 }
1608 
1609 /********************************************************************
1610 Handle an update of a column that has an FTS index. */
1611 static
1612 void
row_fts_do_update(trx_t * trx,dict_table_t * table,doc_id_t old_doc_id,doc_id_t new_doc_id)1613 row_fts_do_update(
1614 /*==============*/
1615 	trx_t*		trx,		/* in: transaction */
1616 	dict_table_t*	table,		/* in: Table with FTS index */
1617 	doc_id_t	old_doc_id,	/* in: old document id */
1618 	doc_id_t	new_doc_id)	/* in: new document id */
1619 {
1620 	if(trx->fts_next_doc_id) {
1621 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
1622 		if(new_doc_id != FTS_NULL_DOC_ID)
1623 		fts_trx_add_op(trx, table, new_doc_id, FTS_INSERT, NULL);
1624 	}
1625 }
1626 
1627 /************************************************************************
1628 Handles FTS matters for an update or a delete.
1629 NOTE: should not be called if the table does not have an FTS index. .*/
1630 static
1631 dberr_t
row_fts_update_or_delete(row_prebuilt_t * prebuilt)1632 row_fts_update_or_delete(
1633 /*=====================*/
1634 	row_prebuilt_t*	prebuilt)	/* in: prebuilt struct in MySQL
1635 					handle */
1636 {
1637 	trx_t*		trx = prebuilt->trx;
1638 	dict_table_t*	table = prebuilt->table;
1639 	upd_node_t*	node = prebuilt->upd_node;
1640 	doc_id_t	old_doc_id = prebuilt->fts_doc_id;
1641 
1642 	DBUG_ENTER("row_fts_update_or_delete");
1643 
1644 	ut_a(dict_table_has_fts_index(prebuilt->table));
1645 
1646 	/* Deletes are simple; get them out of the way first. */
1647 	if (node->is_delete == PLAIN_DELETE) {
1648 		/* A delete affects all FTS indexes, so we pass NULL */
1649 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
1650 	} else {
1651 		doc_id_t	new_doc_id;
1652 		new_doc_id = fts_read_doc_id((byte*) &trx->fts_next_doc_id);
1653 
1654 		if (new_doc_id == 0) {
1655 			ib::error() << "InnoDB FTS: Doc ID cannot be 0";
1656 			return(DB_FTS_INVALID_DOCID);
1657 		}
1658 		row_fts_do_update(trx, table, old_doc_id, new_doc_id);
1659 	}
1660 
1661 	DBUG_RETURN(DB_SUCCESS);
1662 }
1663 
1664 /*********************************************************************//**
1665 Initialize the Doc ID system for FK table with FTS index */
1666 static
1667 void
init_fts_doc_id_for_ref(dict_table_t * table,ulint * depth)1668 init_fts_doc_id_for_ref(
1669 /*====================*/
1670 	dict_table_t*	table,		/*!< in: table */
1671 	ulint*		depth)		/*!< in: recusive call depth */
1672 {
1673 	dict_foreign_t* foreign;
1674 
1675 	table->fk_max_recusive_level = 0;
1676 
1677 	(*depth)++;
1678 
1679 	/* Limit on tables involved in cascading delete/update */
1680 	if (*depth > FK_MAX_CASCADE_DEL) {
1681 		return;
1682 	}
1683 
1684 	/* Loop through this table's referenced list and also
1685 	recursively traverse each table's foreign table list */
1686 	for (dict_foreign_set::iterator it = table->referenced_set.begin();
1687 	     it != table->referenced_set.end();
1688 	     ++it) {
1689 
1690 		foreign = *it;
1691 
1692 		ut_ad(foreign->foreign_table != NULL);
1693 
1694 		if (foreign->foreign_table->fts != NULL) {
1695 			fts_init_doc_id(foreign->foreign_table);
1696 		}
1697 
1698 		if (!foreign->foreign_table->referenced_set.empty()
1699 		    && foreign->foreign_table != table) {
1700 			init_fts_doc_id_for_ref(
1701 				foreign->foreign_table, depth);
1702 		}
1703 	}
1704 }
1705 
1706 /** Does an update or delete of a row for MySQL.
1707 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
1708 @return error code or DB_SUCCESS */
1709 dberr_t
row_update_for_mysql(row_prebuilt_t * prebuilt)1710 row_update_for_mysql(row_prebuilt_t* prebuilt)
1711 {
1712 	trx_savept_t	savept;
1713 	dberr_t		err;
1714 	que_thr_t*	thr;
1715 	dict_index_t*	clust_index;
1716 	upd_node_t*	node;
1717 	dict_table_t*	table		= prebuilt->table;
1718 	trx_t*		trx		= prebuilt->trx;
1719 	ulint		fk_depth	= 0;
1720 	bool		got_s_lock	= false;
1721 
1722 	DBUG_ENTER("row_update_for_mysql");
1723 
1724 	ut_ad(trx);
1725 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1726 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1727 	ut_a(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
1728 	ut_ad(table->stat_initialized);
1729 
1730 	if (!table->is_readable()) {
1731 		return(row_mysql_get_table_status(table, trx, true));
1732 	}
1733 
1734 	if (high_level_read_only) {
1735 		return(DB_READ_ONLY);
1736 	}
1737 
1738 	DEBUG_SYNC_C("innodb_row_update_for_mysql_begin");
1739 
1740 	trx->op_info = "updating or deleting";
1741 
1742 	row_mysql_delay_if_needed();
1743 
1744 	init_fts_doc_id_for_ref(table, &fk_depth);
1745 
1746 	if (!table->no_rollback()) {
1747 		trx_start_if_not_started_xa(trx, true);
1748 	}
1749 
1750 	if (dict_table_is_referenced_by_foreign_key(table)) {
1751 		/* Share lock the data dictionary to prevent any
1752 		table dictionary (for foreign constraint) change.
1753 		This is similar to row_ins_check_foreign_constraint
1754 		check protect by the dictionary lock as well.
1755 		In the future, this can be removed once the Foreign
1756 		key MDL is implemented */
1757 		row_mysql_freeze_data_dictionary(trx);
1758 		init_fts_doc_id_for_ref(table, &fk_depth);
1759 		row_mysql_unfreeze_data_dictionary(trx);
1760 	}
1761 
1762 	node = prebuilt->upd_node;
1763 	const bool is_delete = node->is_delete == PLAIN_DELETE;
1764 	ut_ad(node->table == table);
1765 
1766 	clust_index = dict_table_get_first_index(table);
1767 
1768 	btr_pcur_copy_stored_position(node->pcur,
1769 				      prebuilt->pcur->btr_cur.index
1770 				      == clust_index
1771 				      ? prebuilt->pcur
1772 				      : prebuilt->clust_pcur);
1773 
1774 	ut_a(node->pcur->rel_pos == BTR_PCUR_ON);
1775 
1776 	/* MySQL seems to call rnd_pos before updating each row it
1777 	has cached: we can get the correct cursor position from
1778 	prebuilt->pcur; NOTE that we cannot build the row reference
1779 	from mysql_rec if the clustered index was automatically
1780 	generated for the table: MySQL does not know anything about
1781 	the row id used as the clustered index key */
1782 
1783 	savept = trx_savept_take(trx);
1784 
1785 	thr = que_fork_get_first_thr(prebuilt->upd_graph);
1786 
1787 	node->state = UPD_NODE_UPDATE_CLUSTERED;
1788 
1789 	ut_ad(!prebuilt->sql_stat_start);
1790 
1791 	thr->start_running();
1792 
1793 	ut_ad(!prebuilt->versioned_write || node->table->versioned());
1794 
1795 	if (prebuilt->versioned_write) {
1796 		if (node->is_delete == VERSIONED_DELETE) {
1797                   node->vers_make_delete(trx);
1798                 } else if (node->update->affects_versioned()) {
1799                   node->vers_make_update(trx);
1800                 }
1801 	}
1802 
1803 	for (;;) {
1804 		thr->run_node = node;
1805 		thr->prev_node = node;
1806 		thr->fk_cascade_depth = 0;
1807 
1808 		row_upd_step(thr);
1809 
1810 		err = trx->error_state;
1811 
1812 		if (err == DB_SUCCESS) {
1813 			break;
1814 		}
1815 
1816 		que_thr_stop_for_mysql(thr);
1817 
1818 		if (err == DB_RECORD_NOT_FOUND) {
1819 			trx->error_state = DB_SUCCESS;
1820 			goto error;
1821 		}
1822 
1823 		thr->lock_state= QUE_THR_LOCK_ROW;
1824 
1825 		DEBUG_SYNC(trx->mysql_thd, "row_update_for_mysql_error");
1826 
1827 		bool was_lock_wait = row_mysql_handle_errors(
1828 			&err, trx, thr, &savept);
1829 		thr->lock_state= QUE_THR_LOCK_NOLOCK;
1830 
1831 		if (!was_lock_wait) {
1832 			goto error;
1833 		}
1834 	}
1835 
1836 	thr->stop_no_error();
1837 
1838 	if (dict_table_has_fts_index(table)
1839 	    && trx->fts_next_doc_id != UINT64_UNDEFINED) {
1840 		err = row_fts_update_or_delete(prebuilt);
1841 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
1842 			ut_ad("unexpected error" == 0);
1843 			goto error;
1844 		}
1845 	}
1846 
1847 	/* Completed cascading operations (if any) */
1848 	if (got_s_lock) {
1849 		row_mysql_unfreeze_data_dictionary(trx);
1850 	}
1851 
1852 	bool	update_statistics;
1853 	ut_ad(is_delete == (node->is_delete == PLAIN_DELETE));
1854 
1855 	if (is_delete) {
1856 		/* Not protected by dict_sys.mutex for performance
1857 		reasons, we would rather get garbage in stat_n_rows (which is
1858 		just an estimate anyway) than protecting the following code
1859 		with a latch. */
1860 		dict_table_n_rows_dec(prebuilt->table);
1861 
1862 		if (table->is_system_db) {
1863 			srv_stats.n_system_rows_deleted.inc(size_t(trx->id));
1864 		} else {
1865 			srv_stats.n_rows_deleted.inc(size_t(trx->id));
1866 		}
1867 
1868 		update_statistics = !srv_stats_include_delete_marked;
1869 	} else {
1870 		if (table->is_system_db) {
1871 			srv_stats.n_system_rows_updated.inc(size_t(trx->id));
1872 		} else {
1873 			srv_stats.n_rows_updated.inc(size_t(trx->id));
1874 		}
1875 
1876 		update_statistics
1877 			= !(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE);
1878 	}
1879 
1880 	if (update_statistics) {
1881 		dict_stats_update_if_needed(prebuilt->table, *trx);
1882 	} else {
1883 		/* Always update the table modification counter. */
1884 		prebuilt->table->stat_modified_counter++;
1885 	}
1886 
1887 	trx->op_info = "";
1888 
1889 	DBUG_RETURN(err);
1890 
1891 error:
1892 	trx->op_info = "";
1893 	if (got_s_lock) {
1894 		row_mysql_unfreeze_data_dictionary(trx);
1895 	}
1896 
1897 	DBUG_RETURN(err);
1898 }
1899 
1900 /** This can only be used when the current transaction is at
1901 READ COMMITTED or READ UNCOMMITTED isolation level.
1902 Before calling this function row_search_for_mysql() must have
1903 initialized prebuilt->new_rec_locks to store the information which new
1904 record locks really were set. This function removes a newly set
1905 clustered index record lock under prebuilt->pcur or
1906 prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
1907 releases the latest clustered index record lock we set.
1908 @param[in,out]	prebuilt		prebuilt struct in MySQL handle
1909 @param[in]	has_latches_on_recs	TRUE if called so that we have the
1910 					latches on the records under pcur
1911 					and clust_pcur, and we do not need
1912 					to reposition the cursors. */
1913 void
row_unlock_for_mysql(row_prebuilt_t * prebuilt,ibool has_latches_on_recs)1914 row_unlock_for_mysql(
1915 	row_prebuilt_t*	prebuilt,
1916 	ibool		has_latches_on_recs)
1917 {
1918 	btr_pcur_t*	pcur		= prebuilt->pcur;
1919 	btr_pcur_t*	clust_pcur	= prebuilt->clust_pcur;
1920 	trx_t*		trx		= prebuilt->trx;
1921 
1922 	ut_ad(prebuilt != NULL);
1923 	ut_ad(trx != NULL);
1924 	ut_ad(trx->isolation_level <= TRX_ISO_READ_COMMITTED);
1925 
1926 	if (dict_index_is_spatial(prebuilt->index)) {
1927 		return;
1928 	}
1929 
1930 	trx->op_info = "unlock_row";
1931 
1932 	if (prebuilt->new_rec_locks >= 1) {
1933 
1934 		const rec_t*	rec;
1935 		dict_index_t*	index;
1936 		trx_id_t	rec_trx_id;
1937 		mtr_t		mtr;
1938 
1939 		mtr_start(&mtr);
1940 
1941 		/* Restore the cursor position and find the record */
1942 
1943 		if (!has_latches_on_recs) {
1944 			btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, &mtr);
1945 		}
1946 
1947 		rec = btr_pcur_get_rec(pcur);
1948 		index = btr_pcur_get_btr_cur(pcur)->index;
1949 
1950 		if (prebuilt->new_rec_locks >= 2) {
1951 			/* Restore the cursor position and find the record
1952 			in the clustered index. */
1953 
1954 			if (!has_latches_on_recs) {
1955 				btr_pcur_restore_position(BTR_SEARCH_LEAF,
1956 							  clust_pcur, &mtr);
1957 			}
1958 
1959 			rec = btr_pcur_get_rec(clust_pcur);
1960 			index = btr_pcur_get_btr_cur(clust_pcur)->index;
1961 		}
1962 
1963 		if (!dict_index_is_clust(index)) {
1964 			/* This is not a clustered index record.  We
1965 			do not know how to unlock the record. */
1966 			goto no_unlock;
1967 		}
1968 
1969 		/* If the record has been modified by this
1970 		transaction, do not unlock it. */
1971 
1972 		if (index->trx_id_offset) {
1973 			rec_trx_id = trx_read_trx_id(rec
1974 						     + index->trx_id_offset);
1975 		} else {
1976 			mem_heap_t*	heap			= NULL;
1977 			rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1978 			rec_offs* offsets				= offsets_;
1979 
1980 			rec_offs_init(offsets_);
1981 			offsets = rec_get_offsets(rec, index, offsets,
1982 						  index->n_core_fields,
1983 						  ULINT_UNDEFINED, &heap);
1984 
1985 			rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
1986 
1987 			if (UNIV_LIKELY_NULL(heap)) {
1988 				mem_heap_free(heap);
1989 			}
1990 		}
1991 
1992 		if (rec_trx_id != trx->id) {
1993 			/* We did not update the record: unlock it */
1994 
1995 			rec = btr_pcur_get_rec(pcur);
1996 
1997 			lock_rec_unlock(
1998 				trx,
1999 				btr_pcur_get_block(pcur),
2000 				rec,
2001 				static_cast<enum lock_mode>(
2002 					prebuilt->select_lock_type));
2003 
2004 			if (prebuilt->new_rec_locks >= 2) {
2005 				rec = btr_pcur_get_rec(clust_pcur);
2006 
2007 				lock_rec_unlock(
2008 					trx,
2009 					btr_pcur_get_block(clust_pcur),
2010 					rec,
2011 					static_cast<enum lock_mode>(
2012 						prebuilt->select_lock_type));
2013 			}
2014 		}
2015 no_unlock:
2016 		mtr_commit(&mtr);
2017 	}
2018 
2019 	trx->op_info = "";
2020 }
2021 
2022 /*********************************************************************//**
2023 Locks the data dictionary in shared mode from modifications, for performing
2024 foreign key check, rollback, or other operation invisible to MySQL. */
2025 void
row_mysql_freeze_data_dictionary_func(trx_t * trx,const char * file,unsigned line)2026 row_mysql_freeze_data_dictionary_func(
2027 /*==================================*/
2028 	trx_t*		trx,	/*!< in/out: transaction */
2029 	const char*	file,	/*!< in: file name */
2030 	unsigned	line)	/*!< in: line number */
2031 {
2032 	ut_a(trx->dict_operation_lock_mode == 0);
2033 
2034 	rw_lock_s_lock_inline(&dict_sys.latch, 0, file, line);
2035 
2036 	trx->dict_operation_lock_mode = RW_S_LATCH;
2037 }
2038 
2039 /*********************************************************************//**
2040 Unlocks the data dictionary shared lock. */
2041 void
row_mysql_unfreeze_data_dictionary(trx_t * trx)2042 row_mysql_unfreeze_data_dictionary(
2043 /*===============================*/
2044 	trx_t*	trx)	/*!< in/out: transaction */
2045 {
2046 	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);
2047 
2048 	ut_a(trx->dict_operation_lock_mode == RW_S_LATCH);
2049 
2050 	rw_lock_s_unlock(&dict_sys.latch);
2051 
2052 	trx->dict_operation_lock_mode = 0;
2053 }
2054 
2055 /** Write query start time as SQL field data to a buffer. Needed by InnoDB.
2056 @param	thd	Thread object
2057 @param	buf	Buffer to hold start time data */
2058 void thd_get_query_start_data(THD *thd, char *buf);
2059 
2060 /** Insert history row when evaluating foreign key referential action.
2061 
2062 1. Create new dtuple_t 'row' from node->historical_row;
2063 2. Update its row_end to current timestamp;
2064 3. Insert it to a table;
2065 4. Update table statistics.
2066 
2067 This is used in UPDATE CASCADE/SET NULL of a system versioned referenced table.
2068 
2069 node->historical_row: dtuple_t containing pointers of row changed by refertial
2070 action.
2071 
2072 @param[in]	thr	current query thread
2073 @param[in]	node	a node which just updated a row in a foreign table
2074 @return DB_SUCCESS or some error */
row_update_vers_insert(que_thr_t * thr,upd_node_t * node)2075 static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node)
2076 {
2077 	trx_t* trx = thr_get_trx(thr);
2078 	dfield_t* row_end;
2079 	char row_end_data[8];
2080 	dict_table_t* table = node->table;
2081 	const unsigned zip_size = table->space->zip_size();
2082 	ut_ad(table->versioned());
2083 
2084 	dtuple_t*       row;
2085 	const ulint     n_cols        = dict_table_get_n_cols(table);
2086 	const ulint     n_v_cols      = dict_table_get_n_v_cols(table);
2087 
2088 	ut_ad(n_cols == dtuple_get_n_fields(node->historical_row));
2089 	ut_ad(n_v_cols == dtuple_get_n_v_fields(node->historical_row));
2090 
2091 	row = dtuple_create_with_vcol(node->historical_heap, n_cols, n_v_cols);
2092 
2093 	dict_table_copy_types(row, table);
2094 
2095 	ins_node_t* insert_node =
2096 		ins_node_create(INS_DIRECT, table, node->historical_heap);
2097 
2098 	if (!insert_node) {
2099 		trx->error_state = DB_OUT_OF_MEMORY;
2100 		goto exit;
2101 	}
2102 
2103 	insert_node->common.parent = thr;
2104 	ins_node_set_new_row(insert_node, row);
2105 
2106 	ut_ad(n_cols > DATA_N_SYS_COLS);
2107 	// Exclude DB_ROW_ID, DB_TRX_ID, DB_ROLL_PTR
2108 	for (ulint i = 0; i < n_cols - DATA_N_SYS_COLS; i++) {
2109 		dfield_t *src= dtuple_get_nth_field(node->historical_row, i);
2110 		dfield_t *dst= dtuple_get_nth_field(row, i);
2111 		dfield_copy(dst, src);
2112 		if (dfield_is_ext(src)) {
2113 			byte *field_data
2114 				= static_cast<byte*>(dfield_get_data(src));
2115 			ulint ext_len;
2116 			ulint field_len = dfield_get_len(src);
2117 
2118 			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
2119 
2120 			ut_a(memcmp(field_data + field_len
2121 				     - BTR_EXTERN_FIELD_REF_SIZE,
2122 				     field_ref_zero,
2123 				     BTR_EXTERN_FIELD_REF_SIZE));
2124 
2125 			byte *data = btr_copy_externally_stored_field(
2126 				&ext_len, field_data, zip_size, field_len,
2127 				node->historical_heap);
2128 			dfield_set_data(dst, data, ext_len);
2129 		}
2130 	}
2131 
2132 	for (ulint i = 0; i < n_v_cols; i++) {
2133 		dfield_t *dst= dtuple_get_nth_v_field(row, i);
2134 		dfield_t *src= dtuple_get_nth_v_field(node->historical_row, i);
2135 		dfield_copy(dst, src);
2136 	}
2137 
2138 	node->historical_row = NULL;
2139 
2140 	row_end = dtuple_get_nth_field(row, table->vers_end);
2141 	if (dict_table_get_nth_col(table, table->vers_end)->vers_native()) {
2142 		mach_write_to_8(row_end_data, trx->id);
2143 		dfield_set_data(row_end, row_end_data, 8);
2144 	} else {
2145 		thd_get_query_start_data(trx->mysql_thd, row_end_data);
2146 		dfield_set_data(row_end, row_end_data, 7);
2147 	}
2148 
2149 	for (;;) {
2150 		thr->run_node = insert_node;
2151 		thr->prev_node = insert_node;
2152 
2153 		row_ins_step(thr);
2154 
2155 		switch (trx->error_state) {
2156 		case DB_LOCK_WAIT:
2157 			que_thr_stop_for_mysql(thr);
2158 			lock_wait_suspend_thread(thr);
2159 
2160 			if (trx->error_state == DB_SUCCESS) {
2161 				continue;
2162 			}
2163 
2164 			/* fall through */
2165 		default:
2166 			/* Other errors are handled for the parent node. */
2167 			thr->fk_cascade_depth = 0;
2168 			goto exit;
2169 
2170 		case DB_SUCCESS:
2171 			srv_stats.n_rows_inserted.inc(
2172 				static_cast<size_t>(trx->id));
2173 			dict_stats_update_if_needed(table, *trx);
2174 			goto exit;
2175 		}
2176 	}
2177 exit:
2178 	que_graph_free_recursive(insert_node);
2179 	mem_heap_free(node->historical_heap);
2180 	node->historical_heap = NULL;
2181 	return trx->error_state;
2182 }
2183 
2184 /**********************************************************************//**
2185 Does a cascaded delete or set null in a foreign key operation.
2186 @return error code or DB_SUCCESS */
2187 dberr_t
row_update_cascade_for_mysql(que_thr_t * thr,upd_node_t * node,dict_table_t * table)2188 row_update_cascade_for_mysql(
2189 /*=========================*/
2190         que_thr_t*      thr,    /*!< in: query thread */
2191         upd_node_t*     node,   /*!< in: update node used in the cascade
2192                                 or set null operation */
2193         dict_table_t*   table)  /*!< in: table where we do the operation */
2194 {
2195         /* Increment fk_cascade_depth to record the recursive call depth on
2196         a single update/delete that affects multiple tables chained
2197         together with foreign key relations. */
2198 
2199         if (++thr->fk_cascade_depth > FK_MAX_CASCADE_DEL) {
2200                 return(DB_FOREIGN_EXCEED_MAX_CASCADE);
2201         }
2202 
2203 	const trx_t* trx = thr_get_trx(thr);
2204 
2205 	if (table->versioned()) {
2206 		if (node->is_delete == PLAIN_DELETE) {
2207                   node->vers_make_delete(trx);
2208                 } else if (node->update->affects_versioned()) {
2209 			dberr_t err = row_update_vers_insert(thr, node);
2210 			if (err != DB_SUCCESS) {
2211 				return err;
2212 			}
2213                         node->vers_make_update(trx);
2214                 }
2215 	}
2216 
2217 	for (;;) {
2218 		thr->run_node = node;
2219 		thr->prev_node = node;
2220 
2221 		DEBUG_SYNC_C("foreign_constraint_update_cascade");
2222 		{
2223 			TABLE *mysql_table = thr->prebuilt->m_mysql_table;
2224 			thr->prebuilt->m_mysql_table = NULL;
2225 			row_upd_step(thr);
2226 			thr->prebuilt->m_mysql_table = mysql_table;
2227 		}
2228 
2229 		switch (trx->error_state) {
2230 		case DB_LOCK_WAIT:
2231 			que_thr_stop_for_mysql(thr);
2232 			lock_wait_suspend_thread(thr);
2233 
2234 			if (trx->error_state == DB_SUCCESS) {
2235 				continue;
2236 			}
2237 
2238 			/* fall through */
2239 		default:
2240 			/* Other errors are handled for the parent node. */
2241 			thr->fk_cascade_depth = 0;
2242 			return trx->error_state;
2243 
2244 		case DB_SUCCESS:
2245 			thr->fk_cascade_depth = 0;
2246 			bool stats;
2247 
2248 			if (node->is_delete == PLAIN_DELETE) {
2249 				/* Not protected by dict_sys.mutex for
2250 				performance reasons, we would rather
2251 				get garbage in stat_n_rows (which is
2252 				just an estimate anyway) than
2253 				protecting the following code with a
2254 				latch. */
2255 				dict_table_n_rows_dec(node->table);
2256 
2257 				stats = !srv_stats_include_delete_marked;
2258 				srv_stats.n_rows_deleted.inc(size_t(trx->id));
2259 			} else {
2260 				stats = !(node->cmpl_info
2261 					  & UPD_NODE_NO_ORD_CHANGE);
2262 				srv_stats.n_rows_updated.inc(size_t(trx->id));
2263 			}
2264 
2265 			if (stats) {
2266 				dict_stats_update_if_needed(node->table, *trx);
2267 			} else {
2268 				/* Always update the table
2269 				modification counter. */
2270 				node->table->stat_modified_counter++;
2271 			}
2272 
2273 			return(DB_SUCCESS);
2274 		}
2275 	}
2276 }
2277 
2278 /*********************************************************************//**
2279 Locks the data dictionary exclusively for performing a table create or other
2280 data dictionary modification operation. */
2281 void
row_mysql_lock_data_dictionary_func(trx_t * trx,const char * file,unsigned line)2282 row_mysql_lock_data_dictionary_func(
2283 /*================================*/
2284 	trx_t*		trx,	/*!< in/out: transaction */
2285 	const char*	file,	/*!< in: file name */
2286 	unsigned	line)	/*!< in: line number */
2287 {
2288 	ut_a(trx->dict_operation_lock_mode == 0
2289 	     || trx->dict_operation_lock_mode == RW_X_LATCH);
2290 	dict_sys.lock(file, line);
2291 	trx->dict_operation_lock_mode = RW_X_LATCH;
2292 }
2293 
2294 /*********************************************************************//**
2295 Unlocks the data dictionary exclusive lock. */
2296 void
row_mysql_unlock_data_dictionary(trx_t * trx)2297 row_mysql_unlock_data_dictionary(
2298 /*=============================*/
2299 	trx_t*	trx)	/*!< in/out: transaction */
2300 {
2301 	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);
2302 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
2303 	trx->dict_operation_lock_mode = 0;
2304 	dict_sys.unlock();
2305 }
2306 
/*********************************************************************//**
Creates a table for MySQL. On failure the transaction will be rolled back
and the 'table' object will be freed.
Debug builds assert that the dictionary is locked and that
trx->dict_operation_lock_mode == RW_X_LATCH on entry.
@return error code or DB_SUCCESS */
dberr_t
row_create_table_for_mysql(
/*=======================*/
	dict_table_t*	table,	/*!< in, own: table definition
				(will be freed, or on DB_SUCCESS
				added to the data dictionary cache) */
	trx_t*		trx,	/*!< in/out: transaction */
	fil_encryption_t mode,	/*!< in: encryption mode */
	uint32_t	key_id)	/*!< in: encryption key_id */
{
	tab_node_t*	node;
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;

	ut_d(dict_sys.assert_locked());
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);

	/* Debug injection point: fail before anything has been started,
	still honouring the contract that 'table' is freed on failure. */
	DBUG_EXECUTE_IF(
		"ib_create_table_fail_at_start_of_row_create_table_for_mysql",
		dict_mem_table_free(table);
		trx->op_info = "";
		return DB_ERROR;
	);

	trx->op_info = "creating table";

	trx_start_if_not_started_xa(trx, true);

	/* NOTE(review): this heap is handed to the query graph below and is
	presumably released by que_graph_free() at the end -- confirm. */
	heap = mem_heap_create(512);

	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
		/* fall through */
	case TRX_DICT_OP_TABLE:
		break;
	case TRX_DICT_OP_INDEX:
		/* If the transaction was previously flagged as
		TRX_DICT_OP_INDEX, we should be creating auxiliary
		tables for full-text indexes. */
		ut_ad(strstr(table->name.m_name, "/FTS_") != NULL);
	}

	/* Build and run the query graph that creates the table. */
	node = tab_create_graph_create(table, heap, mode, key_id);

	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);

	ut_a(thr == que_fork_start_command(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

	que_run_threads(thr);

	err = trx->error_state;

	/* Update SYS_TABLESPACES and SYS_DATAFILES if a new file-per-table
	tablespace was created. */
	if (err == DB_SUCCESS && dict_table_is_file_per_table(table)) {
		err = dict_replace_tablespace_in_dictionary(
			table->space_id, table->name.m_name,
			table->space->flags,
			table->space->chain.start->name, trx);

		if (err != DB_SUCCESS) {

			/* We must delete the link file. */
			RemoteDatafile::delete_link_file(table->name.m_name);
		}
	}

	/* Error handling: every failure path resets trx->error_state,
	rolls the transaction back and disposes of 'table'. */
	switch (err) {
	case DB_SUCCESS:
		break;
	case DB_OUT_OF_FILE_SPACE:
		trx->error_state = DB_SUCCESS;
		trx->rollback();

		ib::warn() << "Cannot create table "
			<< table->name
			<< " because tablespace full";

		/* If the table can be opened by name, close and drop it;
		otherwise only the in-memory object has to be freed. */
		if (dict_table_open_on_name(table->name.m_name, TRUE, FALSE,
					    DICT_ERR_IGNORE_NONE)) {

			dict_table_close_and_drop(trx, table);
		} else {
			dict_mem_table_free(table);
		}

		break;

	case DB_UNSUPPORTED:
	case DB_TOO_MANY_CONCURRENT_TRXS:
		/* We already have .ibd file here. it should be deleted. */

		if (dict_table_is_file_per_table(table)
		    && fil_delete_tablespace(table->space_id) != DB_SUCCESS) {
			ib::error() << "Cannot delete the file of table "
				<< table->name;
		}
		/* fall through */

	case DB_DUPLICATE_KEY:
	case DB_TABLESPACE_EXISTS:
	default:
		trx->error_state = DB_SUCCESS;
		trx->rollback();
		dict_mem_table_free(table);
		break;
	}

	que_graph_free((que_t*) que_node_get_parent(thr));

	trx->op_info = "";

	return(err);
}
2427 
2428 /*********************************************************************//**
2429 Create an index when creating a table.
2430 On failure, the caller must drop the table!
2431 @return error number or DB_SUCCESS */
2432 dberr_t
row_create_index_for_mysql(dict_index_t * index,trx_t * trx,const ulint * field_lengths)2433 row_create_index_for_mysql(
2434 /*=======================*/
2435 	dict_index_t*	index,		/*!< in, own: index definition
2436 					(will be freed) */
2437 	trx_t*		trx,		/*!< in: transaction handle */
2438 	const ulint*	field_lengths)	/*!< in: if not NULL, must contain
2439 					dict_index_get_n_fields(index)
2440 					actual field lengths for the
2441 					index columns, which are
2442 					then checked for not being too
2443 					large. */
2444 {
2445 	ind_node_t*	node;
2446 	mem_heap_t*	heap;
2447 	que_thr_t*	thr;
2448 	dberr_t		err;
2449 	ulint		i;
2450 	ulint		len;
2451 	dict_table_t*	table = index->table;
2452 
2453 	ut_d(dict_sys.assert_locked());
2454 
2455 	for (i = 0; i < index->n_def; i++) {
2456 		/* Check that prefix_len and actual length
2457 		< DICT_MAX_INDEX_COL_LEN */
2458 
2459 		len = dict_index_get_nth_field(index, i)->prefix_len;
2460 
2461 		if (field_lengths && field_lengths[i]) {
2462 			len = ut_max(len, field_lengths[i]);
2463 		}
2464 
2465 		DBUG_EXECUTE_IF(
2466 			"ib_create_table_fail_at_create_index",
2467 			len = DICT_MAX_FIELD_LEN_BY_FORMAT(table) + 1;
2468 		);
2469 
2470 		/* Column or prefix length exceeds maximum column length */
2471 		if (len > (ulint) DICT_MAX_FIELD_LEN_BY_FORMAT(table)) {
2472 			dict_mem_index_free(index);
2473 			return DB_TOO_BIG_INDEX_COL;
2474 		}
2475 	}
2476 
2477 	trx->op_info = "creating index";
2478 
2479 	/* For temp-table we avoid insertion into SYSTEM TABLES to
2480 	maintain performance and so we have separate path that directly
2481 	just updates dictonary cache. */
2482 	if (!table->is_temporary()) {
2483 		trx_start_if_not_started_xa(trx, true);
2484 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
2485 		/* Note that the space id where we store the index is
2486 		inherited from the table in dict_build_index_def_step()
2487 		in dict0crea.cc. */
2488 
2489 		heap = mem_heap_create(512);
2490 		node = ind_create_graph_create(index, table->name.m_name,
2491 					       heap);
2492 
2493 		thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
2494 
2495 		ut_a(thr == que_fork_start_command(
2496 				static_cast<que_fork_t*>(
2497 					que_node_get_parent(thr))));
2498 
2499 		que_run_threads(thr);
2500 
2501 		err = trx->error_state;
2502 
2503 		index = node->index;
2504 
2505 		ut_ad(!index == (err != DB_SUCCESS));
2506 
2507 		que_graph_free((que_t*) que_node_get_parent(thr));
2508 
2509 		if (index && (index->type & DICT_FTS)) {
2510 			err = fts_create_index_tables(trx, index, table->id);
2511 		}
2512 	} else {
2513 		dict_build_index_def(table, index, trx);
2514 
2515 		err = dict_index_add_to_cache(index, FIL_NULL);
2516 		ut_ad((index == NULL) == (err != DB_SUCCESS));
2517 		if (UNIV_LIKELY(err == DB_SUCCESS)) {
2518 			ut_ad(!index->is_instant());
2519 			index->n_core_null_bytes = static_cast<uint8_t>(
2520 				UT_BITS_IN_BYTES(unsigned(index->n_nullable)));
2521 
2522 			err = dict_create_index_tree_in_mem(index, trx);
2523 #ifdef BTR_CUR_HASH_ADAPT
2524 			ut_ad(!index->search_info->ref_count);
2525 #endif /* BTR_CUR_HASH_ADAPT */
2526 
2527 			if (err != DB_SUCCESS) {
2528 				dict_index_remove_from_cache(table, index);
2529 			}
2530 		}
2531 	}
2532 
2533 	trx->op_info = "";
2534 
2535 	return(err);
2536 }
2537 
2538 /*********************************************************************//**
2539 Drops a table for MySQL as a background operation. MySQL relies on Unix
2540 in ALTER TABLE to the fact that the table handler does not remove the
2541 table before all handles to it has been removed. Furhermore, the MySQL's
2542 call to drop table must be non-blocking. Therefore we do the drop table
2543 as a background operation, which is taken care of by the master thread
2544 in srv0srv.cc.
2545 @return error code or DB_SUCCESS */
2546 static
2547 dberr_t
row_drop_table_for_mysql_in_background(const char * name)2548 row_drop_table_for_mysql_in_background(
2549 /*===================================*/
2550 	const char*	name)	/*!< in: table name */
2551 {
2552 	dberr_t	error;
2553 	trx_t*	trx;
2554 
2555 	trx = trx_create();
2556 
2557 	/* If the original transaction was dropping a table referenced by
2558 	foreign keys, we must set the following to be able to drop the
2559 	table: */
2560 
2561 	trx->check_foreigns = false;
2562 
2563 	/* Try to drop the table in InnoDB */
2564 
2565 	error = row_drop_table_for_mysql(name, trx, SQLCOM_TRUNCATE);
2566 
2567 	trx_commit_for_mysql(trx);
2568 
2569 	trx->free();
2570 
2571 	return(error);
2572 }
2573 
/*********************************************************************//**
The master thread in srv0srv.cc calls this regularly to drop tables which
we must drop in background after queries to them have ended. Such lazy
dropping of tables is needed in ALTER TABLE on Unix.
The function processes the first entry of row_mysql_drop_list repeatedly
until the list is empty or a drop fails.
@return how many tables dropped + remaining tables in list */
ulint
row_drop_tables_for_mysql_in_background(void)
/*=========================================*/
{
	row_mysql_drop_t*	drop;
	dict_table_t*		table;
	ulint			n_tables;
	ulint			n_tables_dropped = 0;
loop:
	mutex_enter(&row_drop_list_mutex);

	ut_a(row_mysql_drop_list_inited);
next:
	/* row_drop_list_mutex is held whenever control reaches this
	label: every "goto next" below is preceded by mutex_enter(). */
	drop = UT_LIST_GET_FIRST(row_mysql_drop_list);

	n_tables = UT_LIST_GET_LEN(row_mysql_drop_list);

	mutex_exit(&row_drop_list_mutex);

	if (drop == NULL) {
		/* All tables dropped */

		return(n_tables + n_tables_dropped);
	}

	/* On fast shutdown, just empty the list without dropping tables. */
	table = srv_shutdown_state == SRV_SHUTDOWN_NONE || !srv_fast_shutdown
		? dict_table_open_on_id(drop->table_id, FALSE,
					DICT_TABLE_OP_OPEN_ONLY_IF_CACHED)
		: NULL;

	if (!table) {
		/* No table object was obtained: count the entry as
		dropped, remove it from the list and free it. */
		n_tables_dropped++;
		mutex_enter(&row_drop_list_mutex);
		UT_LIST_REMOVE(row_mysql_drop_list, drop);
		MONITOR_DEC(MONITOR_BACKGROUND_DROP_TABLE);
		ut_free(drop);
		goto next;
	}

	ut_a(!table->can_be_evicted);

	bool skip = false;

	if (!table->to_be_dropped) {
skip:
		/* Reached in two ways:
		- by falling in with skip == false (the table is not
		flagged to_be_dropped): the entry is moved to the end
		of the list;
		- via "goto skip" with skip == true (the table still has
		locks): the entry is removed from the list and freed.
		NOTE(review): unlike the !table branch above, the freeing
		path here does not invoke MONITOR_DEC -- confirm that
		this is intended. */
		dict_table_close(table, FALSE, FALSE);

		mutex_enter(&row_drop_list_mutex);
		UT_LIST_REMOVE(row_mysql_drop_list, drop);
		if (!skip) {
			UT_LIST_ADD_LAST(row_mysql_drop_list, drop);
		} else {
			ut_free(drop);
		}
		goto next;
	}

	if (!srv_fast_shutdown && !trx_sys.any_active_transactions()) {
		/* Read the length of the table lock list while
		holding the lock mutex. */
		lock_mutex_enter();
		skip = UT_LIST_GET_LEN(table->locks) != 0;
		lock_mutex_exit();
		if (skip) {
			/* We cannot drop tables that are locked by XA
			PREPARE transactions. */
			goto skip;
		}
	}

	/* Copy the name before closing the table handle; the copy is
	what gets passed to the drop and is freed right afterwards. */
	char* name = mem_strdup(table->name.m_name);

	dict_table_close(table, FALSE, FALSE);

	dberr_t err = row_drop_table_for_mysql_in_background(name);

	ut_free(name);

	if (err != DB_SUCCESS) {
		/* If the DROP fails for some table, we return, and let the
		main thread retry later */
		return(n_tables + n_tables_dropped);
	}

	/* Start over; this re-acquires row_drop_list_mutex. */
	goto loop;
}
2664 
2665 /*********************************************************************//**
2666 Get the background drop list length. NOTE: the caller must own the
2667 drop list mutex!
2668 @return how many tables in list */
2669 ulint
row_get_background_drop_list_len_low(void)2670 row_get_background_drop_list_len_low(void)
2671 /*======================================*/
2672 {
2673 	ulint	len;
2674 
2675 	mutex_enter(&row_drop_list_mutex);
2676 
2677 	ut_a(row_mysql_drop_list_inited);
2678 
2679 	len = UT_LIST_GET_LEN(row_mysql_drop_list);
2680 
2681 	mutex_exit(&row_drop_list_mutex);
2682 
2683 	return(len);
2684 }
2685 
2686 /** Drop garbage tables during recovery. */
2687 void
row_mysql_drop_garbage_tables()2688 row_mysql_drop_garbage_tables()
2689 {
2690 	mem_heap_t*	heap = mem_heap_create(FN_REFLEN);
2691 	btr_pcur_t	pcur;
2692 	mtr_t		mtr;
2693 	trx_t*		trx = trx_create();
2694 	trx->op_info = "dropping garbage tables";
2695 	row_mysql_lock_data_dictionary(trx);
2696 
2697 	mtr.start();
2698 	btr_pcur_open_at_index_side(
2699 		true, dict_table_get_first_index(dict_sys.sys_tables),
2700 		BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
2701 
2702 	for (;;) {
2703 		const rec_t*	rec;
2704 		const byte*	field;
2705 		ulint		len;
2706 		const char*	table_name;
2707 
2708 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
2709 
2710 		if (!btr_pcur_is_on_user_rec(&pcur)) {
2711 			break;
2712 		}
2713 
2714 		rec = btr_pcur_get_rec(&pcur);
2715 		if (rec_get_deleted_flag(rec, 0)) {
2716 			continue;
2717 		}
2718 
2719 		field = rec_get_nth_field_old(rec, 0/*NAME*/, &len);
2720 		if (len == UNIV_SQL_NULL || len == 0) {
2721 			/* Corrupted SYS_TABLES.NAME */
2722 			continue;
2723 		}
2724 
2725 		table_name = mem_heap_strdupl(
2726 			heap,
2727 			reinterpret_cast<const char*>(field), len);
2728 		if (strstr(table_name, "/" TEMP_FILE_PREFIX "-") &&
2729                     !strstr(table_name, "/" TEMP_FILE_PREFIX "-backup-") &&
2730                     !strstr(table_name, "/" TEMP_FILE_PREFIX "-exchange-"))
2731                 {
2732 			btr_pcur_store_position(&pcur, &mtr);
2733 			btr_pcur_commit_specify_mtr(&pcur, &mtr);
2734 
2735 			if (dict_load_table(table_name,
2736 					    DICT_ERR_IGNORE_DROP)) {
2737 				row_drop_table_for_mysql(table_name, trx,
2738 							 SQLCOM_DROP_TABLE);
2739 				trx_commit_for_mysql(trx);
2740 			}
2741 
2742 			mtr.start();
2743 			btr_pcur_restore_position(BTR_SEARCH_LEAF,
2744 						  &pcur, &mtr);
2745 		}
2746 
2747 		mem_heap_empty(heap);
2748 	}
2749 
2750 	btr_pcur_close(&pcur);
2751 	mtr.commit();
2752 	row_mysql_unlock_data_dictionary(trx);
2753 	trx->free();
2754 	mem_heap_free(heap);
2755 }
2756 
2757 /*********************************************************************//**
2758 If a table is not yet in the drop list, adds the table to the list of tables
2759 which the master thread drops in background. We need this on Unix because in
2760 ALTER TABLE MySQL may call drop table even if the table has running queries on
2761 it. Also, if there are running foreign key checks on the table, we drop the
2762 table lazily.
2763 @return	whether background DROP TABLE was scheduled for the first time */
2764 static
2765 bool
row_add_table_to_background_drop_list(table_id_t table_id)2766 row_add_table_to_background_drop_list(table_id_t table_id)
2767 {
2768 	row_mysql_drop_t*	drop;
2769 	bool			added = true;
2770 
2771 	mutex_enter(&row_drop_list_mutex);
2772 
2773 	ut_a(row_mysql_drop_list_inited);
2774 
2775 	/* Look if the table already is in the drop list */
2776 	for (drop = UT_LIST_GET_FIRST(row_mysql_drop_list);
2777 	     drop != NULL;
2778 	     drop = UT_LIST_GET_NEXT(row_mysql_drop_list, drop)) {
2779 
2780 		if (drop->table_id == table_id) {
2781 			added = false;
2782 			goto func_exit;
2783 		}
2784 	}
2785 
2786 	drop = static_cast<row_mysql_drop_t*>(ut_malloc_nokey(sizeof *drop));
2787 	drop->table_id = table_id;
2788 
2789 	UT_LIST_ADD_LAST(row_mysql_drop_list, drop);
2790 
2791 	MONITOR_INC(MONITOR_BACKGROUND_DROP_TABLE);
2792 func_exit:
2793 	mutex_exit(&row_drop_list_mutex);
2794 	return added;
2795 }
2796 
2797 /** Reassigns the table identifier of a table.
2798 @param[in,out]	table	table
2799 @param[in,out]	trx	transaction
2800 @param[out]	new_id	new table id
2801 @return error code or DB_SUCCESS */
2802 static
2803 dberr_t
row_mysql_table_id_reassign(dict_table_t * table,trx_t * trx,table_id_t * new_id)2804 row_mysql_table_id_reassign(
2805 	dict_table_t*	table,
2806 	trx_t*		trx,
2807 	table_id_t*	new_id)
2808 {
2809 	dberr_t		err;
2810 	pars_info_t*	info	= pars_info_create();
2811 
2812 	dict_hdr_get_new_id(new_id, NULL, NULL);
2813 
2814 	pars_info_add_ull_literal(info, "old_id", table->id);
2815 	pars_info_add_ull_literal(info, "new_id", *new_id);
2816 
2817 	/* Note: This cannot be rolled back. Rollback would see the
2818 	UPDATE SYS_INDEXES as two operations: DELETE and INSERT.
2819 	It would invoke btr_free_if_exists() when rolling back the
2820 	INSERT, effectively dropping all indexes of the table. */
2821 	err = que_eval_sql(
2822 		info,
2823 		"PROCEDURE RENUMBER_TABLE_PROC () IS\n"
2824 		"BEGIN\n"
2825 		"UPDATE SYS_TABLES SET ID = :new_id\n"
2826 		" WHERE ID = :old_id;\n"
2827 		"UPDATE SYS_COLUMNS SET TABLE_ID = :new_id\n"
2828 		" WHERE TABLE_ID = :old_id;\n"
2829 		"UPDATE SYS_INDEXES SET TABLE_ID = :new_id\n"
2830 		" WHERE TABLE_ID = :old_id;\n"
2831 		"UPDATE SYS_VIRTUAL SET TABLE_ID = :new_id\n"
2832 		" WHERE TABLE_ID = :old_id;\n"
2833 		"END;\n", FALSE, trx);
2834 
2835 	return(err);
2836 }
2837 
2838 /*********************************************************************//**
2839 Setup the pre-requisites for DISCARD TABLESPACE. It will start the transaction,
2840 acquire the data dictionary lock in X mode and open the table.
2841 @return table instance or 0 if not found. */
2842 static
2843 dict_table_t*
row_discard_tablespace_begin(const char * name,trx_t * trx)2844 row_discard_tablespace_begin(
2845 /*=========================*/
2846 	const char*	name,	/*!< in: table name */
2847 	trx_t*		trx)	/*!< in: transaction handle */
2848 {
2849 	trx->op_info = "discarding tablespace";
2850 
2851 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
2852 
2853 	trx_start_if_not_started_xa(trx, true);
2854 
2855 	/* Serialize data dictionary operations with dictionary mutex:
2856 	this is to avoid deadlocks during data dictionary operations */
2857 
2858 	row_mysql_lock_data_dictionary(trx);
2859 
2860 	dict_table_t*	table;
2861 
2862 	table = dict_table_open_on_name(
2863 		name, TRUE, FALSE, DICT_ERR_IGNORE_FK_NOKEY);
2864 
2865 	if (table) {
2866 		dict_stats_wait_bg_to_stop_using_table(table, trx);
2867 		ut_a(!is_system_tablespace(table->space_id));
2868 		ut_ad(!table->n_foreign_key_checks_running);
2869 	}
2870 
2871 	return(table);
2872 }
2873 
2874 /*********************************************************************//**
2875 Do the foreign key constraint checks.
2876 @return DB_SUCCESS or error code. */
2877 static
2878 dberr_t
row_discard_tablespace_foreign_key_checks(const trx_t * trx,const dict_table_t * table)2879 row_discard_tablespace_foreign_key_checks(
2880 /*======================================*/
2881 	const trx_t*		trx,	/*!< in: transaction handle */
2882 	const dict_table_t*	table)	/*!< in: table to be discarded */
2883 {
2884 
2885 	if (srv_read_only_mode || !trx->check_foreigns) {
2886 		return(DB_SUCCESS);
2887 	}
2888 
2889 	/* Check if the table is referenced by foreign key constraints from
2890 	some other table (not the table itself) */
2891 	dict_foreign_set::const_iterator	it
2892 		= std::find_if(table->referenced_set.begin(),
2893 			       table->referenced_set.end(),
2894 			       dict_foreign_different_tables());
2895 
2896 	if (it == table->referenced_set.end()) {
2897 		return(DB_SUCCESS);
2898 	}
2899 
2900 	const dict_foreign_t*	foreign	= *it;
2901 	FILE*			ef	= dict_foreign_err_file;
2902 
2903 	ut_ad(foreign->foreign_table != table);
2904 	ut_ad(foreign->referenced_table == table);
2905 
2906 	/* We only allow discarding a referenced table if
2907 	FOREIGN_KEY_CHECKS is set to 0 */
2908 
2909 	mutex_enter(&dict_foreign_err_mutex);
2910 
2911 	rewind(ef);
2912 
2913 	ut_print_timestamp(ef);
2914 
2915 	fputs("  Cannot DISCARD table ", ef);
2916 	ut_print_name(ef, trx, table->name.m_name);
2917 	fputs("\n"
2918 	      "because it is referenced by ", ef);
2919 	ut_print_name(ef, trx, foreign->foreign_table_name);
2920 	putc('\n', ef);
2921 
2922 	mutex_exit(&dict_foreign_err_mutex);
2923 
2924 	return(DB_CANNOT_DROP_CONSTRAINT);
2925 }
2926 
2927 /*********************************************************************//**
2928 Cleanup after the DISCARD TABLESPACE operation.
2929 @return error code. */
2930 static
2931 dberr_t
row_discard_tablespace_end(trx_t * trx,dict_table_t * table,dberr_t err)2932 row_discard_tablespace_end(
2933 /*=======================*/
2934 	trx_t*		trx,	/*!< in/out: transaction handle */
2935 	dict_table_t*	table,	/*!< in/out: table to be discarded */
2936 	dberr_t		err)	/*!< in: error code */
2937 {
2938 	if (table != 0) {
2939 		dict_table_close(table, TRUE, FALSE);
2940 	}
2941 
2942 	DBUG_EXECUTE_IF("ib_discard_before_commit_crash",
2943 			log_buffer_flush_to_disk();
2944 			DBUG_SUICIDE(););
2945 
2946 	trx_commit_for_mysql(trx);
2947 
2948 	DBUG_EXECUTE_IF("ib_discard_after_commit_crash",
2949 			log_buffer_flush_to_disk();
2950 			DBUG_SUICIDE(););
2951 
2952 	row_mysql_unlock_data_dictionary(trx);
2953 
2954 	trx->op_info = "";
2955 
2956 	return(err);
2957 }
2958 
2959 /*********************************************************************//**
2960 Do the DISCARD TABLESPACE operation.
2961 @return DB_SUCCESS or error code. */
2962 static
2963 dberr_t
row_discard_tablespace(trx_t * trx,dict_table_t * table)2964 row_discard_tablespace(
2965 /*===================*/
2966 	trx_t*		trx,	/*!< in/out: transaction handle */
2967 	dict_table_t*	table)	/*!< in/out: table to be discarded */
2968 {
2969 	dberr_t		err;
2970 
2971 	/* How do we prevent crashes caused by ongoing operations on
2972 	the table? Old operations could try to access non-existent
2973 	pages. MySQL will block all DML on the table using MDL and a
2974 	DISCARD will not start unless all existing operations on the
2975 	table to be discarded are completed.
2976 
2977 	1) Acquire the data dictionary latch in X mode. To prevent any
2978 	internal operations that MySQL is not aware off and also for
2979 	the internal SQL parser.
2980 
2981 	2) Purge and rollback: we assign a new table id for the
2982 	table. Since purge and rollback look for the table based on
2983 	the table id, they see the table as 'dropped' and discard
2984 	their operations.
2985 
2986 	3) Insert buffer: we remove all entries for the tablespace in
2987 	the insert buffer tree. */
2988 
2989 	ibuf_delete_for_discarded_space(table->space_id);
2990 
2991 	table_id_t	new_id;
2992 
2993 	/* Set the TABLESPACE DISCARD flag in the table definition
2994 	on disk. */
2995 	err = row_import_update_discarded_flag(trx, table->id, true);
2996 
2997 	if (err != DB_SUCCESS) {
2998 		return(err);
2999 	}
3000 
3001 	/* Update the index root pages in the system tables, on disk */
3002 	err = row_import_update_index_root(trx, table, true);
3003 
3004 	if (err != DB_SUCCESS) {
3005 		return(err);
3006 	}
3007 
3008 	/* Drop all the FTS auxiliary tables. */
3009 	if (dict_table_has_fts_index(table)
3010 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
3011 
3012 		fts_drop_tables(trx, table);
3013 	}
3014 
3015 	/* Assign a new space ID to the table definition so that purge
3016 	can ignore the changes. Update the system table on disk. */
3017 
3018 	err = row_mysql_table_id_reassign(table, trx, &new_id);
3019 
3020 	if (err != DB_SUCCESS) {
3021 		return(err);
3022 	}
3023 
3024 	/* Discard the physical file that is used for the tablespace. */
3025 	err = fil_delete_tablespace(table->space_id);
3026 	switch (err) {
3027 	case DB_IO_ERROR:
3028 		ib::warn() << "ALTER TABLE " << table->name
3029 			<< " DISCARD TABLESPACE failed to delete file";
3030 		break;
3031 	case DB_TABLESPACE_NOT_FOUND:
3032 		ib::warn() << "ALTER TABLE " << table->name
3033 			<< " DISCARD TABLESPACE failed to find tablespace";
3034 		break;
3035 	case DB_SUCCESS:
3036 		break;
3037 	default:
3038 		ut_error;
3039 	}
3040 
3041 	/* All persistent operations successful, update the
3042 	data dictionary memory cache. */
3043 
3044 	table->file_unreadable = true;
3045 	table->space = NULL;
3046 	table->flags2 |= DICT_TF2_DISCARDED;
3047 	dict_table_change_id_in_cache(table, new_id);
3048 
3049 	dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
3050 	if (index) index->clear_instant_alter();
3051 
3052 	/* Reset the root page numbers. */
3053 	for (; index; index = UT_LIST_GET_NEXT(indexes, index)) {
3054 		index->page = FIL_NULL;
3055 	}
3056 
3057 	/* If the tablespace did not already exist or we couldn't
3058 	write to it, we treat that as a successful DISCARD. It is
3059 	unusable anyway. */
3060 	return DB_SUCCESS;
3061 }
3062 
3063 /*********************************************************************//**
3064 Discards the tablespace of a table which stored in an .ibd file. Discarding
3065 means that this function renames the .ibd file and assigns a new table id for
3066 the table. Also the file_unreadable flag is set.
3067 @return error code or DB_SUCCESS */
3068 dberr_t
row_discard_tablespace_for_mysql(const char * name,trx_t * trx)3069 row_discard_tablespace_for_mysql(
3070 /*=============================*/
3071 	const char*	name,	/*!< in: table name */
3072 	trx_t*		trx)	/*!< in: transaction handle */
3073 {
3074 	dberr_t		err;
3075 	dict_table_t*	table;
3076 
3077 	/* Open the table and start the transaction if not started. */
3078 
3079 	table = row_discard_tablespace_begin(name, trx);
3080 
3081 	if (table == 0) {
3082 		err = DB_TABLE_NOT_FOUND;
3083 	} else if (table->is_temporary()) {
3084 
3085 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3086 			    ER_CANNOT_DISCARD_TEMPORARY_TABLE);
3087 
3088 		err = DB_ERROR;
3089 
3090 	} else if (table->space_id == TRX_SYS_SPACE) {
3091 		char	table_name[MAX_FULL_NAME_LEN + 1];
3092 
3093 		innobase_format_name(
3094 			table_name, sizeof(table_name),
3095 			table->name.m_name);
3096 
3097 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
3098 			    ER_TABLE_IN_SYSTEM_TABLESPACE, table_name);
3099 
3100 		err = DB_ERROR;
3101 
3102 	} else {
3103 		ut_ad(!table->n_foreign_key_checks_running);
3104 
3105 		bool fts_exist = (dict_table_has_fts_index(table)
3106 				  || DICT_TF2_FLAG_IS_SET(
3107 					  table, DICT_TF2_FTS_HAS_DOC_ID));
3108 
3109 		if (fts_exist) {
3110 			row_mysql_unlock_data_dictionary(trx);
3111 			fts_optimize_remove_table(table);
3112 			row_mysql_lock_data_dictionary(trx);
3113 		}
3114 
3115 		/* Do foreign key constraint checks. */
3116 
3117 		err = row_discard_tablespace_foreign_key_checks(trx, table);
3118 
3119 		if (err == DB_SUCCESS) {
3120 			/* Note: This cannot be rolled back.
3121 			Rollback would see the UPDATE SYS_INDEXES
3122 			as two operations: DELETE and INSERT.
3123 			It would invoke btr_free_if_exists()
3124 			when rolling back the INSERT, effectively
3125 			dropping all indexes of the table. */
3126 			err = row_discard_tablespace(trx, table);
3127 		}
3128 
3129 		if (fts_exist && err != DB_SUCCESS) {
3130 			fts_optimize_add_table(table);
3131 		}
3132 	}
3133 
3134 	return(row_discard_tablespace_end(trx, table, err));
3135 }
3136 
3137 /*********************************************************************//**
3138 Sets an exclusive lock on a table.
3139 @return error code or DB_SUCCESS */
3140 dberr_t
row_mysql_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode,const char * op_info)3141 row_mysql_lock_table(
3142 /*=================*/
3143 	trx_t*		trx,		/*!< in/out: transaction */
3144 	dict_table_t*	table,		/*!< in: table to lock */
3145 	enum lock_mode	mode,		/*!< in: LOCK_X or LOCK_S */
3146 	const char*	op_info)	/*!< in: string for trx->op_info */
3147 {
3148 	mem_heap_t*	heap;
3149 	que_thr_t*	thr;
3150 	dberr_t		err;
3151 	sel_node_t*	node;
3152 
3153 	ut_ad(mode == LOCK_X || mode == LOCK_S);
3154 
3155 	heap = mem_heap_create(512);
3156 
3157 	trx->op_info = op_info;
3158 
3159 	node = sel_node_create(heap);
3160 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
3161 	thr->graph->state = QUE_FORK_ACTIVE;
3162 
3163 	/* We use the select query graph as the dummy graph needed
3164 	in the lock module call */
3165 
3166 	thr = que_fork_get_first_thr(
3167 		static_cast<que_fork_t*>(que_node_get_parent(thr)));
3168 
3169 	thr->start_running();
3170 
3171 run_again:
3172 	thr->run_node = thr;
3173 	thr->prev_node = thr->common.parent;
3174 
3175 	err = lock_table(0, table, mode, thr);
3176 
3177 	trx->error_state = err;
3178 
3179 	if (err == DB_SUCCESS) {
3180 		thr->stop_no_error();
3181 	} else {
3182 		que_thr_stop_for_mysql(thr);
3183 
3184 		if (row_mysql_handle_errors(&err, trx, thr, NULL)) {
3185 			goto run_again;
3186 		}
3187 	}
3188 
3189 	que_graph_free(thr->graph);
3190 	trx->op_info = "";
3191 
3192 	return(err);
3193 }
3194 
3195 /** Drop ancillary FTS tables as part of dropping a table.
3196 @param[in,out]	table		Table cache entry
3197 @param[in,out]	trx		Transaction handle
3198 @return error code or DB_SUCCESS */
3199 UNIV_INLINE
3200 dberr_t
row_drop_ancillary_fts_tables(dict_table_t * table,trx_t * trx)3201 row_drop_ancillary_fts_tables(
3202 	dict_table_t*	table,
3203 	trx_t*		trx)
3204 {
3205 	/* Drop ancillary FTS tables */
3206 	if (dict_table_has_fts_index(table)
3207 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
3208 
3209 		ut_ad(table->get_ref_count() == 0);
3210 		ut_ad(trx_is_started(trx));
3211 
3212 		dberr_t err = fts_drop_tables(trx, table);
3213 
3214 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
3215 			ib::error() << " Unable to remove ancillary FTS"
3216 				" tables for table "
3217 				<< table->name << " : " << err;
3218 
3219 			return(err);
3220 		}
3221 	}
3222 
3223 	/* The table->fts flag can be set on the table for which
3224 	the cluster index is being rebuilt. Such table might not have
3225 	DICT_TF2_FTS flag set. So keep this out of above
3226 	dict_table_has_fts_index condition */
3227 	if (table->fts != NULL) {
3228 		/* fts_que_graph_free_check_lock would try to acquire
3229 		dict mutex lock */
3230 		table->fts->dict_locked = true;
3231 
3232 		fts_free(table);
3233 	}
3234 
3235 	return(DB_SUCCESS);
3236 }
3237 
3238 /** Drop a table from the memory cache as part of dropping a table.
3239 @param[in]	tablename	A copy of table->name. Used when table == null
3240 @param[in,out]	table		Table cache entry
3241 @param[in,out]	trx		Transaction handle
3242 @return error code or DB_SUCCESS */
3243 UNIV_INLINE
3244 dberr_t
row_drop_table_from_cache(const char * tablename,dict_table_t * table,trx_t * trx)3245 row_drop_table_from_cache(
3246 	const char*	tablename,
3247 	dict_table_t*	table,
3248 	trx_t*		trx)
3249 {
3250 	dberr_t	err = DB_SUCCESS;
3251 	ut_ad(!table->is_temporary());
3252 
3253 	/* Remove the pointer to this table object from the list
3254 	of modified tables by the transaction because the object
3255 	is going to be destroyed below. */
3256 	trx->mod_tables.erase(table);
3257 
3258 	dict_sys.remove(table);
3259 
3260 	if (dict_load_table(tablename, DICT_ERR_IGNORE_FK_NOKEY)) {
3261 		ib::error() << "Not able to remove table "
3262 			<< ut_get_name(trx, tablename)
3263 			<< " from the dictionary cache!";
3264 		err = DB_ERROR;
3265 	}
3266 
3267 	return(err);
3268 }
3269 
3270 /** Drop a table for MySQL.
3271 If the data dictionary was not already locked by the transaction,
3272 the transaction will be committed.  Otherwise, the data dictionary
3273 will remain locked.
3274 @param[in]	name		Table name
3275 @param[in,out]	trx		Transaction handle
3276 @param[in]	sqlcom		type of SQL operation
3277 @param[in]	create_failed	true=create table failed
3278 				because e.g. foreign key column
3279 @param[in]	nonatomic	Whether it is permitted to release
3280 				and reacquire dict_sys.latch
3281 @return error code or DB_SUCCESS */
3282 dberr_t
row_drop_table_for_mysql(const char * name,trx_t * trx,enum_sql_command sqlcom,bool create_failed,bool nonatomic)3283 row_drop_table_for_mysql(
3284 	const char*		name,
3285 	trx_t*			trx,
3286 	enum_sql_command	sqlcom,
3287 	bool			create_failed,
3288 	bool			nonatomic)
3289 {
3290 	dberr_t		err;
3291 	dict_foreign_t*	foreign;
3292 	dict_table_t*	table;
3293 	char*		tablename		= NULL;
3294 	bool		locked_dictionary	= false;
3295 	pars_info_t*	info			= NULL;
3296 	mem_heap_t*	heap			= NULL;
3297 
3298 
3299 	DBUG_ENTER("row_drop_table_for_mysql");
3300 	DBUG_PRINT("row_drop_table_for_mysql", ("table: '%s'", name));
3301 
3302 	ut_a(name != NULL);
3303 
3304 	/* Serialize data dictionary operations with dictionary mutex:
3305 	no deadlocks can occur then in these operations */
3306 
3307 	trx->op_info = "dropping table";
3308 
3309 	if (trx->dict_operation_lock_mode != RW_X_LATCH) {
3310 		/* Prevent foreign key checks etc. while we are
3311 		dropping the table */
3312 
3313 		row_mysql_lock_data_dictionary(trx);
3314 
3315 		locked_dictionary = true;
3316 		nonatomic = true;
3317 	}
3318 
3319 	ut_d(dict_sys.assert_locked());
3320 
3321 	table = dict_table_open_on_name(
3322 		name, TRUE, FALSE,
3323 		static_cast<dict_err_ignore_t>(
3324 			DICT_ERR_IGNORE_INDEX_ROOT
3325 			| DICT_ERR_IGNORE_CORRUPT));
3326 
3327 	if (!table) {
3328 		if (locked_dictionary) {
3329 			row_mysql_unlock_data_dictionary(trx);
3330 		}
3331 		trx->op_info = "";
3332 		DBUG_RETURN(DB_TABLE_NOT_FOUND);
3333 	}
3334 
3335 	std::vector<pfs_os_file_t> detached_handles;
3336 
3337 	const bool is_temp_name = strstr(table->name.m_name,
3338 					 "/" TEMP_FILE_PREFIX);
3339 
3340 	if (table->is_temporary()) {
3341 		ut_ad(table->space == fil_system.temp_space);
3342 		for (dict_index_t* index = dict_table_get_first_index(table);
3343 		     index != NULL;
3344 		     index = dict_table_get_next_index(index)) {
3345 			btr_free(page_id_t(SRV_TMP_SPACE_ID, index->page));
3346 		}
3347 		/* Remove the pointer to this table object from the list
3348 		of modified tables by the transaction because the object
3349 		is going to be destroyed below. */
3350 		trx->mod_tables.erase(table);
3351 		table->release();
3352 		dict_sys.remove(table);
3353 		err = DB_SUCCESS;
3354 		goto funct_exit_all_freed;
3355 	}
3356 
3357 	/* This function is called recursively via fts_drop_tables(). */
3358 	if (!trx_is_started(trx)) {
3359 		trx_start_for_ddl(trx, TRX_DICT_OP_TABLE);
3360 	}
3361 
3362 	/* Turn on this drop bit before we could release the dictionary
3363 	latch */
3364 	table->to_be_dropped = true;
3365 
3366 	if (nonatomic) {
3367 		/* This trx did not acquire any locks on dictionary
3368 		table records yet. Thus it is safe to release and
3369 		reacquire the data dictionary latches. */
3370 		if (table->fts) {
3371 			row_mysql_unlock_data_dictionary(trx);
3372 			fts_optimize_remove_table(table);
3373 			row_mysql_lock_data_dictionary(trx);
3374 		}
3375 
3376 		dict_stats_wait_bg_to_stop_using_table(table, trx);
3377 	}
3378 
3379 	/* make sure background stats thread is not running on the table */
3380 	ut_ad(!(table->stats_bg_flag & BG_STAT_IN_PROGRESS));
3381 	if (!table->no_rollback()) {
3382 		if (table->space != fil_system.sys_space) {
3383 			/* Delete the link file if used. */
3384 			if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3385 				RemoteDatafile::delete_link_file(name);
3386 			}
3387 		}
3388 
3389 		dict_stats_recalc_pool_del(table);
3390 		dict_stats_defrag_pool_del(table, NULL);
3391 		if (btr_defragment_active) {
3392 			/* During fts_drop_orphaned_tables() the
3393 			btr_defragment_mutex has not yet been
3394 			initialized by btr_defragment_init(). */
3395 			btr_defragment_remove_table(table);
3396 		}
3397 
3398 		if (UNIV_LIKELY(!strstr(name, "/" TEMP_FILE_PREFIX_INNODB))) {
3399 			/* Remove any persistent statistics for this table,
3400 			in a separate transaction. */
3401 			char errstr[1024];
3402 			err = dict_stats_drop_table(name, errstr,
3403 						    sizeof errstr);
3404 			if (err != DB_SUCCESS) {
3405 				ib::warn() << errstr;
3406 			}
3407 		}
3408 	}
3409 
3410 	dict_table_prevent_eviction(table);
3411 	dict_table_close(table, TRUE, FALSE);
3412 
3413 	/* Check if the table is referenced by foreign key constraints from
3414 	some other table (not the table itself) */
3415 
3416 	if (!srv_read_only_mode && trx->check_foreigns) {
3417 
3418 		for (dict_foreign_set::iterator it
3419 			= table->referenced_set.begin();
3420 		     it != table->referenced_set.end();
3421 		     ++it) {
3422 
3423 			foreign = *it;
3424 
3425 			const bool	ref_ok = sqlcom == SQLCOM_DROP_DB
3426 				&& dict_tables_have_same_db(
3427 					name,
3428 					foreign->foreign_table_name_lookup);
3429 
3430 			/* We should allow dropping a referenced table if creating
3431 			that referenced table has failed for some reason. For example
3432 			if referenced table is created but it column types that are
3433 			referenced do not match. */
3434 			if (foreign->foreign_table != table &&
3435 			    !create_failed && !ref_ok) {
3436 
3437 				FILE*	ef	= dict_foreign_err_file;
3438 
3439 				/* We only allow dropping a referenced table
3440 				if FOREIGN_KEY_CHECKS is set to 0 */
3441 
3442 				err = DB_CANNOT_DROP_CONSTRAINT;
3443 
3444 				mutex_enter(&dict_foreign_err_mutex);
3445 				rewind(ef);
3446 				ut_print_timestamp(ef);
3447 
3448 				fputs("  Cannot drop table ", ef);
3449 				ut_print_name(ef, trx, name);
3450 				fputs("\n"
3451 				      "because it is referenced by ", ef);
3452 				ut_print_name(ef, trx,
3453 					      foreign->foreign_table_name);
3454 				putc('\n', ef);
3455 				mutex_exit(&dict_foreign_err_mutex);
3456 
3457 				goto funct_exit;
3458 			}
3459 		}
3460 	}
3461 
3462 	DBUG_EXECUTE_IF("row_drop_table_add_to_background", goto defer;);
3463 
3464 	/* TODO: could we replace the counter n_foreign_key_checks_running
3465 	with lock checks on the table? Acquire here an exclusive lock on the
3466 	table, and rewrite lock0lock.cc and the lock wait in srv0srv.cc so that
3467 	they can cope with the table having been dropped here? Foreign key
3468 	checks take an IS or IX lock on the table. */
3469 
3470 	if (table->n_foreign_key_checks_running > 0) {
3471 defer:
3472 		/* Rename #sql-backup to #sql-ib if table has open ref count
3473 		while dropping the table. This scenario can happen
3474 		when purge thread is waiting for dict_sys.mutex so
3475 		that it could close the table. But drop table acquires
3476 		dict_sys.mutex.
3477                 In the future this should use 'tmp_file_prefix'!
3478                 */
3479 		if (!is_temp_name
3480 		    || strstr(table->name.m_name, "/#sql-backup-")) {
3481 			heap = mem_heap_create(FN_REFLEN);
3482 			const char* tmp_name
3483 				= dict_mem_create_temporary_tablename(
3484 					heap, table->name.m_name, table->id);
3485 			ib::info() << "Deferring DROP TABLE " << table->name
3486 				   << "; renaming to " << tmp_name;
3487 			err = row_rename_table_for_mysql(
3488 				table->name.m_name, tmp_name, trx,
3489 				false, false);
3490 		} else {
3491 			err = DB_SUCCESS;
3492 		}
3493 		if (err == DB_SUCCESS) {
3494 			row_add_table_to_background_drop_list(table->id);
3495 		}
3496 		goto funct_exit;
3497 	}
3498 
3499 	/* Remove all locks that are on the table or its records, if there
3500 	are no references to the table but it has record locks, we release
3501 	the record locks unconditionally. One use case is:
3502 
3503 		CREATE TABLE t2 (PRIMARY KEY (a)) SELECT * FROM t1;
3504 
3505 	If after the user transaction has done the SELECT and there is a
3506 	problem in completing the CREATE TABLE operation, MySQL will drop
3507 	the table. InnoDB will create a new background transaction to do the
3508 	actual drop, the trx instance that is passed to this function. To
3509 	preserve existing behaviour we remove the locks but ideally we
3510 	shouldn't have to. There should never be record locks on a table
3511 	that is going to be dropped. */
3512 
3513 	if (table->get_ref_count() > 0 || table->n_rec_locks > 0
3514 	    || lock_table_has_locks(table)) {
3515 		goto defer;
3516 	}
3517 
3518 	/* The "to_be_dropped" marks table that is to be dropped, but
3519 	has not been dropped, instead, was put in the background drop
3520 	list due to being used by concurrent DML operations. Clear it
3521 	here since there are no longer any concurrent activities on it,
3522 	and it is free to be dropped */
3523 	table->to_be_dropped = false;
3524 
3525 	switch (trx_get_dict_operation(trx)) {
3526 	case TRX_DICT_OP_NONE:
3527 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
3528 		trx->table_id = table->id;
3529 	case TRX_DICT_OP_TABLE:
3530 		break;
3531 	case TRX_DICT_OP_INDEX:
3532 		/* If the transaction was previously flagged as
3533 		TRX_DICT_OP_INDEX, we should be dropping auxiliary
3534 		tables for full-text indexes. */
3535 		ut_ad(strstr(table->name.m_name, "/FTS_"));
3536 	}
3537 
3538 	/* Mark all indexes unavailable in the data dictionary cache
3539 	before starting to drop the table. */
3540 
3541 	unsigned*	page_no;
3542 	unsigned*	page_nos;
3543 	heap = mem_heap_create(
3544 		200 + UT_LIST_GET_LEN(table->indexes) * sizeof *page_nos);
3545 	tablename = mem_heap_strdup(heap, name);
3546 
3547 	page_no = page_nos = static_cast<unsigned*>(
3548 		mem_heap_alloc(
3549 			heap,
3550 			UT_LIST_GET_LEN(table->indexes) * sizeof *page_no));
3551 
3552 	for (dict_index_t* index = dict_table_get_first_index(table);
3553 	     index != NULL;
3554 	     index = dict_table_get_next_index(index)) {
3555 		rw_lock_x_lock(dict_index_get_lock(index));
3556 		/* Save the page numbers so that we can restore them
3557 		if the operation fails. */
3558 		*page_no++ = index->page;
3559 		/* Mark the index unusable. */
3560 		index->page = FIL_NULL;
3561 		rw_lock_x_unlock(dict_index_get_lock(index));
3562 	}
3563 
3564 	/* Deleting a row from SYS_INDEXES table will invoke
3565 	dict_drop_index_tree(). */
3566 	info = pars_info_create();
3567 
3568 	pars_info_add_str_literal(info, "name", name);
3569 
3570 	if (sqlcom != SQLCOM_TRUNCATE
3571 	    && strchr(name, '/')
3572 	    && dict_table_get_low("SYS_FOREIGN")
3573 	    && dict_table_get_low("SYS_FOREIGN_COLS")) {
3574 		err = que_eval_sql(
3575 			info,
3576 			"PROCEDURE DROP_FOREIGN_PROC () IS\n"
3577 			"fid CHAR;\n"
3578 
3579 			"DECLARE CURSOR fk IS\n"
3580 			"SELECT ID FROM SYS_FOREIGN\n"
3581 			"WHERE FOR_NAME = :name\n"
3582 			"AND TO_BINARY(FOR_NAME) = TO_BINARY(:name)\n"
3583 			"FOR UPDATE;\n"
3584 
3585 			"BEGIN\n"
3586 			"OPEN fk;\n"
3587 			"WHILE 1 = 1 LOOP\n"
3588 			"  FETCH fk INTO fid;\n"
3589 			"  IF (SQL % NOTFOUND) THEN RETURN; END IF;\n"
3590 			"  DELETE FROM SYS_FOREIGN_COLS WHERE ID=fid;\n"
3591 			"  DELETE FROM SYS_FOREIGN WHERE ID=fid;\n"
3592 			"END LOOP;\n"
3593 			"CLOSE fk;\n"
3594 			"END;\n", FALSE, trx);
3595 		if (err == DB_SUCCESS) {
3596 			info = pars_info_create();
3597 			pars_info_add_str_literal(info, "name", name);
3598 			goto do_drop;
3599 		}
3600 	} else {
3601 do_drop:
3602 		if (dict_table_get_low("SYS_VIRTUAL")) {
3603 			err = que_eval_sql(
3604 				info,
3605 				"PROCEDURE DROP_VIRTUAL_PROC () IS\n"
3606 				"tid CHAR;\n"
3607 
3608 				"BEGIN\n"
3609 				"SELECT ID INTO tid FROM SYS_TABLES\n"
3610 				"WHERE NAME = :name FOR UPDATE;\n"
3611 				"IF (SQL % NOTFOUND) THEN RETURN;"
3612 				" END IF;\n"
3613 				"DELETE FROM SYS_VIRTUAL"
3614 				" WHERE TABLE_ID = tid;\n"
3615 				"END;\n", FALSE, trx);
3616 			if (err == DB_SUCCESS) {
3617 				info = pars_info_create();
3618 				pars_info_add_str_literal(
3619 					info, "name", name);
3620 			}
3621 		} else {
3622 			err = DB_SUCCESS;
3623 		}
3624 
3625 		err = err == DB_SUCCESS ? que_eval_sql(
3626 			info,
3627 			"PROCEDURE DROP_TABLE_PROC () IS\n"
3628 			"tid CHAR;\n"
3629 			"iid CHAR;\n"
3630 
3631 			"DECLARE CURSOR cur_idx IS\n"
3632 			"SELECT ID FROM SYS_INDEXES\n"
3633 			"WHERE TABLE_ID = tid FOR UPDATE;\n"
3634 
3635 			"BEGIN\n"
3636 			"SELECT ID INTO tid FROM SYS_TABLES\n"
3637 			"WHERE NAME = :name FOR UPDATE;\n"
3638 			"IF (SQL % NOTFOUND) THEN RETURN; END IF;\n"
3639 
3640 			"OPEN cur_idx;\n"
3641 			"WHILE 1 = 1 LOOP\n"
3642 			"  FETCH cur_idx INTO iid;\n"
3643 			"  IF (SQL % NOTFOUND) THEN EXIT; END IF;\n"
3644 			"  DELETE FROM SYS_FIELDS\n"
3645 			"  WHERE INDEX_ID = iid;\n"
3646 			"  DELETE FROM SYS_INDEXES\n"
3647 			"  WHERE ID = iid AND TABLE_ID = tid;\n"
3648 			"END LOOP;\n"
3649 			"CLOSE cur_idx;\n"
3650 
3651 			"DELETE FROM SYS_COLUMNS WHERE TABLE_ID=tid;\n"
3652 			"DELETE FROM SYS_TABLES WHERE NAME=:name;\n"
3653 
3654 			"END;\n", FALSE, trx) : err;
3655 
3656 		if (err == DB_SUCCESS && table->space
3657 		    && dict_table_get_low("SYS_TABLESPACES")
3658 		    && dict_table_get_low("SYS_DATAFILES")) {
3659 			info = pars_info_create();
3660 			pars_info_add_int4_literal(info, "id",
3661 						   lint(table->space_id));
3662 			err = que_eval_sql(
3663 				info,
3664 				"PROCEDURE DROP_SPACE_PROC () IS\n"
3665 				"BEGIN\n"
3666 				"DELETE FROM SYS_TABLESPACES\n"
3667 				"WHERE SPACE = :id;\n"
3668 				"DELETE FROM SYS_DATAFILES\n"
3669 				"WHERE SPACE = :id;\n"
3670 				"END;\n", FALSE, trx);
3671 		}
3672 	}
3673 
3674 	switch (err) {
3675 		fil_space_t* space;
3676 		char* filepath;
3677 	case DB_SUCCESS:
3678 		if (!table->no_rollback()) {
3679 			err = row_drop_ancillary_fts_tables(table, trx);
3680 			if (err != DB_SUCCESS) {
3681 				break;
3682 			}
3683 		}
3684 
3685 		space = table->space;
3686 		ut_ad(!space || space->id == table->space_id);
3687 		/* Determine the tablespace filename before we drop
3688 		dict_table_t. */
3689 		if (DICT_TF_HAS_DATA_DIR(table->flags)) {
3690 			dict_get_and_save_data_dir_path(table, true);
3691 			ut_ad(table->data_dir_path || !space);
3692 			filepath = space ? NULL : fil_make_filepath(
3693 				table->data_dir_path,
3694 				table->name.m_name, IBD,
3695 				table->data_dir_path != NULL);
3696 		} else {
3697 			filepath = space ? NULL : fil_make_filepath(
3698 				NULL, table->name.m_name, IBD, false);
3699 		}
3700 
3701 		/* Free the dict_table_t object. */
3702 		err = row_drop_table_from_cache(tablename, table, trx);
3703 		if (err != DB_SUCCESS) {
3704 			ut_free(filepath);
3705 			break;
3706 		}
3707 
3708 		/* Do not attempt to drop known-to-be-missing tablespaces,
3709 		nor the system tablespace. */
3710 		if (!space) {
3711 			fil_delete_file(filepath);
3712 			ut_free(filepath);
3713 			break;
3714 		}
3715 
3716 		ut_ad(!filepath);
3717 
3718 		if (space->id != TRX_SYS_SPACE) {
3719 			err = fil_delete_tablespace(space->id, false,
3720 						    &detached_handles);
3721 		}
3722 		break;
3723 
3724 	case DB_OUT_OF_FILE_SPACE:
3725 		err = DB_MUST_GET_MORE_FILE_SPACE;
3726 		trx->error_state = err;
3727 		row_mysql_handle_errors(&err, trx, NULL, NULL);
3728 
3729 		/* raise error */
3730 		ut_error;
3731 		break;
3732 
3733 	case DB_TOO_MANY_CONCURRENT_TRXS:
3734 		/* Cannot even find a free slot for the
3735 		the undo log. We can directly exit here
3736 		and return the DB_TOO_MANY_CONCURRENT_TRXS
3737 		error. */
3738 
3739 	default:
3740 		/* This is some error we do not expect. Print
3741 		the error number and rollback the transaction */
3742 		ib::error() << "Unknown error code " << err << " while"
3743 			" dropping table: "
3744 			<< ut_get_name(trx, tablename) << ".";
3745 
3746 		trx->error_state = DB_SUCCESS;
3747 		trx->rollback();
3748 		trx->error_state = DB_SUCCESS;
3749 
3750 		/* Mark all indexes available in the data dictionary
3751 		cache again. */
3752 
3753 		page_no = page_nos;
3754 
3755 		for (dict_index_t* index = dict_table_get_first_index(table);
3756 		     index != NULL;
3757 		     index = dict_table_get_next_index(index)) {
3758 			rw_lock_x_lock(dict_index_get_lock(index));
3759 			ut_a(index->page == FIL_NULL);
3760 			index->page = *page_no++;
3761 			rw_lock_x_unlock(dict_index_get_lock(index));
3762 		}
3763 	}
3764 
3765 	if (err != DB_SUCCESS && table != NULL) {
3766 		/* Drop table has failed with error but as drop table is not
3767 		transaction safe we should mark the table as corrupted to avoid
3768 		unwarranted follow-up action on this table that can result
3769 		in more serious issues. */
3770 
3771 		table->corrupted = true;
3772 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
3773 		     index != NULL;
3774 		     index = UT_LIST_GET_NEXT(indexes, index)) {
3775 			dict_set_corrupted(index, trx, "DROP TABLE");
3776 		}
3777 	}
3778 
3779 funct_exit:
3780 	if (heap) {
3781 		mem_heap_free(heap);
3782 	}
3783 
3784 funct_exit_all_freed:
3785 	if (locked_dictionary) {
3786 
3787 		if (trx_is_started(trx)) {
3788 
3789 			trx_commit_for_mysql(trx);
3790 		}
3791 
3792 		/* Add the table to fts queue if drop table fails */
3793 		if (err != DB_SUCCESS && table->fts) {
3794 			fts_optimize_add_table(table);
3795 		}
3796 
3797 		row_mysql_unlock_data_dictionary(trx);
3798 	}
3799 
3800 	for (const auto& handle : detached_handles) {
3801 		ut_ad(handle != OS_FILE_CLOSED);
3802 		os_file_close(handle);
3803 	}
3804 
3805 	trx->op_info = "";
3806 
3807 	DBUG_RETURN(err);
3808 }
3809 
3810 /** Drop a table after failed CREATE TABLE. */
row_drop_table_after_create_fail(const char * name,trx_t * trx)3811 dberr_t row_drop_table_after_create_fail(const char* name, trx_t* trx)
3812 {
3813 	ib::warn() << "Dropping incompletely created " << name << " table.";
3814 	return row_drop_table_for_mysql(name, trx, SQLCOM_DROP_DB, true);
3815 }
3816 
3817 /*******************************************************************//**
3818 Drop all foreign keys in a database, see Bug#18942.
3819 Called at the end of row_drop_database_for_mysql().
3820 @return error code or DB_SUCCESS */
3821 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3822 dberr_t
drop_all_foreign_keys_in_db(const char * name,trx_t * trx)3823 drop_all_foreign_keys_in_db(
3824 /*========================*/
3825 	const char*	name,	/*!< in: database name which ends to '/' */
3826 	trx_t*		trx)	/*!< in: transaction handle */
3827 {
3828 	pars_info_t*	pinfo;
3829 	dberr_t		err;
3830 
3831 	ut_a(name[strlen(name) - 1] == '/');
3832 
3833 	pinfo = pars_info_create();
3834 
3835 	pars_info_add_str_literal(pinfo, "dbname", name);
3836 
3837 /** true if for_name is not prefixed with dbname */
3838 #define TABLE_NOT_IN_THIS_DB \
3839 "SUBSTR(for_name, 0, LENGTH(:dbname)) <> :dbname"
3840 
3841 	err = que_eval_sql(pinfo,
3842 			   "PROCEDURE DROP_ALL_FOREIGN_KEYS_PROC () IS\n"
3843 			   "foreign_id CHAR;\n"
3844 			   "for_name CHAR;\n"
3845 			   "found INT;\n"
3846 			   "DECLARE CURSOR cur IS\n"
3847 			   "SELECT ID, FOR_NAME FROM SYS_FOREIGN\n"
3848 			   "WHERE FOR_NAME >= :dbname\n"
3849 			   "LOCK IN SHARE MODE\n"
3850 			   "ORDER BY FOR_NAME;\n"
3851 			   "BEGIN\n"
3852 			   "found := 1;\n"
3853 			   "OPEN cur;\n"
3854 			   "WHILE found = 1 LOOP\n"
3855 			   "        FETCH cur INTO foreign_id, for_name;\n"
3856 			   "        IF (SQL % NOTFOUND) THEN\n"
3857 			   "                found := 0;\n"
3858 			   "        ELSIF (" TABLE_NOT_IN_THIS_DB ") THEN\n"
3859 			   "                found := 0;\n"
3860 			   "        ELSIF (1=1) THEN\n"
3861 			   "                DELETE FROM SYS_FOREIGN_COLS\n"
3862 			   "                WHERE ID = foreign_id;\n"
3863 			   "                DELETE FROM SYS_FOREIGN\n"
3864 			   "                WHERE ID = foreign_id;\n"
3865 			   "        END IF;\n"
3866 			   "END LOOP;\n"
3867 			   "CLOSE cur;\n"
3868 			   "COMMIT WORK;\n"
3869 			   "END;\n",
3870 			   FALSE, /* do not reserve dict mutex,
3871 				  we are already holding it */
3872 			   trx);
3873 
3874 	return(err);
3875 }
3876 
3877 /** Drop a database for MySQL.
3878 @param[in]	name	database name which ends at '/'
3879 @param[in]	trx	transaction handle
3880 @param[out]	found	number of dropped tables/partitions
3881 @return error code or DB_SUCCESS */
3882 dberr_t
row_drop_database_for_mysql(const char * name,trx_t * trx,ulint * found)3883 row_drop_database_for_mysql(
3884 	const char*	name,
3885 	trx_t*		trx,
3886 	ulint*		found)
3887 {
3888 	dict_table_t*	table;
3889 	char*		table_name;
3890 	dberr_t		err	= DB_SUCCESS;
3891 	ulint		namelen	= strlen(name);
3892 	bool		is_partition = false;
3893 
3894 	ut_ad(found != NULL);
3895 
3896 	DBUG_ENTER("row_drop_database_for_mysql");
3897 
3898 	DBUG_PRINT("row_drop_database_for_mysql", ("db: '%s'", name));
3899 
3900 	ut_a(name != NULL);
3901 	/* Assert DB name or partition name. */
3902 	if (name[namelen - 1] == '#') {
3903 		ut_ad(name[namelen - 2] != '/');
3904 		is_partition = true;
3905 		trx->op_info = "dropping partitions";
3906 	} else {
3907 		ut_a(name[namelen - 1] == '/');
3908 		trx->op_info = "dropping database";
3909 	}
3910 
3911 	*found = 0;
3912 
3913 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
3914 
3915 	trx_start_if_not_started_xa(trx, true);
3916 
3917 loop:
3918 	row_mysql_lock_data_dictionary(trx);
3919 
3920 	while ((table_name = dict_get_first_table_name_in_db(name))) {
3921 		/* Drop parent table if it is a fts aux table, to
3922 		avoid accessing dropped fts aux tables in information
3923 		scheam when parent table still exists.
3924 		Note: Drop parent table will drop fts aux tables. */
3925 		char*		parent_table_name = NULL;
3926 		table_id_t	table_id;
3927 		index_id_t	index_id;
3928 
3929 		if (fts_check_aux_table(
3930 				table_name, &table_id, &index_id)) {
3931 			dict_table_t* parent_table = dict_table_open_on_id(
3932 					table_id, TRUE, DICT_TABLE_OP_NORMAL);
3933 			if (parent_table != NULL) {
3934 				parent_table_name = mem_strdupl(
3935 					parent_table->name.m_name,
3936 					strlen(parent_table->name.m_name));
3937 				dict_table_close(parent_table, TRUE, FALSE);
3938 			}
3939 		}
3940 
3941 		if (parent_table_name != NULL) {
3942 			ut_free(table_name);
3943 			table_name = parent_table_name;
3944 		}
3945 
3946 		ut_a(memcmp(table_name, name, namelen) == 0);
3947 
3948 		table = dict_table_open_on_name(
3949 			table_name, TRUE, FALSE, static_cast<dict_err_ignore_t>(
3950 				DICT_ERR_IGNORE_INDEX_ROOT
3951 				| DICT_ERR_IGNORE_CORRUPT));
3952 
3953 		if (!table) {
3954 			ib::error() << "Cannot load table " << table_name
3955 				<< " from InnoDB internal data dictionary"
3956 				" during drop database";
3957 			ut_free(table_name);
3958 			err = DB_TABLE_NOT_FOUND;
3959 			break;
3960 
3961 		}
3962 
3963 		if (!table->name.is_temporary()) {
3964 			/* There could be orphan temp tables left from
3965 			interrupted alter table. Leave them, and handle
3966 			the rest.*/
3967 			if (table->can_be_evicted
3968 			    && (name[namelen - 1] != '#')) {
3969 				ib::warn() << "Orphan table encountered during"
3970 					" DROP DATABASE. This is possible if '"
3971 					<< table->name << ".frm' was lost.";
3972 			}
3973 
3974 			if (!table->is_readable() && !table->space) {
3975 				ib::warn() << "Missing .ibd file for table "
3976 					<< table->name << ".";
3977 			}
3978 		}
3979 
3980 		dict_table_close(table, TRUE, FALSE);
3981 
3982 		/* The dict_table_t object must not be accessed before
3983 		dict_table_open() or after dict_table_close(). But this is OK
3984 		if we are holding, the dict_sys.mutex. */
3985 		ut_ad(mutex_own(&dict_sys.mutex));
3986 
3987 		/* Disable statistics on the found table. */
3988 		if (!dict_stats_stop_bg(table)) {
3989 			row_mysql_unlock_data_dictionary(trx);
3990 
3991 			os_thread_sleep(250000);
3992 
3993 			ut_free(table_name);
3994 
3995 			goto loop;
3996 		}
3997 
3998 		/* Wait until MySQL does not have any queries running on
3999 		the table */
4000 
4001 		if (table->get_ref_count() > 0) {
4002 			row_mysql_unlock_data_dictionary(trx);
4003 
4004 			ib::warn() << "MySQL is trying to drop database "
4005 				<< ut_get_name(trx, name) << " though"
4006 				" there are still open handles to table "
4007 				<< table->name << ".";
4008 
4009 			os_thread_sleep(1000000);
4010 
4011 			ut_free(table_name);
4012 
4013 			goto loop;
4014 		}
4015 
4016 		err = row_drop_table_for_mysql(
4017 			table_name, trx, SQLCOM_DROP_DB);
4018 		trx_commit_for_mysql(trx);
4019 
4020 		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
4021 			ib::error() << "DROP DATABASE "
4022 				<< ut_get_name(trx, name) << " failed"
4023 				" with error (" << err << ") for"
4024 				" table " << ut_get_name(trx, table_name);
4025 			ut_free(table_name);
4026 			break;
4027 		}
4028 
4029 		ut_free(table_name);
4030 		(*found)++;
4031 	}
4032 
4033 	/* Partitioning does not yet support foreign keys. */
4034 	if (err == DB_SUCCESS && !is_partition) {
4035 		/* after dropping all tables try to drop all leftover
4036 		foreign keys in case orphaned ones exist */
4037 		err = drop_all_foreign_keys_in_db(name, trx);
4038 
4039 		if (err != DB_SUCCESS) {
4040 			const std::string&	db = ut_get_name(trx, name);
4041 			ib::error() << "DROP DATABASE " << db << " failed with"
4042 				" error " << err << " while dropping all"
4043 				" foreign keys";
4044 		}
4045 	}
4046 
4047 	trx_commit_for_mysql(trx);
4048 
4049 	row_mysql_unlock_data_dictionary(trx);
4050 
4051 	trx->op_info = "";
4052 
4053 	DBUG_RETURN(err);
4054 }
4055 
4056 /****************************************************************//**
4057 Delete a single constraint.
4058 @return error code or DB_SUCCESS */
4059 static MY_ATTRIBUTE((nonnull, warn_unused_result))
4060 dberr_t
row_delete_constraint_low(const char * id,trx_t * trx)4061 row_delete_constraint_low(
4062 /*======================*/
4063 	const char*	id,		/*!< in: constraint id */
4064 	trx_t*		trx)		/*!< in: transaction handle */
4065 {
4066 	pars_info_t*	info = pars_info_create();
4067 
4068 	pars_info_add_str_literal(info, "id", id);
4069 
4070 	return(que_eval_sql(info,
4071 			    "PROCEDURE DELETE_CONSTRAINT () IS\n"
4072 			    "BEGIN\n"
4073 			    "DELETE FROM SYS_FOREIGN_COLS WHERE ID = :id;\n"
4074 			    "DELETE FROM SYS_FOREIGN WHERE ID = :id;\n"
4075 			    "END;\n"
4076 			    , FALSE, trx));
4077 }
4078 
4079 /****************************************************************//**
4080 Delete a single constraint.
4081 @return error code or DB_SUCCESS */
4082 static MY_ATTRIBUTE((nonnull, warn_unused_result))
4083 dberr_t
row_delete_constraint(const char * id,const char * database_name,mem_heap_t * heap,trx_t * trx)4084 row_delete_constraint(
4085 /*==================*/
4086 	const char*	id,		/*!< in: constraint id */
4087 	const char*	database_name,	/*!< in: database name, with the
4088 					trailing '/' */
4089 	mem_heap_t*	heap,		/*!< in: memory heap */
4090 	trx_t*		trx)		/*!< in: transaction handle */
4091 {
4092 	dberr_t	err;
4093 
4094 	/* New format constraints have ids <databasename>/<constraintname>. */
4095 	err = row_delete_constraint_low(
4096 		mem_heap_strcat(heap, database_name, id), trx);
4097 
4098 	if ((err == DB_SUCCESS) && !strchr(id, '/')) {
4099 		/* Old format < 4.0.18 constraints have constraint ids
4100 		NUMBER_NUMBER. We only try deleting them if the
4101 		constraint name does not contain a '/' character, otherwise
4102 		deleting a new format constraint named 'foo/bar' from
4103 		database 'baz' would remove constraint 'bar' from database
4104 		'foo', if it existed. */
4105 
4106 		err = row_delete_constraint_low(id, trx);
4107 	}
4108 
4109 	return(err);
4110 }
4111 
/*********************************************************************//**
Renames a table for MySQL.

The steps performed, in order, are:
- open the table by old_name (with a case-adjusted retry for partition
  names when lower_case_table_names is 1);
- when renaming a non-temporary name to a temporary name with use_fk set,
  parse the constraints to be dropped;
- wait for running foreign key checks on the table to finish;
- update SYS_TABLES, and for file-per-table tables with a tablespace also
  SYS_TABLESPACES and SYS_DATAFILES;
- if the new name is not temporary, rename the constraint ids in
  SYS_FOREIGN and SYS_FOREIGN_COLS; otherwise delete the parsed
  constraints;
- rename the FTS auxiliary tables when the database changes;
- rename the table in the dictionary cache and reload its foreign keys.

On failure the transaction is rolled back. Returns DB_READ_ONLY without
doing anything when high_level_read_only is set.
@return error code or DB_SUCCESS */
dberr_t
row_rename_table_for_mysql(
/*=======================*/
	const char*	old_name,	/*!< in: old table name */
	const char*	new_name,	/*!< in: new table name */
	trx_t*		trx,		/*!< in/out: transaction */
	bool		commit,		/*!< in: whether to commit trx */
	bool		use_fk)		/*!< in: whether to parse and enforce
					FOREIGN KEY constraints */
{
	dict_table_t*	table			= NULL;
	dberr_t		err			= DB_ERROR;
	mem_heap_t*	heap			= NULL;
	const char**	constraints_to_drop	= NULL;
	ulint		n_constraints_to_drop	= 0;
	ibool		old_is_tmp, new_is_tmp;
	pars_info_t*	info			= NULL;
	int		retry;
	bool		aux_fts_rename		= false;
	char*		is_part 		= NULL;

	ut_a(old_name != NULL);
	ut_a(new_name != NULL);
	ut_ad(trx->state == TRX_STATE_ACTIVE);
	/* Whether trx records the dictionary operation lock as held in
	exclusive mode; committing is only allowed in that case. */
	const bool dict_locked = trx->dict_operation_lock_mode == RW_X_LATCH;
	ut_ad(!commit || dict_locked);

	if (high_level_read_only) {
		return(DB_READ_ONLY);
	}

	trx->op_info = "renaming table";

	old_is_tmp = dict_table_t::is_temporary_name(old_name);
	new_is_tmp = dict_table_t::is_temporary_name(new_name);

	table = dict_table_open_on_name(old_name, dict_locked, FALSE,
					DICT_ERR_IGNORE_FK_NOKEY);

	/* We look for pattern #P# to see if the table is partitioned
	MySQL table. */
#ifdef __WIN__
	is_part = strstr((char *)old_name, (char *)"#p#");
#else
	is_part = strstr((char *)old_name, (char *)"#P#");
#endif /* __WIN__ */

	/* MySQL partition engine hard codes the file name
	separator as "#P#". The text case is fixed even if
	lower_case_table_names is set to 1 or 2. This is true
	for sub-partition names as well. InnoDB always
	normalises file names to lower case on Windows, this
	can potentially cause problems when copying/moving
	tables between platforms.

	1) If boot against an installation from Windows
	platform, then its partition table name could
	all be in lower case in system tables. So we
	will need to check lower case name when load table.

	2) If we boot an installation from other case
	sensitive platform in Windows, we might need to
	check the existence of table name without lowering
	case them in the system table. */
	if (!table &&
	    is_part &&
	    innobase_get_lower_case_table_names() == 1) {
		char par_case_name[MAX_FULL_NAME_LEN + 1];
#ifndef __WIN__
		/* Check for the table using lower
		case name, including the partition
		separator "P" */
		memcpy(par_case_name, old_name,
			strlen(old_name));
		par_case_name[strlen(old_name)] = 0;
		innobase_casedn_str(par_case_name);
#else
		/* On Windows platform, check
		whether there exists table name in
		system table whose name is
		not being normalized to lower case */
		normalize_table_name_c_low(
			par_case_name, old_name, FALSE);
#endif
		table = dict_table_open_on_name(par_case_name, dict_locked, FALSE,
						DICT_ERR_IGNORE_FK_NOKEY);
	}

	if (!table) {
		err = DB_TABLE_NOT_FOUND;
		goto funct_exit;

	} else if (!table->is_readable() && !table->space
		   && !(table->flags2 & DICT_TF2_DISCARDED)) {

		/* The table is unreadable, has no tablespace object and
		is not flagged as discarded: report it as missing. */
		err = DB_TABLE_NOT_FOUND;

		ib::error() << "Table " << old_name << " does not have an .ibd"
			" file in the database directory. "
			<< TROUBLESHOOTING_MSG;

		goto funct_exit;

	} else if (use_fk && !old_is_tmp && new_is_tmp) {
		/* MySQL is doing an ALTER TABLE command and it renames the
		original table to a temporary table name. We want to preserve
		the original foreign key constraint definitions despite the
		name change. An exception is those constraints for which
		the ALTER TABLE contained DROP FOREIGN KEY <foreign key id>.*/

		/* This heap is freed at the end of the function. */
		heap = mem_heap_create(100);

		err = dict_foreign_parse_drop_constraints(
			heap, trx, table, &n_constraints_to_drop,
			&constraints_to_drop);

		if (err != DB_SUCCESS) {
			goto funct_exit;
		}
	}

	/* Is a foreign key check running on this table? */
	/* Retry up to 100 times, releasing and re-acquiring the data
	dictionary lock around a thread yield on each attempt. */
	for (retry = 0; retry < 100
	     && table->n_foreign_key_checks_running > 0; ++retry) {
		row_mysql_unlock_data_dictionary(trx);
		os_thread_yield();
		row_mysql_lock_data_dictionary(trx);
	}

	if (table->n_foreign_key_checks_running > 0) {
		ib::error() << "In ALTER TABLE "
			<< ut_get_name(trx, old_name)
			<< " a FOREIGN KEY check is running. Cannot rename"
			" table.";
		err = DB_TABLE_IN_FK_CHECK;
		goto funct_exit;
	}

	if (!table->is_temporary()) {
		if (commit) {
			dict_stats_wait_bg_to_stop_using_table(table, trx);
		}

		err = trx_undo_report_rename(trx, table);

		if (err != DB_SUCCESS) {
			goto funct_exit;
		}
	}

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data from system tables. */

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_table_name", new_name);
	pars_info_add_str_literal(info, "old_table_name", old_name);

	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLE () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES"
			   " SET NAME = :new_table_name\n"
			   " WHERE NAME = :old_table_name;\n"
			   "END;\n"
			   , FALSE, trx);

	/* Assume the caller guarantees destination name doesn't exist. */
	ut_ad(err != DB_DUPLICATE_KEY);

	/* SYS_TABLESPACES and SYS_DATAFILES need to be updated if
	the table is in a single-table tablespace. */
	if (err != DB_SUCCESS || !dict_table_is_file_per_table(table)) {
		/* Nothing to update here: either the SYS_TABLES update
		failed (handled below) or the table is not file-per-table. */
	} else if (table->space) {
		/* If old path and new path are the same means tablename
		has not changed and only the database name holding the table
		has changed so we need to make the complete filepath again. */
		char*	new_path = dict_tables_have_same_db(old_name, new_name)
			? os_file_make_new_pathname(
				table->space->chain.start->name, new_name)
			: fil_make_filepath(NULL, new_name, IBD, false);

		info = pars_info_create();

		pars_info_add_str_literal(info, "new_table_name", new_name);
		pars_info_add_str_literal(info, "new_path_name", new_path);
		pars_info_add_int4_literal(info, "space_id", table->space_id);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :new_table_name\n"
				   " WHERE SPACE = :space_id;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :new_path_name\n"
				   " WHERE SPACE = :space_id;\n"
				   "END;\n"
				   , FALSE, trx);

		ut_free(new_path);
	}
	if (err != DB_SUCCESS) {
		goto err_exit;
	}

	if (!new_is_tmp) {
		/* Rename all constraints. */
		char	new_table_name[MAX_TABLE_NAME_LEN + 1];
		char	old_table_utf8[MAX_TABLE_NAME_LEN + 1];
		uint	errors = 0;

		/* Start from a copy of the full name, then convert the
		part after the '/' in place. */
		strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
		old_table_utf8[MAX_TABLE_NAME_LEN] = '\0';
		innobase_convert_to_system_charset(
			strchr(old_table_utf8, '/') + 1,
			strchr(old_name, '/') +1,
			MAX_TABLE_NAME_LEN, &errors);

		if (errors) {
			/* Table name could not be converted from charset
			my_charset_filename to UTF-8. This means that the
			table name is already in UTF-8 (#mysql#50). */
			strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
			old_table_utf8[MAX_TABLE_NAME_LEN] = '\0';
		}

		info = pars_info_create();

		pars_info_add_str_literal(info, "new_table_name", new_name);
		pars_info_add_str_literal(info, "old_table_name", old_name);
		pars_info_add_str_literal(info, "old_table_name_utf8",
					  old_table_utf8);

		/* Same conversion for the new name. */
		strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
		new_table_name[MAX_TABLE_NAME_LEN] = '\0';
		innobase_convert_to_system_charset(
			strchr(new_table_name, '/') + 1,
			strchr(new_name, '/') +1,
			MAX_TABLE_NAME_LEN, &errors);

		if (errors) {
			/* Table name could not be converted from charset
			my_charset_filename to UTF-8. This means that the
			table name is already in UTF-8 (#mysql#50). */
			strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
			new_table_name[MAX_TABLE_NAME_LEN] = '\0';
		}

		pars_info_add_str_literal(info, "new_table_utf8", new_table_name);

		/* The procedure below loops over the SYS_FOREIGN rows whose
		FOR_NAME is the old table name, points each at the new name
		and, for ids containing '/', rewrites the id in SYS_FOREIGN
		and SYS_FOREIGN_COLS: ids containing the generated
		"<old name>_ibfk_" prefix get the new table name as prefix,
		other ids get the new database name. Finally REF_NAME is
		updated for constraints that reference the old name. */
		err = que_eval_sql(
			info,
			"PROCEDURE RENAME_CONSTRAINT_IDS () IS\n"
			"gen_constr_prefix CHAR;\n"
			"new_db_name CHAR;\n"
			"foreign_id CHAR;\n"
			"new_foreign_id CHAR;\n"
			"old_db_name_len INT;\n"
			"old_t_name_len INT;\n"
			"new_db_name_len INT;\n"
			"id_len INT;\n"
			"offset INT;\n"
			"found INT;\n"
			"BEGIN\n"
			"found := 1;\n"
			"old_db_name_len := INSTR(:old_table_name, '/')-1;\n"
			"new_db_name_len := INSTR(:new_table_name, '/')-1;\n"
			"new_db_name := SUBSTR(:new_table_name, 0,\n"
			"                      new_db_name_len);\n"
			"old_t_name_len := LENGTH(:old_table_name);\n"
			"gen_constr_prefix := CONCAT(:old_table_name_utf8,\n"
			"                            '_ibfk_');\n"
			"WHILE found = 1 LOOP\n"
			"       SELECT ID INTO foreign_id\n"
			"        FROM SYS_FOREIGN\n"
			"        WHERE FOR_NAME = :old_table_name\n"
			"         AND TO_BINARY(FOR_NAME)\n"
			"           = TO_BINARY(:old_table_name)\n"
			"         LOCK IN SHARE MODE;\n"
			"       IF (SQL % NOTFOUND) THEN\n"
			"        found := 0;\n"
			"       ELSE\n"
			"        UPDATE SYS_FOREIGN\n"
			"        SET FOR_NAME = :new_table_name\n"
			"         WHERE ID = foreign_id;\n"
			"        id_len := LENGTH(foreign_id);\n"
			"        IF (INSTR(foreign_id, '/') > 0) THEN\n"
			"               IF (INSTR(foreign_id,\n"
			"                         gen_constr_prefix) > 0)\n"
			"               THEN\n"
                        "                offset := INSTR(foreign_id, '_ibfk_') - 1;\n"
			"                new_foreign_id :=\n"
			"                CONCAT(:new_table_utf8,\n"
			"                SUBSTR(foreign_id, offset,\n"
			"                       id_len - offset));\n"
			"               ELSE\n"
			"                new_foreign_id :=\n"
			"                CONCAT(new_db_name,\n"
			"                SUBSTR(foreign_id,\n"
			"                       old_db_name_len,\n"
			"                       id_len - old_db_name_len));\n"
			"               END IF;\n"
			"               UPDATE SYS_FOREIGN\n"
			"                SET ID = new_foreign_id\n"
			"                WHERE ID = foreign_id;\n"
			"               UPDATE SYS_FOREIGN_COLS\n"
			"                SET ID = new_foreign_id\n"
			"                WHERE ID = foreign_id;\n"
			"        END IF;\n"
			"       END IF;\n"
			"END LOOP;\n"
			"UPDATE SYS_FOREIGN SET REF_NAME = :new_table_name\n"
			"WHERE REF_NAME = :old_table_name\n"
			"  AND TO_BINARY(REF_NAME)\n"
			"    = TO_BINARY(:old_table_name);\n"
			"END;\n"
			, FALSE, trx);

	} else if (n_constraints_to_drop > 0) {
		/* Drop some constraints of tmp tables. */

		/* db_name is the database part of old_name plus one more
		character (the '/' separator that row_delete_constraint()
		expects at the end of the database name). */
		ulint	db_name_len = dict_get_db_name_len(old_name) + 1;
		char*	db_name = mem_heap_strdupl(heap, old_name,
						   db_name_len);
		ulint	i;

		for (i = 0; i < n_constraints_to_drop; i++) {
			err = row_delete_constraint(constraints_to_drop[i],
						    db_name, heap, trx);

			if (err != DB_SUCCESS) {
				break;
			}
		}
	}

	/* The FTS auxiliary tables are renamed only when the table has
	an FTS index or the FTS_HAS_DOC_ID flag, and the rename moves it
	to a different database. */
	if (err == DB_SUCCESS
	    && (dict_table_has_fts_index(table)
	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID))
	    && !dict_tables_have_same_db(old_name, new_name)) {
		err = fts_rename_aux_tables(table, new_name, trx);
		/* Remember that an aux table rename was attempted, so
		that it can be reverted at funct_exit if the overall
		rename fails. */
		if (err != DB_TABLE_NOT_FOUND) {
			aux_fts_rename = true;
		}
	}

	if (err != DB_SUCCESS) {
err_exit:
		/* Also reached by goto when one of the system table
		updates above failed. */
		if (err == DB_DUPLICATE_KEY) {
			ib::error() << "Possible reasons:";
			ib::error() << "(1) Table rename would cause two"
				" FOREIGN KEY constraints to have the same"
				" internal name in case-insensitive"
				" comparison.";
			ib::error() << "(2) Table "
				<< ut_get_name(trx, new_name)
				<< " exists in the InnoDB internal data"
				" dictionary though MySQL is trying to rename"
				" table " << ut_get_name(trx, old_name)
				<< " to it. Have you deleted the .frm file and"
				" not used DROP TABLE?";
			ib::info() << TROUBLESHOOTING_MSG;
			ib::error() << "If table "
				<< ut_get_name(trx, new_name)
				<< " is a temporary table #sql..., then"
				" it can be that there are still queries"
				" running on the table, and it will be dropped"
				" automatically when the queries end. You can"
				" drop the orphaned table inside InnoDB by"
				" creating an InnoDB table with the same name"
				" in another database and copying the .frm file"
				" to the current database. Then MySQL thinks"
				" the table exists, and DROP TABLE will"
				" succeed.";
		}
		/* The error state is reset both before and after the
		rollback; err still carries the failure to the caller. */
		trx->error_state = DB_SUCCESS;
		trx->rollback();
		trx->error_state = DB_SUCCESS;
	} else {
		/* The following call will also rename the .ibd data file if
		the table is stored in a single-table tablespace */

		err = dict_table_rename_in_cache(
			table, new_name, !new_is_tmp);
		if (err != DB_SUCCESS) {
			trx->error_state = DB_SUCCESS;
			trx->rollback();
			trx->error_state = DB_SUCCESS;
			goto funct_exit;
		}

		/* In case of copy alter, template db_name and
		table_name should be renamed only for newly
		created table. */
		if (table->vc_templ != NULL && !new_is_tmp) {
			innobase_rename_vc_templ(table);
		}

		/* We only want to switch off some of the type checking in
		an ALTER TABLE, not in a RENAME. */
		dict_names_t	fk_tables;

		err = dict_load_foreigns(
			new_name, NULL, false,
			!old_is_tmp || trx->check_foreigns,
			use_fk
			? DICT_ERR_IGNORE_NONE
			: DICT_ERR_IGNORE_FK_NOKEY,
			fk_tables);

		if (err != DB_SUCCESS) {

			if (old_is_tmp) {
				/* In case of copy alter, ignore the
				loading of foreign key constraint
				when foreign_key_check is disabled */
				ib::error_or_warn(trx->check_foreigns)
					<< "In ALTER TABLE "
					<< ut_get_name(trx, new_name)
					<< " has or is referenced in foreign"
					" key constraints which are not"
					" compatible with the new table"
					" definition.";
				if (!trx->check_foreigns) {
					err = DB_SUCCESS;
					goto funct_exit;
				}
			} else {
				ib::error() << "In RENAME TABLE table "
					<< ut_get_name(trx, new_name)
					<< " is referenced in foreign key"
					" constraints which are not compatible"
					" with the new table definition.";
			}

			/* NOTE(review): execution continues with the
			checks below after this rollback; there is no
			goto funct_exit on this path. */
			trx->error_state = DB_SUCCESS;
			trx->rollback();
			trx->error_state = DB_SUCCESS;
		}

		/* Check whether virtual column or stored column affects
		the foreign key constraint of the table. */
		if (dict_foreigns_has_s_base_col(
				table->foreign_set, table)) {
			err = DB_NO_FK_ON_S_BASE_COL;
			/* Undo the cache rename before rolling back. */
			ut_a(DB_SUCCESS == dict_table_rename_in_cache(
				table, old_name, FALSE));
			trx->error_state = DB_SUCCESS;
			trx->rollback();
			trx->error_state = DB_SUCCESS;
			goto funct_exit;
		}

		/* Fill the virtual column set in foreign when
		the table undergoes copy alter operation. */
		dict_mem_table_free_foreign_vcol_set(table);
		dict_mem_table_fill_foreign_vcol_set(table);

		/* Load every table that dict_load_foreigns() collected
		in fk_tables. */
		while (!fk_tables.empty()) {
			dict_load_table(fk_tables.front(),
					DICT_ERR_IGNORE_NONE);
			fk_tables.pop_front();
		}

		table->data_dir_path= NULL;
	}

funct_exit:
	/* Common exit path: revert a partially done FTS aux table rename,
	close the table, optionally commit, and release the heap. */
	if (aux_fts_rename && err != DB_SUCCESS
	    && table != NULL && (table->space != 0)) {

		char*	orig_name = table->name.m_name;
		trx_t*	trx_bg = trx_create();

		/* If the first fts_rename fails, the trx would
		be rolled back and committed, we can't use it any more,
		so we have to start a new background trx here. */
		ut_a(trx_state_eq(trx_bg, TRX_STATE_NOT_STARTED));
		trx_bg->op_info = "Revert the failing rename "
				  "for fts aux tables";
		trx_bg->dict_operation_lock_mode = RW_X_LATCH;
		trx_start_for_ddl(trx_bg, TRX_DICT_OP_TABLE);

		/* If rename fails and table has its own tablespace,
		we need to call fts_rename_aux_tables again to
		revert the ibd file rename, which is not under the
		control of trx. Also notice the parent table name
		in cache is not changed yet. If the reverting fails,
		the ibd data may be left in the new database, which
		can be fixed only manually. */
		/* Temporarily present the table under new_name for the
		reverse rename, then restore the cached name pointer. */
		table->name.m_name = const_cast<char*>(new_name);
		fts_rename_aux_tables(table, old_name, trx_bg);
		table->name.m_name = orig_name;

		trx_bg->dict_operation_lock_mode = 0;
		trx_commit_for_mysql(trx_bg);
		trx_bg->free();
	}

	if (table != NULL) {
		if (commit && !table->is_temporary()) {
			/* Clear the BG_STAT_SHOULD_QUIT bit of the
			table's stats_bg_flag. */
			table->stats_bg_flag &= byte(~BG_STAT_SHOULD_QUIT);
		}
		dict_table_close(table, dict_locked, FALSE);
	}

	if (commit) {
		DEBUG_SYNC(trx->mysql_thd, "before_rename_table_commit");
		trx_commit_for_mysql(trx);
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	trx->op_info = "";

	return(err);
}
4635 
/*********************************************************************//**
Scans an index for either COUNT(*) or CHECK TABLE.
If CHECK TABLE; Checks that the index contains entries in an ascending order,
unique constraint is not broken, and calculates the number of index entries
in the read view of the current transaction.

A non-clustered index that is being created online, or that is a DICT_FTS
index, is not scanned: DB_SUCCESS is returned with *n_rows = 0.
DB_DEADLOCK, DB_LOCK_TABLE_FULL, DB_LOCK_WAIT_TIMEOUT and DB_INTERRUPTED
end the scan and are returned as such; any other search result (including
DB_END_OF_INDEX) ends the scan with DB_SUCCESS.
@return DB_SUCCESS or other error */
dberr_t
row_scan_index_for_mysql(
/*=====================*/
	row_prebuilt_t*		prebuilt,	/*!< in: prebuilt struct
						in MySQL handle */
	const dict_index_t*	index,		/*!< in: index */
	ulint*			n_rows)		/*!< out: number of entries
						seen in the consistent read */
{
	dtuple_t*	prev_entry	= NULL;
	ulint		matched_fields;
	byte*		buf;
	dberr_t		ret;
	rec_t*		rec;
	int		cmp;
	ibool		contains_null;
	ulint		i;
	ulint		cnt;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets;
	rec_offs_init(offsets_);

	*n_rows = 0;

	/* Don't support RTree Leaf level scan */
	ut_ad(!dict_index_is_spatial(index));

	if (dict_index_is_clust(index)) {
		/* The clustered index of a table is always available.
		During online ALTER TABLE that rebuilds the table, the
		clustered index in the old table will have
		index->online_log pointing to the new table. All
		indexes of the old table will remain valid and the new
		table will be unaccessible to MySQL until the
		completion of the ALTER TABLE. */
	} else if (dict_index_is_online_ddl(index)
		   || (index->type & DICT_FTS)) {
		/* Full Text index are implemented by auxiliary tables,
		not the B-tree. We also skip secondary indexes that are
		being created online. */
		return(DB_SUCCESS);
	}

	/* The row buffer is at least one page in size, and at least
	as large as the MySQL row length of the prebuilt struct. */
	ulint bufsize = std::max<ulint>(srv_page_size,
					prebuilt->mysql_row_len);
	buf = static_cast<byte*>(ut_malloc_nokey(bufsize));
	heap = mem_heap_create(100);

	/* Countdown to the next interrupt check. */
	cnt = 1000;

	/* Fetch the first record; later fetches use ROW_SEL_NEXT. */
	ret = row_search_for_mysql(buf, PAGE_CUR_G, prebuilt, 0, 0);
loop:
	/* Check thd->killed every 1,000 scanned rows */
	if (--cnt == 0) {
		if (trx_is_interrupted(prebuilt->trx)) {
			ret = DB_INTERRUPTED;
			goto func_exit;
		}
		cnt = 1000;
	}

	switch (ret) {
	case DB_SUCCESS:
		break;
	case DB_DEADLOCK:
	case DB_LOCK_TABLE_FULL:
	case DB_LOCK_WAIT_TIMEOUT:
	case DB_INTERRUPTED:
		/* Returned to the caller unchanged. */
		goto func_exit;
	default:
		ib::warn() << "CHECK TABLE on index " << index->name << " of"
			" table " << index->table->name << " returned " << ret;
		/* (this error is ignored by CHECK TABLE) */
		/* fall through */
	case DB_END_OF_INDEX:
		ret = DB_SUCCESS;
func_exit:
		/* Single cleanup point: also the target of the gotos
		above, which arrive with ret already set. */
		ut_free(buf);
		mem_heap_free(heap);

		return(ret);
	}

	*n_rows = *n_rows + 1;

	/* else this code is doing handler::check() for CHECK TABLE */

	/* row_search... returns the index record in buf, record origin offset
	within buf stored in the first 4 bytes, because we have built a dummy
	template */

	rec = buf + mach_read_from_4(buf);

	offsets = rec_get_offsets(rec, index, offsets_, index->n_core_fields,
				  ULINT_UNDEFINED, &heap);

	/* Compare the current record with the previous one, if any. */
	if (prev_entry != NULL) {
		matched_fields = 0;

		cmp = cmp_dtuple_rec_with_match(prev_entry, rec, offsets,
						&matched_fields);
		contains_null = FALSE;

		/* In a unique secondary index we allow equal key values if
		they contain SQL NULLs */

		for (i = 0;
		     i < dict_index_get_n_ordering_defined_by_user(index);
		     i++) {
			if (UNIV_SQL_NULL == dfield_get_len(
				    dtuple_get_nth_field(prev_entry, i))) {

				contains_null = TRUE;
				break;
			}
		}

		const char* msg;

		if (cmp > 0) {
			/* NOTE(review): the value assigned to ret here (and
			in the duplicate key branch below) is overwritten by
			the next row_search_for_mysql() call before any
			return, so the condition is only logged. */
			ret = DB_INDEX_CORRUPT;
			msg = "index records in a wrong order in ";
not_ok:
			/* Shared reporting code; the duplicate key branch
			jumps here with its own msg. */
			ib::error()
				<< msg << index->name
				<< " of table " << index->table->name
				<< ": " << *prev_entry << ", "
				<< rec_offsets_print(rec, offsets);
			/* Continue reading */
		} else if (dict_index_is_unique(index)
			   && !contains_null
			   && matched_fields
			   >= dict_index_get_n_ordering_defined_by_user(
				   index)) {
			ret = DB_DUPLICATE_KEY;
			msg = "duplicate key in ";
			goto not_ok;
		}
	}

	{
		mem_heap_t*	tmp_heap = NULL;

		/* Empty the heap on each round.  But preserve offsets[]
		for the row_rec_to_index_entry() call, by copying them
		into a separate memory heap when needed. */
		if (UNIV_UNLIKELY(offsets != offsets_)) {
			ulint	size = rec_offs_get_n_alloc(offsets)
				* sizeof *offsets;

			tmp_heap = mem_heap_create(size);

			offsets = static_cast<rec_offs*>(
				mem_heap_dup(tmp_heap, offsets, size));
		}

		mem_heap_empty(heap);

		/* The current record becomes the "previous" entry for
		the next iteration; it is built in the emptied heap. */
		prev_entry = row_rec_to_index_entry(
			rec, index, offsets, heap);

		if (UNIV_LIKELY_NULL(tmp_heap)) {
			mem_heap_free(tmp_heap);
		}
	}

	ret = row_search_for_mysql(
		buf, PAGE_CUR_G, prebuilt, 0, ROW_SEL_NEXT);

	goto loop;
}
4814 
4815 /*********************************************************************//**
4816 Initialize this module */
4817 void
row_mysql_init(void)4818 row_mysql_init(void)
4819 /*================*/
4820 {
4821 	mutex_create(LATCH_ID_ROW_DROP_LIST, &row_drop_list_mutex);
4822 
4823 	UT_LIST_INIT(
4824 		row_mysql_drop_list,
4825 		&row_mysql_drop_t::row_mysql_drop_list);
4826 
4827 	row_mysql_drop_list_inited = true;
4828 }
4829 
row_mysql_close()4830 void row_mysql_close()
4831 {
4832   ut_ad(!UT_LIST_GET_LEN(row_mysql_drop_list) ||
4833         srv_force_recovery >= SRV_FORCE_NO_BACKGROUND);
4834   if (row_mysql_drop_list_inited)
4835   {
4836     row_mysql_drop_list_inited= false;
4837     mutex_free(&row_drop_list_mutex);
4838 
4839     while (row_mysql_drop_t *drop= UT_LIST_GET_FIRST(row_mysql_drop_list))
4840     {
4841       UT_LIST_REMOVE(row_mysql_drop_list, drop);
4842       ut_free(drop);
4843     }
4844   }
4845 }
4846