1 /*****************************************************************************
2 
3 Copyright (c) 2000, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0mysql.cc
29 Interface between Innobase row operations and MySQL.
30 Contains also create table and other data dictionary operations.
31 
32 Created 9/17/2000 Heikki Tuuri
33 *******************************************************/
34 
35 #include "ha_prototypes.h"
36 #include <debug_sync.h>
37 #include <gstream.h>
38 #include <spatial.h>
39 #include <log.h>
40 #include <mysys_err.h>
41 #include <sql_error.h>
42 #include <vector>
43 
44 #include "row0mysql.h"
45 
46 #ifdef UNIV_NONINL
47 #include "row0mysql.ic"
48 #endif
49 
50 #include "btr0sea.h"
51 #include "dict0boot.h"
52 #include "dict0crea.h"
53 #include <sql_const.h>
54 #include "dict0dict.h"
55 #include "dict0load.h"
56 #include "dict0priv.h"
57 #include "dict0boot.h"
58 #include "dict0stats.h"
59 #include "dict0stats_bg.h"
60 #include "fil0fil.h"
61 #include "fsp0file.h"
62 #include "fsp0sysspace.h"
63 #include "fts0fts.h"
64 #include "fts0types.h"
65 #include "ibuf0ibuf.h"
66 #include "lock0lock.h"
67 #include "log0log.h"
68 #include "pars0pars.h"
69 #include "que0que.h"
70 #include "rem0cmp.h"
71 #include "row0import.h"
72 #include "row0ins.h"
73 #include "row0merge.h"
74 #include "row0row.h"
75 #include "row0sel.h"
76 #include "row0upd.h"
77 #include "srv0srv.h"
78 #include "trx0purge.h"
79 #include "trx0rec.h"
80 #include "trx0roll.h"
81 #include "trx0undo.h"
82 #include "row0ext.h"
83 #include "ut0new.h"
84 #include "zlib.h"
85 #include <algorithm>
86 #include <deque>
87 #include <vector>
88 #include "fil0crypt.h"
89 
/** Message text telling the user that database modifications are rejected
while innodb_force_recovery is on, and how to turn it off. */
const char* MODIFICATIONS_NOT_ALLOWED_MSG_FORCE_RECOVERY =
	"innodb_force_recovery is on. We do not allow database modifications"
	" by the user. Shut down mysqld and edit my.cnf to set"
	" innodb_force_recovery=0";
94 
/** Provide optional 4.x backwards compatibility for 5.0 and above.
Initialized to FALSE here. */
ibool	row_rollback_on_timeout	= FALSE;
97 
/**
Compression level to be used by zlib for compressed-blob columns.
Settable by user. The zlib level constants are:
Z_NO_COMPRESSION = 0
Z_BEST_SPEED = 1
Z_BEST_COMPRESSION = 9
Z_DEFAULT_COMPRESSION = -1
*/
uint	srv_compressed_columns_zip_level = DEFAULT_COMPRESSION_LEVEL;
/**
zlib compression strategy used for compressed columns, one of
(Z_FILTERED | Z_HUFFMAN_ONLY | Z_RLE | Z_FIXED | Z_DEFAULT_STRATEGY)

The strategy parameter is used to tune the compression algorithm. Use the
value Z_DEFAULT_STRATEGY for normal data, Z_FILTERED for data produced by a
filter (or predictor), Z_HUFFMAN_ONLY to force Huffman encoding only
(no string match), or Z_RLE to limit match distances to one
(run-length encoding). Filtered data consists mostly of small values with a
somewhat random distribution. In this case, the compression algorithm is
tuned to compress them better.
The effect of Z_FILTERED is to force more Huffman coding and less string
matching; it is somewhat intermediate between Z_DEFAULT_STRATEGY and
Z_HUFFMAN_ONLY. Z_RLE is designed to be almost as fast as Z_HUFFMAN_ONLY,
but give better compression for PNG image data. The strategy parameter only
affects the compression ratio but not the correctness of the compressed
output even if it is not set appropriately. Z_FIXED prevents the use of
dynamic Huffman codes, allowing for a simpler decoder for special
applications.
*/
const uint	srv_compressed_columns_zlib_strategy = Z_DEFAULT_STRATEGY;
/** Compression threshold in bytes: row_compress_column() stores a value
uncompressed when its length is below this value. */
ulong	srv_compressed_columns_threshold = 96;
/**
Determine if zlib needs to compute adler32 value for the compressed data.
This variable is similar to page_zip_zlib_wrap, but only used by
compressed blob columns.
*/
const bool	srv_compressed_columns_zlib_wrap = true;
/**
Determine if zlib will use custom memory allocation functions based on
InnoDB memory heap routines (mem_heap_t*).
*/
const bool	srv_compressed_columns_zlib_use_heap = false;
/** Chain node of the list of tables to drop in the background.
Each node carries the name of one table and the link fields that chain it
into row_mysql_drop_list. */
struct row_mysql_drop_t{
	char*				table_name;	/*!< table name */
	UT_LIST_NODE_T(row_mysql_drop_t)row_mysql_drop_list;
							/*!< list chain node */
};
146 
/** @brief List of tables we should drop in background.

ALTER TABLE in MySQL requires that the table handler can drop the
table in background when there are no queries to it any
more.  Protected by row_drop_list_mutex. */
static UT_LIST_BASE_NODE_T(row_mysql_drop_t)	row_mysql_drop_list;

/** Mutex protecting the background table drop list. */
static ib_mutex_t row_drop_list_mutex;

/** Flag: has row_mysql_drop_list been initialized? */
static ibool	row_mysql_drop_list_inited	= FALSE;

/** Mutex declared here but defined in another translation unit.
NOTE(review): presumably guards the encryption master key id - confirm
against its definition. */
extern ib_mutex_t	master_key_id_mutex;
161 
162 /*******************************************************************//**
163 Determine if the given name is a name reserved for MySQL system tables.
164 @return TRUE if name is a MySQL system table name */
165 static
166 ibool
row_mysql_is_system_table(const char * name)167 row_mysql_is_system_table(
168 /*======================*/
169 	const char*	name)
170 {
171 	if (strncmp(name, "mysql/", 6) != 0) {
172 
173 		return(FALSE);
174 	}
175 
176 	return(0 == strcmp(name + 6, "host")
177 	       || 0 == strcmp(name + 6, "user")
178 	       || 0 == strcmp(name + 6, "db"));
179 }
180 
/*********************************************************************//**
Forward declaration; the definition is not in this part of the file.
If a table is not yet in the drop list, adds the table to the list of tables
which the master thread drops in background. We need this on Unix because in
ALTER TABLE MySQL may call drop table even if the table has running queries on
it. Also, if there are running foreign key checks on the table, we drop the
table lazily.
@return TRUE if the table was not yet in the drop list, and was added there */
static
ibool
row_add_table_to_background_drop_list(
/*==================================*/
	const char*	name);	/*!< in: table name */
193 
#ifdef UNIV_DEBUG
/** Wait for the background drop list to become empty.
The list length is sampled while holding row_drop_list_mutex; after every
sample (including the one that finds the list empty) the thread sleeps via
os_thread_sleep(100000) before deciding whether to poll again. */
void
row_wait_for_background_drop_list_empty()
{
	bool	drained;

	do {
		mutex_enter(&row_drop_list_mutex);
		drained = (UT_LIST_GET_LEN(row_mysql_drop_list) == 0);
		mutex_exit(&row_drop_list_mutex);

		os_thread_sleep(100000);
	} while (!drained);
}
#endif /* UNIV_DEBUG */
208 
209 /*******************************************************************//**
210 Delays an INSERT, DELETE or UPDATE operation if the purge is lagging. */
211 static
212 void
row_mysql_delay_if_needed(void)213 row_mysql_delay_if_needed(void)
214 /*===========================*/
215 {
216 	if (srv_dml_needed_delay) {
217 		os_thread_sleep(srv_dml_needed_delay);
218 	}
219 }
220 
221 /*******************************************************************//**
222 Frees the blob heap in prebuilt when no longer needed. */
223 void
row_mysql_prebuilt_free_blob_heap(row_prebuilt_t * prebuilt)224 row_mysql_prebuilt_free_blob_heap(
225 /*==============================*/
226 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct of a
227 					ha_innobase:: table handle */
228 {
229 	DBUG_ENTER("row_mysql_prebuilt_free_blob_heap");
230 
231 	DBUG_PRINT("row_mysql_prebuilt_free_blob_heap",
232 		   ("blob_heap freeing: %p", prebuilt->blob_heap));
233 
234 	mem_heap_free(prebuilt->blob_heap);
235 	prebuilt->blob_heap = NULL;
236 	DBUG_VOID_RETURN;
237 }
238 
239 /** Frees the compress heap in prebuilt when no longer needed. */
240 void
row_mysql_prebuilt_free_compress_heap(row_prebuilt_t * prebuilt)241 row_mysql_prebuilt_free_compress_heap(
242 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct of a
243 					ha_innobase:: table handle */
244 {
245 	mem_heap_free(prebuilt->compress_heap);
246 	prebuilt->compress_heap = NULL;
247 }
248 
249 /*******************************************************************//**
250 Stores a >= 5.0.3 format true VARCHAR length to dest, in the MySQL row
251 format.
252 @return pointer to the data, we skip the 1 or 2 bytes at the start
253 that are used to store the len */
254 byte*
row_mysql_store_true_var_len(byte * dest,ulint len,ulint lenlen)255 row_mysql_store_true_var_len(
256 /*=========================*/
257 	byte*	dest,	/*!< in: where to store */
258 	ulint	len,	/*!< in: length, must fit in two bytes */
259 	ulint	lenlen)	/*!< in: storage length of len: either 1 or 2 bytes */
260 {
261 	if (lenlen == 2) {
262 		ut_a(len < 256 * 256);
263 
264 		mach_write_to_2_little_endian(dest, len);
265 
266 		return(dest + 2);
267 	}
268 
269 	ut_a(lenlen == 1);
270 	ut_a(len < 256);
271 
272 	mach_write_to_1(dest, len);
273 
274 	return(dest + 1);
275 }
276 
277 /*******************************************************************//**
278 Reads a >= 5.0.3 format true VARCHAR length, in the MySQL row format, and
279 returns a pointer to the data.
280 @return pointer to the data, we skip the 1 or 2 bytes at the start
281 that are used to store the len */
282 const byte*
row_mysql_read_true_varchar(ulint * len,const byte * field,ulint lenlen)283 row_mysql_read_true_varchar(
284 /*========================*/
285 	ulint*		len,	/*!< out: variable-length field length */
286 	const byte*	field,	/*!< in: field in the MySQL format */
287 	ulint		lenlen)	/*!< in: storage length of len: either 1
288 				or 2 bytes */
289 {
290 	if (lenlen == 2) {
291 		*len = mach_read_from_2_little_endian(field);
292 
293 		return(field + 2);
294 	}
295 
296 	ut_a(lenlen == 1);
297 
298 	*len = mach_read_from_1(field);
299 
300 	return(field + 1);
301 }
302 
/**
  Compressed BLOB header format:
  ---------------------------------------------------------------
  | reserved | wrap | algorithm | len-len | compressed | unused |
  |      [1] |  [1] |       [5] |     [3] |        [1] |    [5] |
  ---------------------------------------------------------------
  | 0      0 | 1  1 | 2       6 | 7     9 | 10      10 | 11  15 |
  ---------------------------------------------------------------
  * 'reserved' bit is planned to be used in future versions of the BLOB
  header. In this version it must always be
  'default_zip_column_reserved_value' (0).
  * 'wrap' identifies if compression algorithm calculated a checksum
  (adler32 in case of zlib) and appended it to the compressed data.
  * 'algorithm' identifies which algorithm was used to compress this BLOB.
  Currently, the only value 'default_zip_column_algorithm_value' (0) is
  supported.
  * 'len-len' field identifies the length of the column length data portion
  that follows this header (see below).
  * If 'compressed' bit is set to 1, then this header is immediately followed
  by 1..8 bytes (depending on the value of 'len-len' bitfield) which
  determine original (uncompressed) block size. These 'len-len' bytes are
  followed by compressed representation of the original data.
  * If 'compressed' bit is set to 0, every other bitfield ('wrap',
  'algorithm' and 'len-len') must be ignored. In this case the header is
  immediately followed by uncompressed (original) data.
*/
329 
/**
  Currently the only supported value for the 'reserved' field is
  false (0).
*/
static const bool default_zip_column_reserved_value = false;

/**
  Currently the only supported value for the 'algorithm' field is 0, which
  means 'zlib'.
*/
static const uint default_zip_column_algorithm_value = 0;

/** Upper bound for the bytes that precede the column payload: the header
itself plus 8 more bytes reserved for the stored uncompressed length. */
static const size_t zip_column_prefix_max_length =
	ZIP_COLUMN_HEADER_LENGTH + 8;
/** Length in bytes of the compressed column header. */
static const size_t zip_column_header_length = ZIP_COLUMN_HEADER_LENGTH;

/* For each header field below, the first constant is the bit offset of the
field inside the header value and the second one is the mask selecting it. */

/* 'reserved', bit 0 */
static const uint zip_column_reserved = 0;
/* 0000 0000 0000 0001 */
static const uint zip_column_reserved_mask = 0x0001;

/* 'wrap', bit 1 */
static const uint zip_column_wrap = 1;
/* 0000 0000 0000 0010 */
static const uint zip_column_wrap_mask = 0x0002;

/* 'algorithm', bit 2,3,4,5,6 */
static const uint zip_column_algorithm = 2;
/* 0000 0000 0111 1100 */
static const uint zip_column_algorithm_mask = 0x007C;

/* 'len-len', bit 7,8,9 */
static const uint zip_column_data_length = 7;
/* 0000 0011 1000 0000 */
static const uint zip_column_data_length_mask = 0x0380;

/* 'compressed', bit 10 */
static const uint zip_column_compressed = 10;
/* 0000 0100 0000 0000 */
static const uint zip_column_compressed_mask = 0x0400;
370 
371 /** Updates compressed block header with the given components */
372 static void
column_set_compress_header(byte * data,bool compressed,ulint lenlen,uint alg,bool wrap,bool reserved)373 column_set_compress_header(
374 	byte*	data,
375 	bool	compressed,
376 	ulint	lenlen,
377 	uint	alg,
378 	bool	wrap,
379 	bool	reserved)
380 {
381 	ulint header = 0;
382 	header |= (compressed << zip_column_compressed);
383 	header |= (lenlen << zip_column_data_length);
384 	header |= (alg << zip_column_algorithm);
385 	header |= (wrap << zip_column_wrap);
386 	header |= (reserved << zip_column_reserved);
387 	mach_write_to_2(data, header);
388 }
389 
390 /** Parse compressed block header into components */
391 static void
column_get_compress_header(const byte * data,bool * compressed,ulint * lenlen,uint * alg,bool * wrap,bool * reserved)392 column_get_compress_header(
393 	const byte*	data,
394 	bool*		compressed,
395 	ulint*		lenlen,
396 	uint*		alg,
397 	bool*		wrap,
398 	bool*		reserved
399 )
400 {
401 	ulint header = mach_read_from_2(data);
402 	*compressed = ((header & zip_column_compressed_mask) >>
403 		zip_column_compressed);
404 	*lenlen = ((header & zip_column_data_length_mask) >>
405 		zip_column_data_length);
406 	*alg = ((header & zip_column_algorithm_mask) >>
407 		zip_column_algorithm);
408 	*wrap = ((header & zip_column_wrap_mask) >>
409 		zip_column_wrap);
410 	*reserved = ((header & zip_column_reserved_mask) >>
411 		zip_column_reserved);
412 }
413 
414 /** Allocate memory for zlib. */
415 static
416 void*
column_zip_zalloc(void * opaque,uInt items,uInt size)417 column_zip_zalloc(
418 	void*	opaque,	/*!< in/out: memory heap */
419 	uInt	items,	/*!< in: number of items to allocate */
420 	uInt	size)	/*!< in: size of an item in bytes */
421 {
422 	return(mem_heap_zalloc(static_cast<mem_heap_t*>(opaque),
423 		items * size));
424 }
425 
426 /** Deallocate memory for zlib. */
427 static
428 void
column_zip_free(void * opaque MY_ATTRIBUTE ((unused)),void * address MY_ATTRIBUTE ((unused)))429 column_zip_free(
430 	 void*	opaque MY_ATTRIBUTE((unused)),	/*!< in: memory heap */
431 	 void*	address MY_ATTRIBUTE((unused)))	/*!< in: object to free */
432 {
433 }
434 
435 /** Configure the zlib allocator to use the given memory heap. */
436 static
437 void
column_zip_set_alloc(void * stream,mem_heap_t * heap)438 column_zip_set_alloc(
439 	void*		stream,	/*!< in/out: zlib stream */
440 	mem_heap_t*	heap)	/*!< in: memory heap to use */
441 {
442 	z_stream* strm = static_cast<z_stream*>(stream);
443 
444 	if (srv_compressed_columns_zlib_use_heap) {
445 		strm->zalloc = column_zip_zalloc;
446 		strm->zfree = column_zip_free;
447 		strm->opaque = heap;
448 	} else {
449 		strm->zalloc = (alloc_func)0;
450 		strm->zfree = (free_func)0;
451 		strm->opaque = (voidpf)0;
452 	}
453 }
454 
455 /** Compress blob/text/varchar column using zlib
456 @return pointer to the compressed data */
457 byte*
row_compress_column(const byte * data,ulint * len,ulint lenlen,const byte * dict_data,ulint dict_data_len,row_prebuilt_t * prebuilt)458 row_compress_column(
459 	const byte*	data,	/*!< in: data in mysql(uncompressed)
460 				format */
461 	ulint		*len,	/*!< in: data length; out: length of
462 				compressed data*/
463 	ulint		lenlen,	/*!< in: bytes used to store the length of
464 				data */
465 	const byte*	dict_data,
466 				/*!< in: optional dictionary data used for
467 				compression */
468 	ulint		dict_data_len,
469 				/*!< in: optional dictionary data length */
470 	row_prebuilt_t*	prebuilt)
471 				/*!< in: use prebuilt->compress_heap only
472 				here*/
473 {
474 	int err = 0;
475 	ulint comp_len = *len;
476 	ulint buf_len = *len + zip_column_prefix_max_length;
477 	byte* buf;
478 	byte* ptr;
479 	z_stream c_stream;
480 	bool wrap = srv_compressed_columns_zlib_wrap;
481 
482 	int window_bits = wrap ? MAX_WBITS : -MAX_WBITS;
483 
484 	if (!prebuilt->compress_heap) {
485 		prebuilt->compress_heap =
486 			mem_heap_create(ut_max(UNIV_PAGE_SIZE, buf_len));
487 	}
488 
489 	buf = static_cast<byte*>(mem_heap_zalloc(
490 			prebuilt->compress_heap,buf_len));
491 
492 	if (*len < srv_compressed_columns_threshold ||
493 		srv_compressed_columns_zip_level == Z_NO_COMPRESSION)
494 		goto do_not_compress;
495 
496 	ptr = buf + zip_column_header_length + lenlen;
497 
498 	/*init deflate object*/
499 	c_stream.next_in = const_cast<Bytef*>(data);
500 	c_stream.avail_in = *len;
501 	c_stream.next_out = ptr;
502 	c_stream.avail_out = comp_len;
503 
504 	column_zip_set_alloc(&c_stream, prebuilt->compress_heap);
505 
506 	err = deflateInit2(&c_stream, srv_compressed_columns_zip_level,
507 		Z_DEFLATED, window_bits, MAX_MEM_LEVEL,
508 		srv_compressed_columns_zlib_strategy);
509 	ut_a(err == Z_OK);
510 
511 	if (dict_data != 0 && dict_data_len != 0) {
512 		err = deflateSetDictionary(&c_stream, dict_data,
513 			dict_data_len);
514 		ut_a(err == Z_OK);
515 	}
516 
517 	err = deflate(&c_stream, Z_FINISH);
518 	if (err != Z_STREAM_END) {
519 		deflateEnd(&c_stream);
520 		if (err == Z_OK)
521 			err = Z_BUF_ERROR;
522 	} else {
523 		comp_len = c_stream.total_out;
524 		err = deflateEnd(&c_stream);
525 	}
526 
527 	switch (err) {
528 	case Z_OK:
529 		break;
530 	case Z_BUF_ERROR:
531 		/* data after compress is larger than uncompressed data*/
532 		break;
533 	default:
534 		ib::error() << "failed to compress the column, error: " <<
535 			err << '\n';
536 	}
537 
538 	/* make sure the compressed data size is smaller than
539 	uncompressed data */
540 	if (err == Z_OK &&
541 		*len > (comp_len + zip_column_header_length + lenlen)) {
542 		column_set_compress_header(buf, true, lenlen - 1,
543 			default_zip_column_algorithm_value, wrap,
544 			default_zip_column_reserved_value);
545 		ptr = buf + zip_column_header_length;
546 		/*store the uncompressed data length*/
547 		switch (lenlen) {
548 		case 1:
549 			mach_write_to_1(ptr, *len);
550 			break;
551 		case 2:
552 			mach_write_to_2(ptr, *len);
553 			break;
554 		case 3:
555 			mach_write_to_3(ptr, *len);
556 			break;
557 		case 4:
558 			mach_write_to_4(ptr, *len);
559 			break;
560 		default:
561 			ut_error;
562 		}
563 
564 		*len = comp_len + zip_column_header_length + lenlen;
565 		return buf;
566 	}
567 
568 do_not_compress:
569 	ptr = buf;
570 	column_set_compress_header(ptr, false, 0,
571 		default_zip_column_algorithm_value, false,
572 		default_zip_column_reserved_value);
573 	ptr += zip_column_header_length;
574 	memcpy(ptr, data, *len);
575 	*len += zip_column_header_length;
576 	return buf;
577 }
578 
579 /** Uncompress blob/text/varchar column using zlib
580 @return pointer to the uncompressed data */
581 const byte*
row_decompress_column(const byte * data,ulint * len,const byte * dict_data,ulint dict_data_len,row_prebuilt_t * prebuilt)582 row_decompress_column(
583 	const byte*	data,	/*!< in: data in innodb(compressed) format */
584 	ulint		*len,	/*!< in: data length; out: length of
585 				decompressed data*/
586 	const byte*	dict_data,
587 				/*!< in: optional dictionary data used for
588 				decompression */
589 	ulint		dict_data_len,
590 				/*!< in: optional dictionary data length */
591 	row_prebuilt_t*	prebuilt)
592 				/*!< in: use prebuilt->compress_heap only
593 				here*/
594 {
595 	ulint buf_len = 0;
596 	byte* buf;
597 	int err = 0;
598 	int window_bits = 0;
599 	z_stream d_stream;
600 	bool is_compressed = false;
601 	bool wrap = false;
602 	bool reserved = false;
603 	ulint lenlen = 0;
604 	uint alg = 0;
605 
606 	ut_ad(*len != ULINT_UNDEFINED);
607 	ut_ad(*len >= zip_column_header_length);
608 
609 	column_get_compress_header(data, &is_compressed, &lenlen, &alg,
610 		&wrap, &reserved);
611 
612 	if (reserved != default_zip_column_reserved_value) {
613 		ib::fatal() <<
614 			"unsupported compressed BLOB header format\n";
615 	}
616 
617 	if (alg != default_zip_column_algorithm_value) {
618 		ib::fatal() <<
619 			"unsupported 'algorithm' value in the"
620 			" compressed BLOB header\n";
621 	}
622 
623 	ut_a(lenlen < 4);
624 
625 	data += zip_column_header_length;
626 	if (!is_compressed) { /* column not compressed */
627 		*len -= zip_column_header_length;
628 		return data;
629 	}
630 
631 	lenlen++;
632 
633 	ulint comp_len = *len - zip_column_header_length - lenlen;
634 
635 	ulint uncomp_len = 0;
636 	switch (lenlen) {
637 	case 1:
638 		uncomp_len = mach_read_from_1(data);
639 		break;
640 	case 2:
641 		uncomp_len = mach_read_from_2(data);
642 		break;
643 	case 3:
644 		uncomp_len = mach_read_from_3(data);
645 		break;
646 	case 4:
647 		uncomp_len = mach_read_from_4(data);
648 		break;
649 	default:
650 		ut_error;
651 	}
652 
653 	data += lenlen;
654 
655 	/* data is compressed, decompress it*/
656 	if (!prebuilt->compress_heap) {
657 		prebuilt->compress_heap =
658 			mem_heap_create(ut_max(UNIV_PAGE_SIZE, uncomp_len));
659 	}
660 
661 	buf_len = uncomp_len;
662 	buf = static_cast<byte*>(mem_heap_zalloc(
663 				 prebuilt->compress_heap, buf_len));
664 
665 	/* init d_stream */
666 	d_stream.next_in = const_cast<Bytef*>(data);
667 	d_stream.avail_in = comp_len;
668 	d_stream.next_out = buf;
669 	d_stream.avail_out = buf_len;
670 
671 	column_zip_set_alloc(&d_stream, prebuilt->compress_heap);
672 
673 	window_bits = wrap ? MAX_WBITS : -MAX_WBITS;
674 	err = inflateInit2(&d_stream, window_bits);
675 	ut_a(err == Z_OK);
676 
677 	err = inflate(&d_stream, Z_FINISH);
678 	if (err == Z_NEED_DICT) {
679 		ut_a(dict_data != NULL);
680 		ut_a(dict_data_len != 0);
681 		err = inflateSetDictionary(&d_stream, dict_data,
682 			dict_data_len);
683 		ut_a(err == Z_OK);
684 		err = inflate(&d_stream, Z_FINISH);
685 	}
686 
687 	if (err != Z_STREAM_END) {
688 		inflateEnd(&d_stream);
689 		if (err == Z_BUF_ERROR && d_stream.avail_in == 0)
690 			err = Z_DATA_ERROR;
691 	} else {
692 		buf_len = d_stream.total_out;
693 		err = inflateEnd(&d_stream);
694 	}
695 
696 	switch (err) {
697 	case Z_OK:
698 		break;
699 	case Z_BUF_ERROR:
700 		ib::fatal() <<
701 			"zlib buf error, this shouldn't happen\n";
702 		break;
703 	default:
704 		ib::fatal() <<
705 			"failed to decompress column, error: " <<
706 			err << '\n';
707 	}
708 
709 	if (err == Z_OK) {
710 		if (buf_len != uncomp_len) {
711 			ib::fatal() <<
712 				"failed to decompress blob column, may"
713 				" be corrupted\n";
714 		}
715 		*len = buf_len;
716 		return buf;
717 	}
718 
719 	*len -= (zip_column_header_length + lenlen);
720 	return data;
721 }
722 
723 
724 /*******************************************************************//**
725 Stores a reference to a BLOB in the MySQL format. */
726 void
row_mysql_store_blob_ref(byte * dest,ulint col_len,const void * data,ulint len,bool need_decompression,const byte * dict_data,ulint dict_data_len,row_prebuilt_t * prebuilt)727 row_mysql_store_blob_ref(
728 /*=====================*/
729 	byte*		dest,	/*!< in: where to store */
730 	ulint		col_len,/*!< in: dest buffer size: determines into
731 				how many bytes the BLOB length is stored,
732 				the space for the length may vary from 1
733 				to 4 bytes */
734 	const void*	data,	/*!< in: BLOB data; if the value to store
735 				is SQL NULL this should be NULL pointer */
736 	ulint		len,	/*!< in: BLOB length; if the value to store
737 				is SQL NULL this should be 0; remember
738 				also to set the NULL bit in the MySQL record
739 				header! */
740 	bool		need_decompression,
741 				/*!< in: if the data need to be compressed*/
742 	const byte*	dict_data,
743 				/*!< in: optional compression dictionary
744 				data */
745 	ulint		dict_data_len,
746 				/*!< in: optional compression dictionary data
747 				length */
748 	row_prebuilt_t*	prebuilt)
749 				/*<! in: use prebuilt->compress_heap only
750 				here */
751 {
752 	/* MySQL might assume the field is set to zero except the length and
753 	the pointer fields */
754 
755 	memset(dest, '\0', col_len);
756 
757 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
758 	and after that 8 bytes reserved for the pointer to the data.
759 	In 32-bit architectures we only use the first 4 bytes of the pointer
760 	slot. */
761 
762 	ut_a(col_len - 8 > 1 ||
763 		len < 256 +
764 		(need_decompression ? ZIP_COLUMN_HEADER_LENGTH : 0));
765 	ut_a(col_len - 8 > 2 ||
766 		len < 256 * 256 +
767 		(need_decompression ? ZIP_COLUMN_HEADER_LENGTH : 0));
768 	ut_a(col_len - 8 > 3 ||
769 		len < 256 * 256 * 256 +
770 		(need_decompression ? ZIP_COLUMN_HEADER_LENGTH : 0));
771 
772 	const byte *ptr = NULL;
773 
774 	if (need_decompression)
775 		ptr = row_decompress_column((const byte*)data, &len,
776 			dict_data, dict_data_len, prebuilt);
777 
778 	if (ptr)
779 		memcpy(dest + col_len - 8, &ptr, sizeof ptr);
780 	else
781 		memcpy(dest + col_len - 8, &data, sizeof data);
782 
783 	mach_write_to_n_little_endian(dest, col_len - 8, len);
784 }
785 
786 /*******************************************************************//**
787 Reads a reference to a BLOB in the MySQL format.
788 @return pointer to BLOB data */
789 const byte*
row_mysql_read_blob_ref(ulint * len,const byte * ref,ulint col_len,bool need_compression,const byte * dict_data,ulint dict_data_len,row_prebuilt_t * prebuilt)790 row_mysql_read_blob_ref(
791 /*====================*/
792 	ulint*		len,		/*!< out: BLOB length */
793 	const byte*	ref,		/*!< in: BLOB reference in the
794 					MySQL format */
795 	ulint		col_len,	/*!< in: BLOB reference length
796 					(not BLOB length) */
797 	bool		need_compression,
798 					/*!< in: if the data need to be
799 					compressed*/
800 	const byte*	dict_data,	/*!< in: optional compression
801 					dictionary data */
802 	ulint		dict_data_len,	/*!< in: optional compression
803 					dictionary data length */
804 	row_prebuilt_t*	prebuilt)	/*!< in: use prebuilt->compress_heap
805 					only here */
806 {
807 	byte*	data = NULL;
808 	byte*	ptr = NULL;
809 
810 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
811 
812 	memcpy(&data, ref + col_len - 8, sizeof data);
813 
814 	if (need_compression) {
815 		ptr = row_compress_column(data, len, col_len - 8, dict_data,
816 			dict_data_len, prebuilt);
817 		if (ptr)
818 			data = ptr;
819 	}
820 
821 	return(data);
822 }
823 
824 /*******************************************************************//**
825 Converting InnoDB geometry data format to MySQL data format. */
826 void
row_mysql_store_geometry(byte * dest,ulint dest_len,const byte * src,ulint src_len)827 row_mysql_store_geometry(
828 /*=====================*/
829 	byte*		dest,		/*!< in/out: where to store */
830 	ulint		dest_len,	/*!< in: dest buffer size: determines
831 					into how many bytes the GEOMETRY length
832 					is stored, the space for the length
833 					may vary from 1 to 4 bytes */
834 	const byte*	src,		/*!< in: GEOMETRY data; if the value to
835 					store is SQL NULL this should be NULL
836 					pointer */
837 	ulint		src_len)	/*!< in: GEOMETRY length; if the value
838 					to store is SQL NULL this should be 0;
839 					remember also to set the NULL bit in
840 					the MySQL record header! */
841 {
842 	/* MySQL might assume the field is set to zero except the length and
843 	the pointer fields */
844 	UNIV_MEM_ASSERT_RW(src, src_len);
845 	UNIV_MEM_ASSERT_W(dest, dest_len);
846 	UNIV_MEM_INVALID(dest, dest_len);
847 
848 	memset(dest, '\0', dest_len);
849 
850 	/* In dest there are 1 - 4 bytes reserved for the BLOB length,
851 	and after that 8 bytes reserved for the pointer to the data.
852 	In 32-bit architectures we only use the first 4 bytes of the pointer
853 	slot. */
854 
855 	ut_ad(dest_len - 8 > 1 || src_len < 1<<8);
856 	ut_ad(dest_len - 8 > 2 || src_len < 1<<16);
857 	ut_ad(dest_len - 8 > 3 || src_len < 1<<24);
858 
859 	mach_write_to_n_little_endian(dest, dest_len - 8, src_len);
860 
861 	memcpy(dest + dest_len - 8, &src, sizeof src);
862 
863 	DBUG_EXECUTE_IF("row_print_geometry_data",
864 	{
865 		String  res;
866 		Geometry_buffer buffer;
867 		String  wkt;
868 
869 		/** Show the meaning of geometry data. */
870 		Geometry* g = Geometry::construct(
871 			&buffer, (const char*)src, (uint32) src_len);
872 
873 		if (g)
874 		{
875 			if (g->as_wkt(&wkt) == 0)
876 			{
877 				ib::info() << "Write geometry data to"
878 					" MySQL WKT format: "
879 					<< wkt.c_ptr_safe() << ".";
880 			}
881 		}
882 	});
883 }
884 
885 /*******************************************************************//**
886 Read geometry data in the MySQL format.
887 @return pointer to geometry data */
888 const byte*
row_mysql_read_geometry(ulint * len,const byte * ref,ulint col_len)889 row_mysql_read_geometry(
890 /*====================*/
891 	ulint*		len,		/*!< out: data length */
892 	const byte*	ref,		/*!< in: geometry data in the
893 					MySQL format */
894 	ulint		col_len)	/*!< in: MySQL format length */
895 {
896 	byte*		data;
897 
898 	*len = mach_read_from_n_little_endian(ref, col_len - 8);
899 
900 	memcpy(&data, ref + col_len - 8, sizeof data);
901 
902 	DBUG_EXECUTE_IF("row_print_geometry_data",
903 	{
904 		String  res;
905 		Geometry_buffer buffer;
906 		String  wkt;
907 
908 		/** Show the meaning of geometry data. */
909 		Geometry* g = Geometry::construct(
910 			&buffer, (const char*) data, (uint32) *len);
911 
912 		if (g)
913 		{
914 			if (g->as_wkt(&wkt) == 0)
915 			{
916 				ib::info() << "Read geometry data in"
917 					" MySQL's WKT format: "
918 					<< wkt.c_ptr_safe() << ".";
919 			}
920 		}
921 	});
922 
923 	return(data);
924 }
925 
926 /**************************************************************//**
927 Pad a column with spaces. */
928 void
row_mysql_pad_col(ulint mbminlen,byte * pad,ulint len)929 row_mysql_pad_col(
930 /*==============*/
931 	ulint	mbminlen,	/*!< in: minimum size of a character,
932 				in bytes */
933 	byte*	pad,		/*!< out: padded buffer */
934 	ulint	len)		/*!< in: number of bytes to pad */
935 {
936 	const byte*	pad_end;
937 
938 	switch (UNIV_EXPECT(mbminlen, 1)) {
939 	default:
940 		ut_error;
941 	case 1:
942 		/* space=0x20 */
943 		memset(pad, 0x20, len);
944 		break;
945 	case 2:
946 		/* space=0x0020 */
947 		pad_end = pad + len;
948 		ut_a(!(len % 2));
949 		while (pad < pad_end) {
950 			*pad++ = 0x00;
951 			*pad++ = 0x20;
952 		};
953 		break;
954 	case 4:
955 		/* space=0x00000020 */
956 		pad_end = pad + len;
957 		ut_a(!(len % 4));
958 		while (pad < pad_end) {
959 			*pad++ = 0x00;
960 			*pad++ = 0x00;
961 			*pad++ = 0x00;
962 			*pad++ = 0x20;
963 		}
964 		break;
965 	}
966 }
967 
968 /**************************************************************//**
969 Stores a non-SQL-NULL field given in the MySQL format in the InnoDB format.
970 The counterpart of this function is row_sel_field_store_in_mysql_format() in
971 row0sel.cc.
972 @return up to which byte we used buf in the conversion */
973 byte*
row_mysql_store_col_in_innobase_format(dfield_t * dfield,byte * buf,ibool row_format_col,const byte * mysql_data,ulint col_len,ulint comp,bool need_compression,const byte * dict_data,ulint dict_data_len,row_prebuilt_t * prebuilt)974 row_mysql_store_col_in_innobase_format(
975 /*===================================*/
976 	dfield_t*	dfield,		/*!< in/out: dfield where dtype
977 					information must be already set when
978 					this function is called! */
979 	byte*		buf,		/*!< in/out: buffer for a converted
980 					integer value; this must be at least
981 					col_len long then! NOTE that dfield
982 					may also get a pointer to 'buf',
983 					therefore do not discard this as long
984 					as dfield is used! */
985 	ibool		row_format_col,	/*!< TRUE if the mysql_data is from
986 					a MySQL row, FALSE if from a MySQL
987 					key value;
988 					in MySQL, a true VARCHAR storage
989 					format differs in a row and in a
990 					key value: in a key value the length
991 					is always stored in 2 bytes! */
992 	const byte*	mysql_data,	/*!< in: MySQL column value, not
993 					SQL NULL; NOTE that dfield may also
994 					get a pointer to mysql_data,
995 					therefore do not discard this as long
996 					as dfield is used! */
997 	ulint		col_len,	/*!< in: MySQL column length; NOTE that
998 					this is the storage length of the
999 					column in the MySQL format row, not
1000 					necessarily the length of the actual
1001 					payload data; if the column is a true
1002 					VARCHAR then this is irrelevant */
1003 	ulint		comp,		/*!< in: nonzero=compact format */
1004 	bool		need_compression,
1005 					/*!< in: if the data need to be
1006 					compressed*/
1007 	const byte*	dict_data,	/*!< in: optional compression
1008 					dictionary data */
1009 	ulint		dict_data_len,	/*!< in: optional compression
1010 					dictionary data length */
1011 	row_prebuilt_t*	prebuilt)	/*!< in: use prebuilt->compress_heap
1012 					only here */
1013 {
1014 	const byte*	ptr	= mysql_data;
1015 	const dtype_t*	dtype;
1016 	ulint		type;
1017 	ulint		lenlen;
1018 
1019 	dtype = dfield_get_type(dfield);
1020 
1021 	type = dtype->mtype;
1022 
1023 	if (type == DATA_INT) {
1024 		/* Store integer data in Innobase in a big-endian format,
1025 		sign bit negated if the data is a signed integer. In MySQL,
1026 		integers are stored in a little-endian format. */
1027 
1028 		byte*	p = buf + col_len;
1029 
1030 		for (;;) {
1031 			p--;
1032 			*p = *mysql_data;
1033 			if (p == buf) {
1034 				break;
1035 			}
1036 			mysql_data++;
1037 		}
1038 
1039 		if (!(dtype->prtype & DATA_UNSIGNED)) {
1040 
1041 			*buf ^= 128;
1042 		}
1043 
1044 		ptr = buf;
1045 		buf += col_len;
1046 	} else if ((type == DATA_VARCHAR
1047 		    || type == DATA_VARMYSQL
1048 		    || type == DATA_BINARY)) {
1049 
1050 		if (dtype_get_mysql_type(dtype) == DATA_MYSQL_TRUE_VARCHAR) {
1051 			/* The length of the actual data is stored to 1 or 2
1052 			bytes at the start of the field */
1053 
1054 			if (row_format_col) {
1055 				if (dtype->prtype & DATA_LONG_TRUE_VARCHAR) {
1056 					lenlen = 2;
1057 				} else {
1058 					lenlen = 1;
1059 				}
1060 			} else {
1061 				/* In a MySQL key value, lenlen is always 2 */
1062 				lenlen = 2;
1063 			}
1064 
1065 			const byte* tmp_ptr = row_mysql_read_true_varchar(
1066 				&col_len, mysql_data, lenlen);
1067 			if (need_compression)
1068 				ptr = row_compress_column(tmp_ptr, &col_len,
1069 					lenlen, dict_data, dict_data_len,
1070 					prebuilt);
1071 			else
1072 				ptr = tmp_ptr;
1073 		} else {
1074 			/* Remove trailing spaces from old style VARCHAR
1075 			columns. */
1076 
1077 			/* Handle Unicode strings differently. */
1078 			ulint	mbminlen	= dtype_get_mbminlen(dtype);
1079 
1080 			ptr = mysql_data;
1081 
1082 			switch (mbminlen) {
1083 			default:
1084 				ut_error;
1085 			case 4:
1086 				/* space=0x00000020 */
1087 				/* Trim "half-chars", just in case. */
1088 				col_len &= ~3;
1089 
1090 				while (col_len >= 4
1091 				       && ptr[col_len - 4] == 0x00
1092 				       && ptr[col_len - 3] == 0x00
1093 				       && ptr[col_len - 2] == 0x00
1094 				       && ptr[col_len - 1] == 0x20) {
1095 					col_len -= 4;
1096 				}
1097 				break;
1098 			case 2:
1099 				/* space=0x0020 */
1100 				/* Trim "half-chars", just in case. */
1101 				col_len &= ~1;
1102 
1103 				while (col_len >= 2 && ptr[col_len - 2] == 0x00
1104 				       && ptr[col_len - 1] == 0x20) {
1105 					col_len -= 2;
1106 				}
1107 				break;
1108 			case 1:
1109 				/* space=0x20 */
1110 				while (col_len > 0
1111 				       && ptr[col_len - 1] == 0x20) {
1112 					col_len--;
1113 				}
1114 			}
1115 		}
1116 	} else if (comp && type == DATA_MYSQL
1117 		   && dtype_get_mbminlen(dtype) == 1
1118 		   && dtype_get_mbmaxlen(dtype) > 1) {
1119 		/* In some cases we strip trailing spaces from UTF-8 and other
1120 		multibyte charsets, from FIXED-length CHAR columns, to save
1121 		space. UTF-8 would otherwise normally use 3 * the string length
1122 		bytes to store an ASCII string! */
1123 
1124 		/* We assume that this CHAR field is encoded in a
1125 		variable-length character set where spaces have
1126 		1:1 correspondence to 0x20 bytes, such as UTF-8.
1127 
1128 		Consider a CHAR(n) field, a field of n characters.
1129 		It will contain between n * mbminlen and n * mbmaxlen bytes.
1130 		We will try to truncate it to n bytes by stripping
1131 		space padding.	If the field contains single-byte
1132 		characters only, it will be truncated to n characters.
1133 		Consider a CHAR(5) field containing the string
1134 		".a   " where "." denotes a 3-byte character represented
1135 		by the bytes "$%&". After our stripping, the string will
1136 		be stored as "$%&a " (5 bytes). The string
1137 		".abc " will be stored as "$%&abc" (6 bytes).
1138 
1139 		The space padding will be restored in row0sel.cc, function
1140 		row_sel_field_store_in_mysql_format(). */
1141 
1142 		ulint		n_chars;
1143 
1144 		ut_a(!(dtype_get_len(dtype) % dtype_get_mbmaxlen(dtype)));
1145 
1146 		n_chars = dtype_get_len(dtype) / dtype_get_mbmaxlen(dtype);
1147 
1148 		/* Strip space padding. */
1149 		while (col_len > n_chars && ptr[col_len - 1] == 0x20) {
1150 			col_len--;
1151 		}
1152 	} else if (!row_format_col) {
1153 		/* if mysql data is from a MySQL key value
1154 		since the length is always stored in 2 bytes,
1155 		we need do nothing here. */
1156 	} else if (type == DATA_BLOB) {
1157 
1158 		ptr = row_mysql_read_blob_ref(&col_len, mysql_data, col_len,
1159 			need_compression, dict_data, dict_data_len,
1160 			prebuilt);
1161 	} else if (DATA_GEOMETRY_MTYPE(type)) {
1162 		/* We use blob to store geometry data except DATA_POINT
1163 		internally, but in MySQL Layer the datatype is always blob. */
1164 		ptr = row_mysql_read_geometry(&col_len, mysql_data, col_len);
1165 	}
1166 
1167 	dfield_set_data(dfield, ptr, col_len);
1168 
1169 	return(buf);
1170 }
1171 
1172 /**************************************************************//**
1173 Convert a row in the MySQL format to a row in the Innobase format. Note that
1174 the function to convert a MySQL format key value to an InnoDB dtuple is
1175 row_sel_convert_mysql_key_to_innobase() in row0sel.cc. */
1176 static
1177 void
row_mysql_convert_row_to_innobase(dtuple_t * row,row_prebuilt_t * prebuilt,const byte * mysql_rec,mem_heap_t ** blob_heap)1178 row_mysql_convert_row_to_innobase(
1179 /*==============================*/
1180 	dtuple_t*	row,		/*!< in/out: Innobase row where the
1181 					field type information is already
1182 					copied there! */
1183 	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct where template
1184 					must be of type ROW_MYSQL_WHOLE_ROW */
1185 	const byte*	mysql_rec,	/*!< in: row in the MySQL format;
1186 					NOTE: do not discard as long as
1187 					row is used, as row may contain
1188 					pointers to this record! */
1189 	mem_heap_t**	blob_heap)	/*!< in: FIX_ME, remove this after
1190 					server fixes its issue */
1191 {
1192 	const mysql_row_templ_t*templ;
1193 	dfield_t*		dfield;
1194 	ulint			i;
1195 	ulint			n_col = 0;
1196 	ulint			n_v_col = 0;
1197 
1198 	ut_ad(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
1199 	ut_ad(prebuilt->mysql_template);
1200 
1201 	for (i = 0; i < prebuilt->n_template; i++) {
1202 
1203 		templ = prebuilt->mysql_template + i;
1204 
1205 		if (templ->is_virtual) {
1206 			ut_ad(n_v_col < dtuple_get_n_v_fields(row));
1207 			dfield = dtuple_get_nth_v_field(row, n_v_col);
1208 			n_v_col++;
1209 		} else {
1210 			dfield = dtuple_get_nth_field(row, n_col);
1211 			n_col++;
1212 		}
1213 
1214 		if (templ->mysql_null_bit_mask != 0) {
1215 			/* Column may be SQL NULL */
1216 
1217 			if (mysql_rec[templ->mysql_null_byte_offset]
1218 			    & (byte) (templ->mysql_null_bit_mask)) {
1219 
1220 				/* It is SQL NULL */
1221 
1222 				dfield_set_null(dfield);
1223 
1224 				goto next_column;
1225 			}
1226 		}
1227 
1228 		row_mysql_store_col_in_innobase_format(
1229 			dfield,
1230 			prebuilt->ins_upd_rec_buff + templ->mysql_col_offset,
1231 			TRUE, /* MySQL row format data */
1232 			mysql_rec + templ->mysql_col_offset,
1233 			templ->mysql_col_len,
1234 			dict_table_is_comp(prebuilt->table),
1235 			templ->compressed,
1236 			reinterpret_cast<const byte*>(
1237 				templ->zip_dict_data.str),
1238 			templ->zip_dict_data.length, prebuilt);
1239 
1240 		/* server has issue regarding handling BLOB virtual fields,
1241 		and we need to duplicate it with our own memory here */
1242 		if (templ->is_virtual
1243 		    && DATA_LARGE_MTYPE(dfield_get_type(dfield)->mtype)) {
1244 			if (*blob_heap == NULL) {
1245 				*blob_heap = mem_heap_create(dfield->len);
1246 			}
1247 			dfield_dup(dfield, *blob_heap);
1248 		}
1249 next_column:
1250 		;
1251 	}
1252 
1253 	/* If there is a FTS doc id column and it is not user supplied (
1254 	generated by server) then assign it a new doc id. */
1255 	if (prebuilt->table->fts) {
1256 
1257 		ut_a(prebuilt->table->fts->doc_col != ULINT_UNDEFINED);
1258 
1259 		fts_create_doc_id(prebuilt->table, row, prebuilt->heap);
1260 	}
1261 }
1262 
1263 /****************************************************************//**
1264 Handles user errors and lock waits detected by the database engine.
1265 @return true if it was a lock wait and we should continue running the
1266 query thread and in that case the thr is ALREADY in the running state. */
1267 bool
row_mysql_handle_errors(dberr_t * new_err,trx_t * trx,que_thr_t * thr,trx_savept_t * savept)1268 row_mysql_handle_errors(
1269 /*====================*/
1270 	dberr_t*	new_err,/*!< out: possible new error encountered in
1271 				lock wait, or if no new error, the value
1272 				of trx->error_state at the entry of this
1273 				function */
1274 	trx_t*		trx,	/*!< in: transaction */
1275 	que_thr_t*	thr,	/*!< in: query thread, or NULL */
1276 	trx_savept_t*	savept)	/*!< in: savepoint, or NULL */
1277 {
1278 	dberr_t	err;
1279 
1280 handle_new_error:
1281 	err = trx->error_state;
1282 
1283 	ut_a(err != DB_SUCCESS);
1284 
1285 	trx->error_state = DB_SUCCESS;
1286 
1287 	switch (err) {
1288 	case DB_LOCK_WAIT_TIMEOUT:
1289 		if (row_rollback_on_timeout) {
1290 			trx_rollback_to_savepoint(trx, NULL);
1291 			break;
1292 		}
1293 		/* fall through */
1294 	case DB_DUPLICATE_KEY:
1295 	case DB_FOREIGN_DUPLICATE_KEY:
1296 	case DB_TOO_BIG_RECORD:
1297 	case DB_UNDO_RECORD_TOO_BIG:
1298 	case DB_ROW_IS_REFERENCED:
1299 	case DB_NO_REFERENCED_ROW:
1300 	case DB_CANNOT_ADD_CONSTRAINT:
1301 	case DB_TOO_MANY_CONCURRENT_TRXS:
1302 	case DB_OUT_OF_FILE_SPACE:
1303 	case DB_READ_ONLY:
1304 	case DB_FTS_INVALID_DOCID:
1305 	case DB_INTERRUPTED:
1306 	case DB_CANT_CREATE_GEOMETRY_OBJECT:
1307 	case DB_DECRYPTION_FAILED:
1308 	case DB_COMPUTE_VALUE_FAILED:
1309 		DBUG_EXECUTE_IF("row_mysql_crash_if_error", {
1310 					log_buffer_flush_to_disk();
1311 					DBUG_SUICIDE(); });
1312 		if (savept) {
1313 			/* Roll back the latest, possibly incomplete insertion
1314 			or update */
1315 
1316 			trx_rollback_to_savepoint(trx, savept);
1317 		}
1318 		/* MySQL will roll back the latest SQL statement */
1319 		break;
1320 	case DB_LOCK_WAIT:
1321 
1322 		trx_kill_blocking(trx);
1323 
1324 		lock_wait_suspend_thread(thr);
1325 
1326 		if (trx->error_state != DB_SUCCESS) {
1327 			que_thr_stop_for_mysql(thr);
1328 
1329 			goto handle_new_error;
1330 		}
1331 
1332 		*new_err = err;
1333 
1334 		return(true);
1335 
1336 	case DB_DEADLOCK:
1337 	case DB_LOCK_TABLE_FULL:
1338 		/* Roll back the whole transaction; this resolution was added
1339 		to version 3.23.43 */
1340 
1341 		trx_rollback_to_savepoint(trx, NULL);
1342 		break;
1343 
1344 	case DB_MUST_GET_MORE_FILE_SPACE:
1345 		ib::fatal() << "The database cannot continue operation because"
1346 			" of lack of space. You must add a new data file"
1347 			" to my.cnf and restart the database.";
1348 		break;
1349 
1350 	case DB_CORRUPTION:
1351 		ib::error() << "We detected index corruption in an InnoDB type"
1352 			" table. You have to dump + drop + reimport the"
1353 			" table or, in a case of widespread corruption,"
1354 			" dump all InnoDB tables and recreate the whole"
1355 			" tablespace. If the mysqld server crashes after"
1356 			" the startup or when you dump the tables. "
1357 			<< FORCE_RECOVERY_MSG;
1358 		break;
1359 	case DB_FOREIGN_EXCEED_MAX_CASCADE:
1360 		ib::error() << "Cannot delete/update rows with cascading"
1361 			" foreign key constraints that exceed max depth of "
1362 			<< FK_MAX_CASCADE_DEL << ". Please drop excessive"
1363 			" foreign constraints and try again";
1364 		break;
1365 	default:
1366 		ib::fatal() << "Unknown error code " << err << ": "
1367 			<< ut_strerr(err);
1368 	}
1369 
1370 	if (trx->error_state != DB_SUCCESS) {
1371 		*new_err = trx->error_state;
1372 	} else {
1373 		*new_err = err;
1374 	}
1375 
1376 	trx->error_state = DB_SUCCESS;
1377 
1378 	return(false);
1379 }
1380 
1381 /********************************************************************//**
1382 Create a prebuilt struct for a MySQL table handle.
1383 @return own: a prebuilt struct */
1384 row_prebuilt_t*
row_create_prebuilt(dict_table_t * table,ulint mysql_row_len)1385 row_create_prebuilt(
1386 /*================*/
1387 	dict_table_t*	table,		/*!< in: Innobase table handle */
1388 	ulint		mysql_row_len)	/*!< in: length in bytes of a row in
1389 					the MySQL format */
1390 {
1391 	DBUG_ENTER("row_create_prebuilt");
1392 
1393 	row_prebuilt_t*	prebuilt;
1394 	mem_heap_t*	heap;
1395 	dict_index_t*	clust_index;
1396 	dict_index_t*	temp_index;
1397 	dtuple_t*	ref;
1398 	ulint		ref_len;
1399 	uint		srch_key_len = 0;
1400 	ulint		search_tuple_n_fields;
1401 
1402 	search_tuple_n_fields = 2 * (dict_table_get_n_cols(table)
1403 				     + dict_table_get_n_v_cols(table));
1404 
1405 	clust_index = dict_table_get_first_index(table);
1406 
1407 	/* Make sure that search_tuple is long enough for clustered index */
1408 	ut_a(2 * dict_table_get_n_cols(table) >= clust_index->n_fields);
1409 
1410 	ref_len = dict_index_get_n_unique(clust_index);
1411 
1412 
1413         /* Maximum size of the buffer needed for conversion of INTs from
1414 	little endian format to big endian format in an index. An index
1415 	can have maximum 16 columns (MAX_REF_PARTS) in it. Therfore
1416 	Max size for PK: 16 * 8 bytes (BIGINT's size) = 128 bytes
1417 	Max size Secondary index: 16 * 8 bytes + PK = 256 bytes. */
1418 #define MAX_SRCH_KEY_VAL_BUFFER         2* (8 * MAX_REF_PARTS)
1419 
1420 #define PREBUILT_HEAP_INITIAL_SIZE	\
1421 	( \
1422 	sizeof(*prebuilt) \
1423 	/* allocd in this function */ \
1424 	+ DTUPLE_EST_ALLOC(search_tuple_n_fields) \
1425 	+ DTUPLE_EST_ALLOC(ref_len) \
1426 	/* allocd in row_prebuild_sel_graph() */ \
1427 	+ sizeof(sel_node_t) \
1428 	+ sizeof(que_fork_t) \
1429 	+ sizeof(que_thr_t) \
1430 	/* allocd in row_get_prebuilt_update_vector() */ \
1431 	+ sizeof(upd_node_t) \
1432 	+ sizeof(upd_t) \
1433 	+ sizeof(upd_field_t) \
1434 	  * dict_table_get_n_cols(table) \
1435 	+ sizeof(que_fork_t) \
1436 	+ sizeof(que_thr_t) \
1437 	/* allocd in row_get_prebuilt_insert_row() */ \
1438 	+ sizeof(ins_node_t) \
1439 	/* mysql_row_len could be huge and we are not \
1440 	sure if this prebuilt instance is going to be \
1441 	used in inserts */ \
1442 	+ (mysql_row_len < 256 ? mysql_row_len : 0) \
1443 	+ DTUPLE_EST_ALLOC(dict_table_get_n_cols(table) \
1444 			   + dict_table_get_n_v_cols(table)) \
1445 	+ sizeof(que_fork_t) \
1446 	+ sizeof(que_thr_t) \
1447 	+ sizeof(*prebuilt->pcur) \
1448 	+ sizeof(*prebuilt->clust_pcur) \
1449 	)
1450 
1451 	/* Calculate size of key buffer used to store search key in
1452 	InnoDB format. MySQL stores INTs in little endian format and
1453 	InnoDB stores INTs in big endian format with the sign bit
1454 	flipped. All other field types are stored/compared the same
1455 	in MySQL and InnoDB, so we must create a buffer containing
1456 	the INT key parts in InnoDB format.We need two such buffers
1457 	since both start and end keys are used in records_in_range(). */
1458 
1459 	for (temp_index = dict_table_get_first_index(table); temp_index;
1460 	     temp_index = dict_table_get_next_index(temp_index)) {
1461 		DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
1462 			ut_a(temp_index->n_user_defined_cols
1463 						== MAX_REF_PARTS););
1464 		uint temp_len = 0;
1465 		for (uint i = 0; i < temp_index->n_uniq; i++) {
1466 			ulint type = temp_index->fields[i].col->mtype;
1467 			if (type == DATA_INT) {
1468 				temp_len +=
1469 					temp_index->fields[i].fixed_len;
1470 			}
1471 		}
1472 		srch_key_len = std::max(srch_key_len,temp_len);
1473 	}
1474 
1475 	ut_a(srch_key_len <= MAX_SRCH_KEY_VAL_BUFFER);
1476 
1477 	DBUG_EXECUTE_IF("innodb_srch_key_buffer_max_value",
1478 		ut_a(srch_key_len == MAX_SRCH_KEY_VAL_BUFFER););
1479 
1480 	/* We allocate enough space for the objects that are likely to
1481 	be created later in order to minimize the number of malloc()
1482 	calls */
1483 	heap = mem_heap_create(PREBUILT_HEAP_INITIAL_SIZE + 2 * srch_key_len);
1484 
1485 	prebuilt = static_cast<row_prebuilt_t*>(
1486 		mem_heap_zalloc(heap, sizeof(*prebuilt)));
1487 
1488 	prebuilt->magic_n = ROW_PREBUILT_ALLOCATED;
1489 	prebuilt->magic_n2 = ROW_PREBUILT_ALLOCATED;
1490 
1491 	prebuilt->table = table;
1492 
1493 	prebuilt->sql_stat_start = TRUE;
1494 	prebuilt->heap = heap;
1495 
1496 	prebuilt->srch_key_val_len = srch_key_len;
1497 	if (prebuilt->srch_key_val_len) {
1498 		prebuilt->srch_key_val1 = static_cast<byte*>(
1499 			mem_heap_alloc(prebuilt->heap,
1500 				       2 * prebuilt->srch_key_val_len));
1501 		prebuilt->srch_key_val2 = prebuilt->srch_key_val1 +
1502 						prebuilt->srch_key_val_len;
1503 	} else {
1504 		prebuilt->srch_key_val1 = NULL;
1505 		prebuilt->srch_key_val2 = NULL;
1506 	}
1507 
1508 	prebuilt->pcur = static_cast<btr_pcur_t*>(
1509 				mem_heap_zalloc(prebuilt->heap,
1510 					       sizeof(btr_pcur_t)));
1511 	prebuilt->clust_pcur = static_cast<btr_pcur_t*>(
1512 					mem_heap_zalloc(prebuilt->heap,
1513 						       sizeof(btr_pcur_t)));
1514 	btr_pcur_reset(prebuilt->pcur);
1515 	btr_pcur_reset(prebuilt->clust_pcur);
1516 
1517 	prebuilt->select_lock_type = LOCK_NONE;
1518 	prebuilt->stored_select_lock_type = LOCK_NONE_UNSET;
1519 
1520 	prebuilt->search_tuple = dtuple_create(heap, search_tuple_n_fields);
1521 
1522 	ref = dtuple_create(heap, ref_len);
1523 
1524 	dict_index_copy_types(ref, clust_index, ref_len);
1525 
1526 	prebuilt->clust_ref = ref;
1527 
1528 	prebuilt->autoinc_error = DB_SUCCESS;
1529 	prebuilt->autoinc_offset = 0;
1530 
1531 	/* Default to 1, we will set the actual value later in
1532 	ha_innobase::get_auto_increment(). */
1533 	prebuilt->autoinc_increment = 1;
1534 
1535 	prebuilt->autoinc_last_value = 0;
1536 
1537 	/* During UPDATE and DELETE we need the doc id. */
1538 	prebuilt->fts_doc_id = 0;
1539 
1540 	prebuilt->mysql_row_len = mysql_row_len;
1541 
1542 	prebuilt->ins_sel_stmt = false;
1543 	prebuilt->session = NULL;
1544 
1545 	prebuilt->fts_doc_id_in_read_set = 0;
1546 	prebuilt->blob_heap = NULL;
1547 
1548 	prebuilt->m_no_prefetch = false;
1549 	prebuilt->m_read_virtual_key = false;
1550 
1551 	DBUG_RETURN(prebuilt);
1552 }
1553 
1554 /********************************************************************//**
1555 Free a prebuilt struct for a MySQL table handle. */
1556 void
row_prebuilt_free(row_prebuilt_t * prebuilt,ibool dict_locked)1557 row_prebuilt_free(
1558 /*==============*/
1559 	row_prebuilt_t*	prebuilt,	/*!< in, own: prebuilt struct */
1560 	ibool		dict_locked)	/*!< in: TRUE=data dictionary locked */
1561 {
1562 	DBUG_ENTER("row_prebuilt_free");
1563 
1564 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1565 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1566 
1567 	prebuilt->magic_n = ROW_PREBUILT_FREED;
1568 	prebuilt->magic_n2 = ROW_PREBUILT_FREED;
1569 
1570 	btr_pcur_reset(prebuilt->pcur);
1571 	btr_pcur_reset(prebuilt->clust_pcur);
1572 
1573 	ut_free(prebuilt->mysql_template);
1574 
1575 	if (prebuilt->ins_graph) {
1576 		que_graph_free_recursive(prebuilt->ins_graph);
1577 	}
1578 
1579 	if (prebuilt->sel_graph) {
1580 		que_graph_free_recursive(prebuilt->sel_graph);
1581 	}
1582 
1583 	if (prebuilt->upd_graph) {
1584 		que_graph_free_recursive(prebuilt->upd_graph);
1585 	}
1586 
1587 	if (prebuilt->blob_heap) {
1588 		row_mysql_prebuilt_free_blob_heap(prebuilt);
1589 	}
1590 
1591 	if (prebuilt->compress_heap) {
1592 		mem_heap_free(prebuilt->compress_heap);
1593 	}
1594 
1595 	if (prebuilt->old_vers_heap) {
1596 		mem_heap_free(prebuilt->old_vers_heap);
1597 	}
1598 
1599 	if (prebuilt->fetch_cache[0] != NULL) {
1600 		byte*	base = prebuilt->fetch_cache[0] - 4;
1601 		byte*	ptr = base;
1602 
1603 		for (ulint i = 0; i < MYSQL_FETCH_CACHE_SIZE; i++) {
1604 			ulint	magic1 = mach_read_from_4(ptr);
1605 			ut_a(magic1 == ROW_PREBUILT_FETCH_MAGIC_N);
1606 			ptr += 4;
1607 
1608 			byte*	row = ptr;
1609 			ut_a(row == prebuilt->fetch_cache[i]);
1610 			ptr += prebuilt->mysql_row_len;
1611 
1612 			ulint	magic2 = mach_read_from_4(ptr);
1613 			ut_a(magic2 == ROW_PREBUILT_FETCH_MAGIC_N);
1614 			ptr += 4;
1615 		}
1616 
1617 		ut_free(base);
1618 	}
1619 
1620 	if (prebuilt->rtr_info) {
1621 		rtr_clean_rtr_info(prebuilt->rtr_info, true);
1622 	}
1623 	if (prebuilt->table) {
1624 		dict_table_close(prebuilt->table, dict_locked, TRUE);
1625 	}
1626 
1627 	mem_heap_free(prebuilt->heap);
1628 
1629 	DBUG_VOID_RETURN;
1630 }
1631 
1632 /*********************************************************************//**
1633 Updates the transaction pointers in query graphs stored in the prebuilt
1634 struct. */
1635 void
row_update_prebuilt_trx(row_prebuilt_t * prebuilt,trx_t * trx)1636 row_update_prebuilt_trx(
1637 /*====================*/
1638 	row_prebuilt_t*	prebuilt,	/*!< in/out: prebuilt struct
1639 					in MySQL handle */
1640 	trx_t*		trx)		/*!< in: transaction handle */
1641 {
1642 	ut_a(trx->magic_n == TRX_MAGIC_N);
1643 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
1644 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
1645 
1646 	prebuilt->trx = trx;
1647 
1648 	if (prebuilt->ins_graph) {
1649 		prebuilt->ins_graph->trx = trx;
1650 	}
1651 
1652 	if (prebuilt->upd_graph) {
1653 		prebuilt->upd_graph->trx = trx;
1654 	}
1655 
1656 	if (prebuilt->sel_graph) {
1657 		prebuilt->sel_graph->trx = trx;
1658 	}
1659 }
1660 
1661 /*********************************************************************//**
1662 Gets pointer to a prebuilt dtuple used in insertions. If the insert graph
1663 has not yet been built in the prebuilt struct, then this function first
1664 builds it.
1665 @return prebuilt dtuple; the column type information is also set in it */
1666 static
1667 dtuple_t*
row_get_prebuilt_insert_row(row_prebuilt_t * prebuilt)1668 row_get_prebuilt_insert_row(
1669 /*========================*/
1670 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
1671 					handle */
1672 {
1673 	dict_table_t*		table	= prebuilt->table;
1674 
1675 	ut_ad(prebuilt && table && prebuilt->trx);
1676 
1677 	if (prebuilt->ins_node != 0) {
1678 
1679 		/* Check if indexes have been dropped or added and we
1680 		may need to rebuild the row insert template. */
1681 
1682 		if (prebuilt->trx_id == table->def_trx_id
1683 		    && UT_LIST_GET_LEN(prebuilt->ins_node->entry_list)
1684 		    == UT_LIST_GET_LEN(table->indexes)) {
1685 
1686 			return(prebuilt->ins_node->row);
1687 		}
1688 
1689 		ut_ad(prebuilt->trx_id < table->def_trx_id);
1690 
1691 		que_graph_free_recursive(prebuilt->ins_graph);
1692 
1693 		prebuilt->ins_graph = 0;
1694 	}
1695 
1696 	/* Create an insert node and query graph to the prebuilt struct */
1697 
1698 	ins_node_t*		node;
1699 
1700 	node = ins_node_create(INS_DIRECT, table, prebuilt->heap);
1701 
1702 	prebuilt->ins_node = node;
1703 
1704 	if (prebuilt->ins_upd_rec_buff == 0) {
1705 		prebuilt->ins_upd_rec_buff = static_cast<byte*>(
1706 			mem_heap_alloc(
1707 				prebuilt->heap,
1708 				prebuilt->mysql_row_len));
1709 	}
1710 
1711 	dtuple_t*	row;
1712 
1713 	row = dtuple_create_with_vcol(
1714 			prebuilt->heap, dict_table_get_n_cols(table),
1715 			dict_table_get_n_v_cols(table));
1716 
1717 	dict_table_copy_types(row, table);
1718 
1719 	ins_node_set_new_row(node, row);
1720 
1721 	prebuilt->ins_graph = static_cast<que_fork_t*>(
1722 		que_node_get_parent(
1723 			pars_complete_graph_for_exec(
1724 				node,
1725 				prebuilt->trx, prebuilt->heap, prebuilt)));
1726 
1727 	prebuilt->ins_graph->state = QUE_FORK_ACTIVE;
1728 
1729 	prebuilt->trx_id = table->def_trx_id;
1730 
1731 	return(prebuilt->ins_node->row);
1732 }
1733 
1734 /*********************************************************************//**
1735 Updates the table modification counter and calculates new estimates
1736 for table and index statistics if necessary. */
1737 UNIV_INLINE
1738 void
row_update_statistics_if_needed(dict_table_t * table)1739 row_update_statistics_if_needed(
1740 /*============================*/
1741 	dict_table_t*	table)	/*!< in: table */
1742 {
1743 	ib_uint64_t	counter;
1744 	ib_uint64_t	n_rows;
1745 
1746 	if (!table->stat_initialized) {
1747 		DBUG_EXECUTE_IF(
1748 			"test_upd_stats_if_needed_not_inited",
1749 			fprintf(stderr, "test_upd_stats_if_needed_not_inited"
1750 				" was executed\n");
1751 		);
1752 		return;
1753 	}
1754 
1755 	counter = table->stat_modified_counter++;
1756 	n_rows = dict_table_get_n_rows(table);
1757 
1758 	if (dict_stats_is_persistent_enabled(table)) {
1759 		if (counter > n_rows / 10 /* 10% */
1760 		    && dict_stats_auto_recalc_is_enabled(table)) {
1761 
1762 			dict_stats_recalc_pool_add(table);
1763 			table->stat_modified_counter = 0;
1764 		}
1765 		return;
1766 	}
1767 
1768 	/* Calculate new statistics if 1 / 16 of table has been modified
1769 	since the last time a statistics batch was run.
1770 	We calculate statistics at most every 16th round, since we may have
1771 	a counter table which is very small and updated very often. */
1772 
1773 	if (counter > 16 + n_rows / 16 /* 6.25% */) {
1774 
1775 		ut_ad(!mutex_own(&dict_sys->mutex));
1776 		/* this will reset table->stat_modified_counter to 0 */
1777 		dict_stats_update(table, DICT_STATS_RECALC_TRANSIENT);
1778 	}
1779 }
1780 
1781 /*********************************************************************//**
1782 Sets an AUTO_INC type lock on the table mentioned in prebuilt. The
1783 AUTO_INC lock gives exclusive access to the auto-inc counter of the
1784 table. The lock is reserved only for the duration of an SQL statement.
1785 It is not compatible with another AUTO_INC or exclusive lock on the
1786 table.
1787 @return error code or DB_SUCCESS */
1788 dberr_t
row_lock_table_autoinc_for_mysql(row_prebuilt_t * prebuilt)1789 row_lock_table_autoinc_for_mysql(
1790 /*=============================*/
1791 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in the MySQL
1792 					table handle */
1793 {
1794 	trx_t*			trx	= prebuilt->trx;
1795 	ins_node_t*		node	= prebuilt->ins_node;
1796 	const dict_table_t*	table	= prebuilt->table;
1797 	que_thr_t*		thr;
1798 	dberr_t			err;
1799 	ibool			was_lock_wait;
1800 
1801 	/* If we already hold an AUTOINC lock on the table then do nothing.
1802 	Note: We peek at the value of the current owner without acquiring
1803 	the lock mutex. */
1804 	if (trx == table->autoinc_trx) {
1805 
1806 		return(DB_SUCCESS);
1807 	}
1808 
1809 	trx->op_info = "setting auto-inc lock";
1810 
1811 	row_get_prebuilt_insert_row(prebuilt);
1812 	node = prebuilt->ins_node;
1813 
1814 	/* We use the insert query graph as the dummy graph needed
1815 	in the lock module call */
1816 
1817 	thr = que_fork_get_first_thr(prebuilt->ins_graph);
1818 
1819 	que_thr_move_to_run_state_for_mysql(thr, trx);
1820 
1821 run_again:
1822 	thr->run_node = node;
1823 	thr->prev_node = node;
1824 
1825 	/* It may be that the current session has not yet started
1826 	its transaction, or it has been committed: */
1827 
1828 	trx_start_if_not_started_xa(trx, true);
1829 
1830 	err = lock_table(0, prebuilt->table, LOCK_AUTO_INC, thr);
1831 
1832 	trx->error_state = err;
1833 
1834 	if (err != DB_SUCCESS) {
1835 		que_thr_stop_for_mysql(thr);
1836 
1837 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1838 
1839 		if (was_lock_wait) {
1840 			goto run_again;
1841 		}
1842 
1843 		trx->op_info = "";
1844 
1845 		return(err);
1846 	}
1847 
1848 	que_thr_stop_for_mysql_no_error(thr, trx);
1849 
1850 	trx->op_info = "";
1851 
1852 	return(err);
1853 }
1854 
1855 /*********************************************************************//**
1856 Sets a table lock on the table mentioned in prebuilt.
1857 @return error code or DB_SUCCESS */
1858 dberr_t
row_lock_table_for_mysql(row_prebuilt_t * prebuilt,dict_table_t * table,ulint mode)1859 row_lock_table_for_mysql(
1860 /*=====================*/
1861 	row_prebuilt_t*	prebuilt,	/*!< in: prebuilt struct in the MySQL
1862 					table handle */
1863 	dict_table_t*	table,		/*!< in: table to lock, or NULL
1864 					if prebuilt->table should be
1865 					locked as
1866 					prebuilt->select_lock_type */
1867 	ulint		mode)		/*!< in: lock mode of table
1868 					(ignored if table==NULL) */
1869 {
1870 	trx_t*		trx		= prebuilt->trx;
1871 	que_thr_t*	thr;
1872 	dberr_t		err;
1873 	ibool		was_lock_wait;
1874 
1875 	trx->op_info = "setting table lock";
1876 
1877 	if (prebuilt->sel_graph == NULL) {
1878 		/* Build a dummy select query graph */
1879 		row_prebuild_sel_graph(prebuilt);
1880 	}
1881 
1882 	/* We use the select query graph as the dummy graph needed
1883 	in the lock module call */
1884 
1885 	thr = que_fork_get_first_thr(prebuilt->sel_graph);
1886 
1887 	que_thr_move_to_run_state_for_mysql(thr, trx);
1888 
1889 run_again:
1890 	thr->run_node = thr;
1891 	thr->prev_node = thr->common.parent;
1892 
1893 	/* It may be that the current session has not yet started
1894 	its transaction, or it has been committed: */
1895 
1896 	trx_start_if_not_started_xa(trx, false);
1897 
1898 	if (table) {
1899 		err = lock_table(
1900 			0, table,
1901 			static_cast<enum lock_mode>(mode), thr);
1902 	} else {
1903 		err = lock_table(
1904 			0, prebuilt->table,
1905 			static_cast<enum lock_mode>(
1906 				prebuilt->select_lock_type),
1907 			thr);
1908 	}
1909 
1910 	trx->error_state = err;
1911 
1912 	if (err != DB_SUCCESS) {
1913 		que_thr_stop_for_mysql(thr);
1914 
1915 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr, NULL);
1916 
1917 		if (was_lock_wait) {
1918 			goto run_again;
1919 		}
1920 
1921 		trx->op_info = "";
1922 
1923 		return(err);
1924 	}
1925 
1926 	que_thr_stop_for_mysql_no_error(thr, trx);
1927 
1928 	trx->op_info = "";
1929 
1930 	return(err);
1931 }
1932 
1933 /** Perform explicit rollback in absence of UNDO logs.
1934 @param[in]	index	apply rollback action on this index
1935 @param[in]	entry	entry to remove/rollback.
1936 @param[in,out]	thr	thread handler.
1937 @param[in,out]	mtr	mini transaction.
1938 @return error code or DB_SUCCESS */
1939 static
1940 dberr_t
row_explicit_rollback(dict_index_t * index,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)1941 row_explicit_rollback(
1942 	dict_index_t*		index,
1943 	const dtuple_t*		entry,
1944 	que_thr_t*		thr,
1945 	mtr_t*			mtr)
1946 {
1947 	btr_cur_t	cursor;
1948 	ulint		flags;
1949 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1950 	ulint*		offsets;
1951 	mem_heap_t*	heap = NULL;
1952 	dberr_t		err;
1953 
1954 	rec_offs_init(offsets_);
1955 	flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
1956 
1957 	btr_cur_search_to_nth_level_with_no_latch(
1958 		index, 0, entry, PAGE_CUR_LE,
1959 		&cursor, __FILE__, __LINE__, mtr);
1960 
1961 	offsets = rec_get_offsets(
1962 		btr_cur_get_rec(&cursor), index, offsets_,
1963 		ULINT_UNDEFINED, &heap);
1964 
1965 	if (dict_index_is_clust(index)) {
1966 		err = btr_cur_del_mark_set_clust_rec(
1967 			flags, btr_cur_get_block(&cursor),
1968 			btr_cur_get_rec(&cursor), index,
1969 			offsets, thr, entry, mtr);
1970 	} else {
1971 		err = btr_cur_del_mark_set_sec_rec(
1972 			flags, &cursor, TRUE, thr, mtr);
1973 	}
1974 	ut_ad(err == DB_SUCCESS);
1975 
1976 	/* Void call just to set mtr modification flag
1977 	to true failing which block is not scheduled for flush*/
1978 	byte* log_ptr = mlog_open(mtr, 0);
1979 	ut_ad(log_ptr == NULL);
1980 	if (log_ptr != NULL) {
1981 		/* To keep complier happy. */
1982 		mlog_close(mtr, log_ptr);
1983 	}
1984 
1985 	if (heap != NULL) {
1986 		mem_heap_free(heap);
1987 	}
1988 
1989 	return(err);
1990 }
1991 
1992 /** Convert a row in the MySQL format to a row in the Innobase format.
1993 This is specialized function used for intrinsic table with reduce branching.
1994 @param[in,out]	row		row where field values are copied.
1995 @param[in]	prebuilt	prebuilt handler
1996 @param[in]	mysql_rec	row in mysql format. */
1997 static
1998 void
row_mysql_to_innobase(dtuple_t * row,row_prebuilt_t * prebuilt,const byte * mysql_rec)1999 row_mysql_to_innobase(
2000 	dtuple_t*		row,
2001 	row_prebuilt_t*		prebuilt,
2002 	const byte*		mysql_rec)
2003 {
2004 	ut_ad(dict_table_is_intrinsic(prebuilt->table));
2005 
2006 	const byte*		ptr = mysql_rec;
2007 
2008 	for (ulint i = 0; i < prebuilt->n_template; i++) {
2009 		const mysql_row_templ_t*	templ;
2010 		dfield_t*			dfield;
2011 
2012 		templ = prebuilt->mysql_template + i;
2013 		dfield = dtuple_get_nth_field(row, i);
2014 
2015 		/* Check if column has null value. */
2016 		if (templ->mysql_null_bit_mask != 0) {
2017 			if (mysql_rec[templ->mysql_null_byte_offset]
2018 			    & (byte) (templ->mysql_null_bit_mask)) {
2019 				dfield_set_null(dfield);
2020 				continue;
2021 			}
2022 		}
2023 
2024 		/* Extract the column value. */
2025 		ptr = mysql_rec + templ->mysql_col_offset;
2026 		const dtype_t*	dtype = dfield_get_type(dfield);
2027 		ulint		col_len = templ->mysql_col_len;
2028 
2029 		ut_ad(dtype->mtype == DATA_INT
2030 		      || dtype->mtype == DATA_CHAR
2031 		      || dtype->mtype == DATA_MYSQL
2032 		      || dtype->mtype == DATA_VARCHAR
2033 		      || dtype->mtype == DATA_VARMYSQL
2034 		      || dtype->mtype == DATA_BINARY
2035 		      || dtype->mtype == DATA_FIXBINARY
2036 		      || dtype->mtype == DATA_FLOAT
2037 		      || dtype->mtype == DATA_DOUBLE
2038 		      || dtype->mtype == DATA_DECIMAL
2039 		      || dtype->mtype == DATA_BLOB
2040 		      || dtype->mtype == DATA_GEOMETRY
2041 		      || dtype->mtype == DATA_POINT
2042 		      || dtype->mtype == DATA_VAR_POINT);
2043 
2044 #ifdef UNIV_DEBUG
2045 		if (dtype_get_mysql_type(dtype) == DATA_MYSQL_TRUE_VARCHAR) {
2046 			ut_ad(templ->mysql_length_bytes > 0);
2047 		}
2048 #endif /* UNIV_DEBUG */
2049 
2050 		/* For now varchar field this has to be always 0 so
2051 		memcpy of 0 bytes shouldn't affect the original col_len. */
2052 		if (dtype->mtype == DATA_INT) {
2053 			/* Convert and Store in big-endian. */
2054 			byte*	buf = prebuilt->ins_upd_rec_buff
2055 				+ templ->mysql_col_offset;
2056 			byte*	copy_to = buf + col_len;
2057 			for (;;) {
2058 				copy_to--;
2059 				*copy_to = *ptr;
2060 				if (copy_to == buf) {
2061 					break;
2062 				}
2063 				ptr++;
2064 			}
2065 
2066 			if (!(dtype->prtype & DATA_UNSIGNED)) {
2067 				*buf ^= 128;
2068 			}
2069 
2070 			ptr = buf;
2071 			buf += col_len;
2072 		} else if (dtype_get_mysql_type(dtype) ==
2073 				DATA_MYSQL_TRUE_VARCHAR) {
2074 
2075 			ut_ad(dtype->mtype == DATA_VARCHAR
2076 			      || dtype->mtype == DATA_VARMYSQL
2077 			      || dtype->mtype == DATA_BINARY);
2078 
2079 			col_len = 0;
2080 			row_mysql_read_true_varchar(
2081 				&col_len, ptr, templ->mysql_length_bytes);
2082 			ptr += templ->mysql_length_bytes;
2083 		} else if (dtype->mtype == DATA_BLOB) {
2084 			ptr = row_mysql_read_blob_ref(&col_len, ptr, col_len,
2085 				false, 0, 0, 0);
2086 		} else if (DATA_GEOMETRY_MTYPE(dtype->mtype)) {
2087 			/* Point, Var-Point, Geometry */
2088 			ptr = row_mysql_read_geometry(&col_len, ptr, col_len);
2089 		}
2090 
2091 		dfield_set_data(dfield, ptr, col_len);
2092 	}
2093 }
2094 
2095 /** Does an insert for MySQL using cursor interface.
2096 Cursor interface is low level interface that directly interacts at
2097 Storage Level by-passing all the locking and transaction semantics.
2098 For InnoDB case, this will also by-pass hidden column generation.
2099 @param[in]	mysql_rec	row in the MySQL format
2100 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
2101 @return error code or DB_SUCCESS */
2102 static
2103 dberr_t
row_insert_for_mysql_using_cursor(const byte * mysql_rec,row_prebuilt_t * prebuilt)2104 row_insert_for_mysql_using_cursor(
2105 	const byte*		mysql_rec,
2106 	row_prebuilt_t*		prebuilt)
2107 {
2108 	dberr_t		err	= DB_SUCCESS;
2109 	ins_node_t*	node	= NULL;
2110 	que_thr_t*	thr	= NULL;
2111 	mtr_t		mtr;
2112 
2113 	/* Step-1: Get the reference of row to insert. */
2114 	row_get_prebuilt_insert_row(prebuilt);
2115 	node = prebuilt->ins_node;
2116 	thr = que_fork_get_first_thr(prebuilt->ins_graph);
2117 
2118 	/* Step-2: Convert row from MySQL row format to InnoDB row format. */
2119 	row_mysql_to_innobase(node->row, prebuilt, mysql_rec);
2120 
2121 	/* Step-3: Append row-id index is not unique. */
2122 	dict_index_t*	clust_index = dict_table_get_first_index(node->table);
2123 
2124 	if (!dict_index_is_unique(clust_index)) {
2125 		dict_sys_write_row_id(
2126 			node->row_id_buf,
2127 			dict_table_get_next_table_sess_row_id(node->table));
2128 	}
2129 
2130 	trx_write_trx_id(node->trx_id_buf,
2131 			 dict_table_get_next_table_sess_trx_id(node->table));
2132 
2133 	/* Step-4: Iterate over all the indexes and insert entries. */
2134 	dict_index_t*	inserted_upto = NULL;
2135 	node->entry = UT_LIST_GET_FIRST(node->entry_list);
2136 	for (dict_index_t* index = UT_LIST_GET_FIRST(node->table->indexes);
2137 	     index != NULL;
2138 	     index = UT_LIST_GET_NEXT(indexes, index),
2139 	     node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry)) {
2140 
2141 		node->index = index;
2142 		err = row_ins_index_entry_set_vals(
2143 			node->index, node->entry, node->row);
2144 		if (err != DB_SUCCESS) {
2145 			break;
2146 		}
2147 
2148 		if (dict_index_is_clust(index)) {
2149 			err = row_ins_clust_index_entry(
2150 				node->index, node->entry, thr, 0, false);
2151 		} else {
2152 			err = row_ins_sec_index_entry(
2153 				node->index, node->entry, thr, false);
2154 		}
2155 
2156 		if (err == DB_SUCCESS) {
2157 			inserted_upto = index;
2158 		} else {
2159 			break;
2160 		}
2161 	}
2162 
2163 	/* Step-5: If error is encountered while inserting entries to any
2164 	of the index then entries inserted to previous indexes are removed
2165 	explicity. Automatic rollback is not in action as UNDO logs are
2166 	turned-off. */
2167 	if (err != DB_SUCCESS) {
2168 
2169 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
2170 
2171 		mtr_start(&mtr);
2172 		dict_disable_redo_if_temporary(node->table, &mtr);
2173 
2174 		for (dict_index_t* index =
2175 			UT_LIST_GET_FIRST(node->table->indexes);
2176 		     inserted_upto != NULL;
2177 		     index = UT_LIST_GET_NEXT(indexes, index),
2178 		     node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry)) {
2179 
2180 			row_explicit_rollback(index, node->entry, thr, &mtr);
2181 
2182 			if (index == inserted_upto) {
2183 				break;
2184 			}
2185 		}
2186 
2187 		mtr_commit(&mtr);
2188 	} else {
2189 		/* Not protected by dict_table_stats_lock() for performance
2190 		reasons, we would rather get garbage in stat_n_rows (which is
2191 		just an estimate anyway) than protecting the following code
2192 		, with a latch. */
2193 		dict_table_n_rows_inc(node->table);
2194 
2195 		srv_stats.n_rows_inserted.inc();
2196 	}
2197 
2198 	thr_get_trx(thr)->error_state = DB_SUCCESS;
2199 	return(err);
2200 }
2201 
2202 /** Determine is tablespace encrypted but decryption failed, is table corrupted
2203 or is tablespace .ibd file missing.
2204 @param[in]	table		Table
2205 @param[in]	trx		Transaction
2206 @param[in]	push_warning	true if we should push warning to user
2207 @retval	DB_DECRYPTION_FAILED	table is encrypted but decryption failed
2208 @retval	DB_CORRUPTION		table is corrupted
2209 @retval	DB_TABLESPACE_NOT_FOUND	tablespace .ibd file not found */
2210 static
2211 dberr_t
row_mysql_get_table_status(const dict_table_t * table,trx_t * trx,bool push_warning=true)2212 row_mysql_get_table_status(
2213 	const dict_table_t*	table,
2214 	trx_t*			trx,
2215 	bool 			push_warning = true)
2216 {
2217 	dberr_t err;
2218 	if (fil_space_t* space = fil_space_acquire_silent(table->space)) {
2219 		if (space->crypt_data && space->crypt_data->is_encrypted()) {
2220 			if (push_warning) {
2221 				push_warning_printf(trx->mysql_thd, Sql_condition::SL_WARNING,
2222 						    HA_ERR_DECRYPTION_FAILED, "Table %s in tablespace %u encrypted."
2223 						    "However key management plugin or used key_id is not found or"
2224 						    " used encryption algorithm or method does not match.",
2225 						    table->name.m_name, table->space);
2226 			}
2227 			err = DB_DECRYPTION_FAILED;
2228 		} else {
2229 			if (push_warning) {
2230 				push_warning_printf(trx->mysql_thd, Sql_condition::SL_WARNING,
2231 						    HA_ERR_CRASHED, "Table %s in tablespace %u corrupted.",
2232 						    table->name.m_name, table->space);
2233 			}
2234 			err = DB_CORRUPTION;
2235 		}
2236 		fil_space_release(space);
2237 	} else {
2238 		ib::error() << ".ibd file is missing for table "
2239 			<< table->name;
2240 		err = DB_TABLESPACE_NOT_FOUND;
2241 	}
2242 
2243 	return(err);
2244 }
2245 
/** Does an insert for MySQL using INSERT graph. This function will run/execute
INSERT graph.
The insert is refused up front if the table is discarded, is not readable,
is corrupted, or if srv_force_recovery is set. Otherwise the MySQL row is
converted, row_ins_step() is run, and the step is repeated for as long as
row_mysql_handle_errors() returns true. For a table with an FTS index the
Doc ID of the inserted row is validated and an FTS_INSERT operation is
added to the transaction.
@param[in]	mysql_rec	row in the MySQL format
@param[in,out]	prebuilt	prebuilt struct in MySQL handle
@return error code or DB_SUCCESS */
static
dberr_t
row_insert_for_mysql_using_ins_graph(
	const byte*	mysql_rec,
	row_prebuilt_t*	prebuilt)
{
	trx_savept_t	savept;
	que_thr_t*	thr;
	dberr_t		err;
	ibool		was_lock_wait;
	trx_t*		trx		= prebuilt->trx;
	ins_node_t*	node		= prebuilt->ins_node;
	dict_table_t*	table		= prebuilt->table;

	/* FIX_ME: This blob heap is used to compensate an issue in server
	for virtual column blob handling */
	mem_heap_t*	blob_heap = NULL;

	ut_ad(trx);
	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);

	/* Early exits: nothing has been allocated or started yet, so a
	plain return is enough on these paths. */
	if (dict_table_is_discarded(prebuilt->table)) {

		ib::error() << "The table " << prebuilt->table->name
			<< " doesn't have a corresponding tablespace, it was"
			" discarded.";

		return(DB_TABLESPACE_DELETED);

	} else if (!prebuilt->table->is_readable()) {
		return(row_mysql_get_table_status(prebuilt->table, trx, true));
	} else if (srv_force_recovery) {

		ib::error() << MODIFICATIONS_NOT_ALLOWED_MSG_FORCE_RECOVERY;

		return(DB_READ_ONLY);
	}

	/* Debug injection point: marks the clustered index corrupted so
	that the corruption check below fires. */
	DBUG_EXECUTE_IF("mark_table_corrupted", {
		/* Mark the table corrupted for the clustered index */
		dict_index_t*	index = dict_table_get_first_index(table);
		ut_ad(dict_index_is_clust(index));
		dict_set_corrupted(index, trx, "INSERT TABLE"); });

	if (dict_table_is_corrupted(table)) {

		ib::error() << "Table " << table->name << " is corrupt.";
		return(DB_TABLE_CORRUPT);
	}

	trx->op_info = "inserting";

	row_mysql_delay_if_needed();

	trx_start_if_not_started_xa(trx, true);

	/* Re-read the insert node after row_get_prebuilt_insert_row();
	the value taken at declaration time predates that call. */
	row_get_prebuilt_insert_row(prebuilt);
	node = prebuilt->ins_node;

	row_mysql_convert_row_to_innobase(node->row, prebuilt, mysql_rec,
					  &blob_heap);

	/* The savepoint is handed to row_mysql_handle_errors() on the
	error path below. */
	savept = trx_savept_take(trx);

	thr = que_fork_get_first_thr(prebuilt->ins_graph);

	/* The first insert of an SQL statement starts from the IX-lock
	state; later inserts start from row-id allocation. */
	if (prebuilt->sql_stat_start) {
		node->state = INS_NODE_SET_IX_LOCK;
		prebuilt->sql_stat_start = FALSE;
	} else {
		node->state = INS_NODE_ALLOC_ROW_ID;
	}

	que_thr_move_to_run_state_for_mysql(thr, trx);

run_again:
	thr->run_node = node;
	thr->prev_node = node;

	row_ins_step(thr);

	DEBUG_SYNC_C("ib_after_row_insert_step");

	err = trx->error_state;

	if (err != DB_SUCCESS) {
		/* error_exit is also the target of the FTS Doc ID checks
		further down, which jump here with err already set. */
error_exit:
		que_thr_stop_for_mysql(thr);

		/* FIXME: What's this ? */
		thr->lock_state = QUE_THR_LOCK_ROW;

		was_lock_wait = row_mysql_handle_errors(
			&err, trx, thr, &savept);

		thr->lock_state = QUE_THR_LOCK_NOLOCK;

		if (was_lock_wait) {
			ut_ad(node->state == INS_NODE_INSERT_ENTRIES
			      || node->state == INS_NODE_ALLOC_ROW_ID);
			goto run_again;
		}

		trx->op_info = "";

		if (blob_heap != NULL) {
			mem_heap_free(blob_heap);
		}

		return(err);
	}


	if (dict_table_has_fts_index(table)) {
		doc_id_t	doc_id;

		/* Extract the doc id from the hidden FTS column */
		doc_id = fts_get_doc_id_from_row(table, node->row);

		if (doc_id <= 0) {
			ib::error() << "FTS Doc ID must be large than 0";
			err = DB_FTS_INVALID_DOCID;
			trx->error_state = DB_FTS_INVALID_DOCID;
			goto error_exit;
		}

		/* The range checks against the cached next Doc ID apply
		only when DICT_TF2_FTS_HAS_DOC_ID is not set. */
		if (!DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
			doc_id_t	next_doc_id
				= table->fts->cache->next_doc_id;

			if (doc_id < next_doc_id) {

				ib::error() << "FTS Doc ID must be large than "
					<< next_doc_id - 1 << " for table "
					<< table->name;

				err = DB_FTS_INVALID_DOCID;
				trx->error_state = DB_FTS_INVALID_DOCID;
				goto error_exit;
			}

			/* Difference between Doc IDs are restricted within
			4 bytes integer. See fts_get_encoded_len(). Consecutive
			doc_ids difference should not exceed
			FTS_DOC_ID_MAX_STEP value. */

			if (doc_id - next_doc_id >= FTS_DOC_ID_MAX_STEP) {
				 ib::error() << "Doc ID " << doc_id
					<< " is too big. Its difference with"
					" largest used Doc ID "
					<< next_doc_id - 1 << " cannot"
					" exceed or equal to "
					<< FTS_DOC_ID_MAX_STEP;
				err = DB_FTS_INVALID_DOCID;
				trx->error_state = DB_FTS_INVALID_DOCID;
				goto error_exit;
			}
		}

		/* Pass NULL for the columns affected, since an INSERT affects
		all FTS indexes. */
		fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
	}

	/* Success path: stop the thread, update counters and statistics,
	then release the blob heap if one was created. */
	que_thr_stop_for_mysql_no_error(thr, trx);

	srv_stats.n_rows_inserted.inc();

	/* Not protected by dict_table_stats_lock() for performance
	reasons, we would rather get garbage in stat_n_rows (which is
	just an estimate anyway) than protecting the following code
	with a latch. */
	dict_table_n_rows_inc(table);

	row_update_statistics_if_needed(table);
	trx->op_info = "";

	if (blob_heap != NULL) {
		mem_heap_free(blob_heap);
	}

	return(err);
}
2435 
2436 /** Does an insert for MySQL.
2437 @param[in]	mysql_rec	row in the MySQL format
2438 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
2439 @return error code or DB_SUCCESS*/
2440 dberr_t
row_insert_for_mysql(const byte * mysql_rec,row_prebuilt_t * prebuilt)2441 row_insert_for_mysql(
2442 	const byte*		mysql_rec,
2443 	row_prebuilt_t*		prebuilt)
2444 {
2445 	/* For intrinsic tables there a lot of restrictions that can be
2446 	relaxed including locking of table, transaction handling, etc.
2447 	Use direct cursor interface for inserting to intrinsic tables. */
2448 	if (dict_table_is_intrinsic(prebuilt->table)) {
2449 		return(row_insert_for_mysql_using_cursor(mysql_rec, prebuilt));
2450 	} else {
2451 		return(row_insert_for_mysql_using_ins_graph(
2452 			mysql_rec, prebuilt));
2453 	}
2454 }
2455 
2456 /*********************************************************************//**
2457 Builds a dummy query graph used in selects. */
2458 void
row_prebuild_sel_graph(row_prebuilt_t * prebuilt)2459 row_prebuild_sel_graph(
2460 /*===================*/
2461 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
2462 					handle */
2463 {
2464 	sel_node_t*	node;
2465 
2466 	ut_ad(prebuilt && prebuilt->trx);
2467 
2468 	if (prebuilt->sel_graph == NULL) {
2469 
2470 		node = sel_node_create(prebuilt->heap);
2471 
2472 		prebuilt->sel_graph = static_cast<que_fork_t*>(
2473 			que_node_get_parent(
2474 				pars_complete_graph_for_exec(
2475 					static_cast<sel_node_t*>(node),
2476 					prebuilt->trx, prebuilt->heap,
2477 					prebuilt)));
2478 
2479 		prebuilt->sel_graph->state = QUE_FORK_ACTIVE;
2480 	}
2481 }
2482 
2483 /*********************************************************************//**
2484 Creates an query graph node of 'update' type to be used in the MySQL
2485 interface.
2486 @return own: update node */
2487 upd_node_t*
row_create_update_node_for_mysql(dict_table_t * table,mem_heap_t * heap)2488 row_create_update_node_for_mysql(
2489 /*=============================*/
2490 	dict_table_t*	table,	/*!< in: table to update */
2491 	mem_heap_t*	heap)	/*!< in: mem heap from which allocated */
2492 {
2493 	upd_node_t*	node;
2494 
2495 	DBUG_ENTER("row_create_update_node_for_mysql");
2496 
2497 	node = upd_node_create(heap);
2498 
2499 	node->in_mysql_interface = TRUE;
2500 	node->is_delete = FALSE;
2501 	node->searched_update = FALSE;
2502 	node->select = NULL;
2503 	node->pcur = btr_pcur_create_for_mysql();
2504 
2505 	DBUG_PRINT("info", ("node: %p, pcur: %p", node, node->pcur));
2506 
2507 	node->table = table;
2508 
2509 	node->update = upd_create(dict_table_get_n_cols(table)
2510 				  + dict_table_get_n_v_cols(table), heap);
2511 
2512 	node->update_n_fields = dict_table_get_n_cols(table);
2513 
2514 	UT_LIST_INIT(node->columns, &sym_node_t::col_var_list);
2515 
2516 	node->has_clust_rec_x_lock = TRUE;
2517 	node->cmpl_info = 0;
2518 
2519 	node->table_sym = NULL;
2520 	node->col_assign_list = NULL;
2521 
2522 	DBUG_RETURN(node);
2523 }
2524 
2525 /*********************************************************************//**
2526 Gets pointer to a prebuilt update vector used in updates. If the update
2527 graph has not yet been built in the prebuilt struct, then this function
2528 first builds it.
2529 @return prebuilt update vector */
2530 upd_t*
row_get_prebuilt_update_vector(row_prebuilt_t * prebuilt)2531 row_get_prebuilt_update_vector(
2532 /*===========================*/
2533 	row_prebuilt_t*	prebuilt)	/*!< in: prebuilt struct in MySQL
2534 					handle */
2535 {
2536 	dict_table_t*	table	= prebuilt->table;
2537 	upd_node_t*	node;
2538 
2539 	ut_ad(prebuilt && table && prebuilt->trx);
2540 
2541 	if (prebuilt->upd_node == NULL) {
2542 
2543 		/* Not called before for this handle: create an update node
2544 		and query graph to the prebuilt struct */
2545 
2546 		node = row_create_update_node_for_mysql(table, prebuilt->heap);
2547 
2548 		prebuilt->upd_node = node;
2549 
2550 		prebuilt->upd_graph = static_cast<que_fork_t*>(
2551 			que_node_get_parent(
2552 				pars_complete_graph_for_exec(
2553 					static_cast<upd_node_t*>(node),
2554 					prebuilt->trx, prebuilt->heap,
2555 					prebuilt)));
2556 
2557 		prebuilt->upd_graph->state = QUE_FORK_ACTIVE;
2558 	}
2559 
2560 	return(prebuilt->upd_node->update);
2561 }
2562 
2563 /********************************************************************
2564 Handle an update of a column that has an FTS index. */
2565 static
2566 void
row_fts_do_update(trx_t * trx,dict_table_t * table,doc_id_t old_doc_id,doc_id_t new_doc_id)2567 row_fts_do_update(
2568 /*==============*/
2569 	trx_t*		trx,		/* in: transaction */
2570 	dict_table_t*	table,		/* in: Table with FTS index */
2571 	doc_id_t	old_doc_id,	/* in: old document id */
2572 	doc_id_t	new_doc_id)	/* in: new document id */
2573 {
2574 	if(trx->fts_next_doc_id) {
2575 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
2576 		if(new_doc_id != FTS_NULL_DOC_ID)
2577 		fts_trx_add_op(trx, table, new_doc_id, FTS_INSERT, NULL);
2578 	}
2579 }
2580 
2581 /************************************************************************
2582 Handles FTS matters for an update or a delete.
2583 NOTE: should not be called if the table does not have an FTS index. .*/
2584 static
2585 dberr_t
row_fts_update_or_delete(row_prebuilt_t * prebuilt)2586 row_fts_update_or_delete(
2587 /*=====================*/
2588 	row_prebuilt_t*	prebuilt)	/* in: prebuilt struct in MySQL
2589 					handle */
2590 {
2591 	trx_t*		trx = prebuilt->trx;
2592 	dict_table_t*	table = prebuilt->table;
2593 	upd_node_t*	node = prebuilt->upd_node;
2594 	doc_id_t	old_doc_id = prebuilt->fts_doc_id;
2595 
2596 	DBUG_ENTER("row_fts_update_or_delete");
2597 
2598 	ut_a(dict_table_has_fts_index(prebuilt->table));
2599 
2600 	/* Deletes are simple; get them out of the way first. */
2601 	if (node->is_delete) {
2602 		/* A delete affects all FTS indexes, so we pass NULL */
2603 		fts_trx_add_op(trx, table, old_doc_id, FTS_DELETE, NULL);
2604 	} else {
2605 		doc_id_t	new_doc_id;
2606 		new_doc_id = fts_read_doc_id((byte*) &trx->fts_next_doc_id);
2607 
2608 		if (new_doc_id == 0) {
2609 			ib::error() << "InnoDB FTS: Doc ID cannot be 0";
2610 			return(DB_FTS_INVALID_DOCID);
2611 		}
2612 		row_fts_do_update(trx, table, old_doc_id, new_doc_id);
2613 	}
2614 
2615 	DBUG_RETURN(DB_SUCCESS);
2616 }
2617 
2618 /*********************************************************************//**
2619 Initialize the Doc ID system for FK table with FTS index */
2620 static
2621 void
init_fts_doc_id_for_ref(dict_table_t * table,ulint * depth)2622 init_fts_doc_id_for_ref(
2623 /*====================*/
2624 	dict_table_t*	table,		/*!< in: table */
2625 	ulint*		depth)		/*!< in: recusive call depth */
2626 {
2627 	dict_foreign_t* foreign;
2628 
2629 	table->fk_max_recusive_level = 0;
2630 
2631 	(*depth)++;
2632 
2633 	/* Limit on tables involved in cascading delete/update */
2634 	if (*depth > FK_MAX_CASCADE_DEL) {
2635 		return;
2636 	}
2637 
2638 	/* Loop through this table's referenced list and also
2639 	recursively traverse each table's foreign table list */
2640 	for (dict_foreign_set::iterator it = table->referenced_set.begin();
2641 	     it != table->referenced_set.end();
2642 	     ++it) {
2643 
2644 		foreign = *it;
2645 
2646 		ut_ad(foreign->foreign_table != NULL);
2647 
2648 		if (foreign->foreign_table->fts != NULL) {
2649 			fts_init_doc_id(foreign->foreign_table);
2650 		}
2651 
2652 		if (!foreign->foreign_table->referenced_set.empty()
2653 		    && foreign->foreign_table != table) {
2654 			init_fts_doc_id_for_ref(
2655 				foreign->foreign_table, depth);
2656 		}
2657 	}
2658 }
2659 
2660 /* A functor for decrementing counters. */
2661 class ib_dec_counter {
2662 public:
ib_dec_counter()2663 	ib_dec_counter() {}
2664 
operator ()(upd_node_t * node)2665 	void operator() (upd_node_t* node) {
2666 		ut_ad(node->table->n_foreign_key_checks_running > 0);
2667 		os_atomic_decrement_ulint(
2668 			&node->table->n_foreign_key_checks_running, 1);
2669 	}
2670 };
2671 
2672 /** Do an in-place update in the intrinsic table.  The update should not
2673 modify any of the keys and it should not change the size of any fields.
2674 @param[in]	node	the update node.
2675 @return DB_SUCCESS on success, an error code on failure. */
2676 static
2677 dberr_t
row_update_inplace_for_intrinsic(const upd_node_t * node)2678 row_update_inplace_for_intrinsic(const upd_node_t* node)
2679 {
2680 	mtr_t		mtr;
2681 	dict_table_t*	table = node->table;
2682 	mem_heap_t*	heap = node->heap;
2683 	dtuple_t*	entry = node->row;
2684 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2685 	ulint*          offsets         = offsets_;
2686 
2687 	ut_ad(dict_table_is_intrinsic(table));
2688 
2689 	rec_offs_init(offsets_);
2690 	mtr_start(&mtr);
2691 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
2692 
2693 	btr_pcur_t pcur;
2694 
2695 	dict_index_t* index = dict_table_get_first_index(table);
2696 
2697 	entry = row_build_index_entry(node->row, node->ext,
2698 				      index, heap);
2699 
2700 	btr_pcur_open(index, entry, PAGE_CUR_LE,
2701 		      BTR_MODIFY_LEAF, &pcur, &mtr);
2702 
2703 	rec_t* rec = btr_pcur_get_rec(&pcur);
2704 
2705 	ut_ad(!page_rec_is_infimum(rec));
2706 	ut_ad(!page_rec_is_supremum(rec));
2707 
2708 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
2709 
2710 	ut_ad(!cmp_dtuple_rec(entry, rec, offsets));
2711 
2712 	ut_ad(!rec_get_deleted_flag(rec, dict_table_is_comp(index->table)));
2713 
2714 	ut_ad(btr_pcur_get_block(&pcur)->made_dirty_with_no_latch);
2715 
2716 	bool size_changes = row_upd_changes_field_size_or_external(
2717 		index, offsets, node->update);
2718 
2719 	if (size_changes) {
2720 		mtr_commit(&mtr);
2721 		return(DB_FAIL);
2722 	}
2723 
2724 	row_upd_rec_in_place(rec, index, offsets, node->update, NULL);
2725 
2726 	/* Set the changed pages as modified, so that if the page is
2727 	evicted from the buffer pool it is flushed and we don't lose
2728 	the changes */
2729 
2730 	mtr.set_modified();
2731 
2732 	mtr_commit(&mtr);
2733 
2734 	return(DB_SUCCESS);
2735 }
2736 
/** Vector of persistent cursors (btr_pcur_t), allocated through
ut_allocator. */
typedef	std::vector<btr_pcur_t, ut_allocator<btr_pcur_t> >	cursors_t;
2738 
2739 /** Delete row from table (corresponding entries from all the indexes).
2740 Function will maintain cursor to the entries to invoke explicity rollback
2741 just incase update action following delete fails.
2742 
2743 @param[in]	node		update node carrying information to delete.
2744 @param[out]	delete_entries	vector of cursor to deleted entries.
2745 @param[in]	restore_delete	if true, then restore DELETE records by
2746 				unmarking delete.
2747 @return error code or DB_SUCCESS */
2748 static
2749 dberr_t
row_delete_for_mysql_using_cursor(const upd_node_t * node,cursors_t & delete_entries,bool restore_delete)2750 row_delete_for_mysql_using_cursor(
2751 	const upd_node_t*	node,
2752 	cursors_t&		delete_entries,
2753 	bool			restore_delete)
2754 {
2755 	mtr_t		mtr;
2756 	dict_table_t*	table = node->table;
2757 	mem_heap_t*	heap = node->heap;
2758 	dberr_t		err = DB_SUCCESS;
2759 	dtuple_t*	entry;
2760 
2761 	mtr_start(&mtr);
2762 	dict_disable_redo_if_temporary(table, &mtr);
2763 
2764 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2765 	     index != NULL && err == DB_SUCCESS && !restore_delete;
2766 	     index = UT_LIST_GET_NEXT(indexes, index)) {
2767 
2768 		entry = row_build_index_entry(node->row, node->ext,
2769 					      index, heap);
2770 
2771 		btr_pcur_t	pcur;
2772 
2773 		btr_pcur_open(index, entry, PAGE_CUR_LE,
2774 			      BTR_MODIFY_LEAF, &pcur, &mtr);
2775 
2776 #ifdef UNIV_DEBUG
2777 		ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2778 		ulint*          offsets         = offsets_;
2779 		rec_offs_init(offsets_);
2780 
2781 		offsets = rec_get_offsets(
2782 			btr_cur_get_rec(btr_pcur_get_btr_cur(&pcur)),
2783 			index, offsets, ULINT_UNDEFINED, &heap);
2784 
2785 		ut_ad(!cmp_dtuple_rec(
2786 			entry, btr_cur_get_rec(btr_pcur_get_btr_cur(&pcur)),
2787 			offsets));
2788 #endif /* UNIV_DEBUG */
2789 
2790 		ut_ad(!rec_get_deleted_flag(
2791 			btr_cur_get_rec(btr_pcur_get_btr_cur(&pcur)),
2792 			dict_table_is_comp(index->table)));
2793 
2794 		ut_ad(btr_pcur_get_block(&pcur)->made_dirty_with_no_latch);
2795 
2796 		if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
2797 		    || page_rec_is_supremum(btr_pcur_get_rec(&pcur))) {
2798 			err = DB_ERROR;
2799 		} else {
2800 			btr_cur_t* btr_cur = btr_pcur_get_btr_cur(&pcur);
2801 
2802 			btr_rec_set_deleted_flag(
2803 				btr_cur_get_rec(btr_cur),
2804 				buf_block_get_page_zip(
2805 					btr_cur_get_block(btr_cur)),
2806 				TRUE);
2807 
2808 			/* Void call just to set mtr modification flag
2809 			to true failing which block is not scheduled for flush*/
2810 			byte* log_ptr = mlog_open(&mtr, 0);
2811 			ut_ad(log_ptr == NULL);
2812 			if (log_ptr != NULL) {
2813 				/* To keep complier happy. */
2814 				mlog_close(&mtr, log_ptr);
2815 			}
2816 
2817 			btr_pcur_store_position(&pcur, &mtr);
2818 
2819 			delete_entries.push_back(pcur);
2820 		}
2821 	}
2822 
2823 	if (err != DB_SUCCESS || restore_delete) {
2824 
2825 		/* Rollback half-way delete action that might have been
2826 		applied to few of the indexes. */
2827 		cursors_t::iterator	end = delete_entries.end();
2828 		for (cursors_t::iterator it = delete_entries.begin();
2829 		     it != end;
2830 		     ++it) {
2831 
2832 			ibool success = btr_pcur_restore_position(
2833 				BTR_MODIFY_LEAF, &(*it), &mtr);
2834 
2835 			if (!success) {
2836 				ut_a(success);
2837 			} else {
2838 				btr_cur_t* btr_cur = btr_pcur_get_btr_cur(
2839 					&(*it));
2840 
2841 				ut_ad(btr_cur_get_block(
2842 					btr_cur)->made_dirty_with_no_latch);
2843 
2844 				btr_rec_set_deleted_flag(
2845 					btr_cur_get_rec(btr_cur),
2846 					buf_block_get_page_zip(
2847 						btr_cur_get_block(btr_cur)),
2848 					FALSE);
2849 
2850 				/* Void call just to set mtr modification flag
2851 				to true failing which block is not scheduled for
2852 				flush. */
2853 				byte* log_ptr = mlog_open(&mtr, 0);
2854 				ut_ad(log_ptr == NULL);
2855 				if (log_ptr != NULL) {
2856 					/* To keep complier happy. */
2857 					mlog_close(&mtr, log_ptr);
2858 				}
2859 			}
2860 		}
2861 	}
2862 
2863 	mtr_commit(&mtr);
2864 
2865 	return(err);
2866 }
2867 
2868 /** Does an update of a row for MySQL by inserting new entry with update values.
2869 @param[in]	node		update node carrying information to delete.
2870 @param[out]	delete_entries	vector of cursor to deleted entries.
2871 @param[in]	thr		thread handler
2872 @return error code or DB_SUCCESS */
2873 static
2874 dberr_t
row_update_for_mysql_using_cursor(const upd_node_t * node,cursors_t & delete_entries,que_thr_t * thr)2875 row_update_for_mysql_using_cursor(
2876 	const upd_node_t*	node,
2877 	cursors_t&		delete_entries,
2878 	que_thr_t*		thr)
2879 {
2880 	dberr_t		err = DB_SUCCESS;
2881 	dict_table_t*	table = node->table;
2882 	mem_heap_t*	heap = node->heap;
2883 	dtuple_t*	entry;
2884 	dfield_t*	trx_id_field;
2885 
2886 	/* Step-1: Update row-id column if table doesn't have unique index. */
2887 	if (!dict_index_is_unique(dict_table_get_first_index(table))) {
2888 		/* Update the row_id column. */
2889 		dfield_t*	row_id_field;
2890 
2891 		row_id_field = dtuple_get_nth_field(
2892 			node->upd_row, dict_table_get_n_cols(table) - 2);
2893 
2894 		dict_sys_write_row_id(
2895 			static_cast<byte*>(row_id_field->data),
2896 			dict_table_get_next_table_sess_row_id(node->table));
2897 	}
2898 
2899 	/* Step-2: Update the trx_id column. */
2900 	trx_id_field = dtuple_get_nth_field(
2901 		node->upd_row, dict_table_get_n_cols(table) - 1);
2902 	trx_write_trx_id(static_cast<byte*>(trx_id_field->data),
2903 			 dict_table_get_next_table_sess_trx_id(node->table));
2904 
2905 
2906 	/* Step-3: Check if UPDATE can lead to DUPLICATE key violation.
2907 	If yes, then avoid executing it and return error. Only after ensuring
2908 	that UPDATE is safe execute it as we can't rollback. */
2909 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2910 	     index != NULL && err == DB_SUCCESS;
2911 	     index = UT_LIST_GET_NEXT(indexes, index)) {
2912 
2913 		entry = row_build_index_entry(
2914 			node->upd_row, node->upd_ext, index, heap);
2915 
2916 
2917 		if (dict_index_is_clust(index)) {
2918 			if (!dict_index_is_auto_gen_clust(index)) {
2919 				err = row_ins_clust_index_entry(
2920 					index, entry, thr,
2921 					entry->get_n_ext(),
2922 					true);
2923 			}
2924 		} else {
2925 			err = row_ins_sec_index_entry(index, entry, thr, true);
2926 		}
2927 	}
2928 
2929 	if (err != DB_SUCCESS) {
2930 		/* This suggest update can't be executed safely.
2931 		Avoid executing update. Rollback DELETE action. */
2932 		row_delete_for_mysql_using_cursor(node, delete_entries, true);
2933 	}
2934 
2935 	/* Step-4: It is now safe to execute update if there is no error */
2936 	for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
2937 	     index != NULL && err == DB_SUCCESS;
2938 	     index = UT_LIST_GET_NEXT(indexes, index)) {
2939 
2940 		entry = row_build_index_entry(
2941 			node->upd_row, node->upd_ext, index, heap);
2942 
2943 		if (dict_index_is_clust(index)) {
2944 
2945 			err = row_ins_clust_index_entry(
2946 				index, entry, thr,
2947 				entry->get_n_ext(),
2948 				false);
2949 			/* Commit the open mtr as we are processing UPDATE. */
2950 			if (index->last_ins_cur) {
2951 				index->last_ins_cur->release();
2952 			}
2953 		} else {
2954 			err = row_ins_sec_index_entry(index, entry, thr, false);
2955 		}
2956 
2957 		/* Too big record is valid error and suggestion is to use
2958 		bigger page-size or different format. */
2959 		ut_ad(err == DB_SUCCESS
2960 		      || err == DB_TOO_BIG_RECORD
2961 		      || err == DB_OUT_OF_FILE_SPACE);
2962 
2963 		if (err == DB_TOO_BIG_RECORD) {
2964 			row_delete_for_mysql_using_cursor(
2965 				node, delete_entries, true);
2966 		}
2967 	}
2968 
2969 	return(err);
2970 }
2971 
2972 /** Does an update or delete of a row for MySQL.
2973 @param[in]	mysql_rec	row in the MySQL format
2974 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
2975 @return error code or DB_SUCCESS */
2976 static
2977 dberr_t
row_del_upd_for_mysql_using_cursor(const byte * mysql_rec,row_prebuilt_t * prebuilt)2978 row_del_upd_for_mysql_using_cursor(
2979 	const byte*		mysql_rec,
2980 	row_prebuilt_t*		prebuilt)
2981 {
2982 	dberr_t			err = DB_SUCCESS;
2983 	upd_node_t*		node;
2984 	cursors_t		delete_entries;
2985 	dict_index_t*		clust_index;
2986 	que_thr_t*		thr = NULL;
2987 
2988 	/* Step-0: If there is cached insert position commit it before
2989 	starting delete/update action as this can result in btree structure
2990 	to change. */
2991 	thr = que_fork_get_first_thr(prebuilt->upd_graph);
2992 	clust_index = dict_table_get_first_index(prebuilt->table);
2993 	if (clust_index->last_ins_cur) {
2994 		clust_index->last_ins_cur->release();
2995 	}
2996 
2997 	/* Step-1: Select the appropriate cursor that will help build
2998 	the original row and updated row. */
2999 	node = prebuilt->upd_node;
3000 	if (prebuilt->pcur->btr_cur.index == clust_index) {
3001 		btr_pcur_copy_stored_position(node->pcur, prebuilt->pcur);
3002 	} else {
3003 		btr_pcur_copy_stored_position(node->pcur,
3004 					      prebuilt->clust_pcur);
3005 	}
3006 
3007 	ut_ad(dict_table_is_intrinsic(prebuilt->table));
3008 	ut_ad(!prebuilt->table->n_v_cols);
3009 
3010 	/* Internal table is created by optimiser. So there
3011 	should not be any virtual columns. */
3012 	row_upd_store_row(node, NULL, NULL);
3013 
3014 	if (!node->is_delete) {
3015 		/* UPDATE operation */
3016 		bool key_changed = false;
3017 		dict_table_t* table = prebuilt->table;
3018 
3019 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
3020 			index != NULL;
3021 			index = UT_LIST_GET_NEXT(indexes, index)) {
3022 
3023 			key_changed = row_upd_changes_ord_field_binary(
3024 				index, node->update, thr, node->upd_row,
3025 				node->upd_ext);
3026 
3027 			if (key_changed) {
3028 				break;
3029 			}
3030 		}
3031 
3032 		if (!key_changed) {
3033 			err = row_update_inplace_for_intrinsic(node);
3034 
3035 			if (err == DB_SUCCESS) {
3036 				return(err);
3037 			}
3038 		}
3039 	}
3040 
3041 	/* Step-2: Execute DELETE operation. */
3042 	err = row_delete_for_mysql_using_cursor(node, delete_entries, false);
3043 
3044 	/* Step-3: If only DELETE operation then exit immediately. */
3045 	if (node->is_delete) {
3046 		if (err == DB_SUCCESS) {
3047 			dict_table_n_rows_dec(prebuilt->table);
3048 			srv_stats.n_rows_deleted.inc();
3049 		}
3050 	}
3051 
3052 	if (err == DB_SUCCESS && !node->is_delete) {
3053 		/* Step-4: Complete UPDATE operation by inserting new row with
3054 		updated data. */
3055 		err = row_update_for_mysql_using_cursor(
3056 			node, delete_entries, thr);
3057 
3058 		if (err == DB_SUCCESS) {
3059 			srv_stats.n_rows_updated.inc();
3060 		}
3061 	}
3062 
3063 	thr_get_trx(thr)->error_state = DB_SUCCESS;
3064 	cursors_t::iterator	end = delete_entries.end();
3065 	for (cursors_t::iterator it = delete_entries.begin(); it != end; ++it) {
3066 		btr_pcur_close(&(*it));
3067 	}
3068 
3069 	return(err);
3070 }
3071 
3072 /** Does an update or delete of a row for MySQL.
3073 @param[in]	mysql_rec	row in the MySQL format
3074 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
3075 @return error code or DB_SUCCESS */
3076 static
3077 dberr_t
row_update_for_mysql_using_upd_graph(const byte * mysql_rec,row_prebuilt_t * prebuilt)3078 row_update_for_mysql_using_upd_graph(
3079 	const byte*	mysql_rec,
3080 	row_prebuilt_t*	prebuilt)
3081 {
3082 	trx_savept_t	savept;
3083 	dberr_t		err;
3084 	que_thr_t*	thr;
3085 	ibool		was_lock_wait;
3086 	dict_index_t*	clust_index;
3087 	upd_node_t*	node;
3088 	dict_table_t*	table		= prebuilt->table;
3089 	trx_t*		trx		= prebuilt->trx;
3090 	ulint		fk_depth	= 0;
3091 	bool		got_s_lock	= false;
3092 
3093 	DBUG_ENTER("row_update_for_mysql_using_upd_graph");
3094 
3095 	ut_ad(trx);
3096 	ut_a(prebuilt->magic_n == ROW_PREBUILT_ALLOCATED);
3097 	ut_a(prebuilt->magic_n2 == ROW_PREBUILT_ALLOCATED);
3098 	UT_NOT_USED(mysql_rec);
3099 
3100 	if (prebuilt->table->file_unreadable) {
3101 		ib::error() << "MySQL is trying to use a table handle but the"
3102 			" .ibd file for table " << prebuilt->table->name
3103 			<< " does not exist. Have you deleted"
3104 			" the .ibd file from the database directory under"
3105 			" the MySQL datadir, or have you used DISCARD"
3106 			" TABLESPACE? " << TROUBLESHOOTING_MSG;
3107 		DBUG_RETURN(DB_ERROR);
3108 	}
3109 
3110 	if(srv_force_recovery) {
3111 		ib::error() << MODIFICATIONS_NOT_ALLOWED_MSG_FORCE_RECOVERY;
3112 		DBUG_RETURN(DB_READ_ONLY);
3113 	}
3114 
3115 	DEBUG_SYNC_C("innodb_row_update_for_mysql_begin");
3116 
3117 	trx->op_info = "updating or deleting";
3118 
3119 	row_mysql_delay_if_needed();
3120 
3121 	init_fts_doc_id_for_ref(table, &fk_depth);
3122 
3123 	trx_start_if_not_started_xa(trx, true);
3124 
3125 	if (dict_table_is_referenced_by_foreign_key(table)) {
3126 		/* Share lock the data dictionary to prevent any
3127 		table dictionary (for foreign constraint) change.
3128 		This is similar to row_ins_check_foreign_constraint
3129 		check protect by the dictionary lock as well.
3130 		In the future, this can be removed once the Foreign
3131 		key MDL is implemented */
3132 		row_mysql_freeze_data_dictionary(trx);
3133 		init_fts_doc_id_for_ref(table, &fk_depth);
3134 		row_mysql_unfreeze_data_dictionary(trx);
3135 	}
3136 
3137 	node = prebuilt->upd_node;
3138 
3139 	clust_index = dict_table_get_first_index(table);
3140 
3141 	if (prebuilt->pcur->btr_cur.index == clust_index) {
3142 		btr_pcur_copy_stored_position(node->pcur, prebuilt->pcur);
3143 	} else {
3144 		btr_pcur_copy_stored_position(node->pcur,
3145 					      prebuilt->clust_pcur);
3146 	}
3147 
3148 	ut_a(node->pcur->rel_pos == BTR_PCUR_ON);
3149 
3150 	/* MySQL seems to call rnd_pos before updating each row it
3151 	has cached: we can get the correct cursor position from
3152 	prebuilt->pcur; NOTE that we cannot build the row reference
3153 	from mysql_rec if the clustered index was automatically
3154 	generated for the table: MySQL does not know anything about
3155 	the row id used as the clustered index key */
3156 
3157 	savept = trx_savept_take(trx);
3158 
3159 	thr = que_fork_get_first_thr(prebuilt->upd_graph);
3160 
3161 	node->state = UPD_NODE_UPDATE_CLUSTERED;
3162 
3163 	ut_ad(!prebuilt->sql_stat_start);
3164 
3165 	que_thr_move_to_run_state_for_mysql(thr, trx);
3166 
3167 
3168 run_again:
3169 	thr->run_node = node;
3170 	thr->prev_node = node;
3171 	thr->fk_cascade_depth = 0;
3172 
3173 	row_upd_step(thr);
3174 
3175 	err = trx->error_state;
3176 
3177 	if (err != DB_SUCCESS) {
3178 
3179 		que_thr_stop_for_mysql(thr);
3180 
3181 		if (err == DB_RECORD_NOT_FOUND) {
3182 			trx->error_state = DB_SUCCESS;
3183 			trx->op_info = "";
3184 			goto error;
3185 		}
3186 
3187 		thr->lock_state= QUE_THR_LOCK_ROW;
3188 
3189 		DEBUG_SYNC(trx->mysql_thd, "row_update_for_mysql_error");
3190 
3191 		was_lock_wait = row_mysql_handle_errors(&err, trx, thr,
3192 							&savept);
3193 		thr->lock_state= QUE_THR_LOCK_NOLOCK;
3194 
3195 		if (was_lock_wait) {
3196 			goto run_again;
3197 		}
3198 
3199 		trx->op_info = "";
3200 		goto error;
3201 	}
3202 
3203 
3204 	que_thr_stop_for_mysql_no_error(thr, trx);
3205 
3206 	if (dict_table_has_fts_index(table)
3207 	    && trx->fts_next_doc_id != UINT64_UNDEFINED) {
3208 		err = row_fts_update_or_delete(prebuilt);
3209 		ut_ad(err == DB_SUCCESS);
3210 		if (err != DB_SUCCESS) {
3211 			goto error;
3212 		}
3213 	}
3214 
3215 	/* Completed cascading operations (if any) */
3216 	if (got_s_lock) {
3217 		row_mysql_unfreeze_data_dictionary(trx);
3218 	}
3219 
3220 	if (node->is_delete) {
3221 		/* Not protected by dict_table_stats_lock() for performance
3222 		reasons, we would rather get garbage in stat_n_rows (which is
3223 		just an estimate anyway) than protecting the following code
3224 		with a latch. */
3225 		dict_table_n_rows_dec(prebuilt->table);
3226 
3227 		srv_stats.n_rows_deleted.inc();
3228 	} else {
3229 		srv_stats.n_rows_updated.inc();
3230 	}
3231 
3232 	/* We update table statistics only if it is a DELETE or UPDATE
3233 	that changes indexed columns, UPDATEs that change only non-indexed
3234 	columns would not affect statistics. */
3235 	if (node->is_delete || !(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
3236 		row_update_statistics_if_needed(prebuilt->table);
3237 	}
3238 
3239 	trx->op_info = "";
3240 
3241 	DBUG_RETURN(err);
3242 
3243 error:
3244 	if (got_s_lock) {
3245 		row_mysql_unfreeze_data_dictionary(trx);
3246 	}
3247 	DBUG_RETURN(err);
3248 }
3249 
3250 /** Does an update or delete of a row for MySQL.
3251 @param[in]	mysql_rec	row in the MySQL format
3252 @param[in,out]	prebuilt	prebuilt struct in MySQL handle
3253 @return error code or DB_SUCCESS */
3254 dberr_t
row_update_for_mysql(const byte * mysql_rec,row_prebuilt_t * prebuilt)3255 row_update_for_mysql(
3256 	const byte*		mysql_rec,
3257 	row_prebuilt_t*		prebuilt)
3258 {
3259 	if (dict_table_is_intrinsic(prebuilt->table)) {
3260 		return(row_del_upd_for_mysql_using_cursor(mysql_rec, prebuilt));
3261 	} else {
3262 		ut_a(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
3263 		return(row_update_for_mysql_using_upd_graph(
3264 			mysql_rec, prebuilt));
3265 	}
3266 }
3267 
3268 /** Delete all rows for the given table by freeing/truncating indexes.
3269 @param[in,out]	table	table handler
3270 @return error code or DB_SUCCESS */
3271 dberr_t
row_delete_all_rows(dict_table_t * table)3272 row_delete_all_rows(
3273 	dict_table_t*	table)
3274 {
3275 	dberr_t		err = DB_SUCCESS;
3276 	dict_index_t*	index;
3277 
3278 
3279 	index = dict_table_get_first_index(table);
3280 	/* Step-0: If there is cached insert position along with mtr
3281 	commit it before starting delete/update action. */
3282 	if (index->last_ins_cur) {
3283 		index->last_ins_cur->release();
3284 	}
3285 
3286 	/* Step-1: Now truncate all the indexes and re-create them.
3287 	Note: This is ddl action even though delete all rows is
3288 	DML action. Any error during this action is ir-reversible. */
3289 	for (index = UT_LIST_GET_FIRST(table->indexes);
3290 	     index != NULL && err == DB_SUCCESS;
3291 	     index = UT_LIST_GET_NEXT(indexes, index)) {
3292 
3293 		err = dict_truncate_index_tree_in_mem(index);
3294 		// TODO: what happen if get an error
3295 		ut_ad(err == DB_SUCCESS);
3296 	}
3297 
3298 	return(err);
3299 }
3300 
3301 /** This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
3302 session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
3303 Before calling this function row_search_for_mysql() must have
3304 initialized prebuilt->new_rec_locks to store the information which new
3305 record locks really were set. This function removes a newly set
3306 clustered index record lock under prebuilt->pcur or
3307 prebuilt->clust_pcur.  Thus, this implements a 'mini-rollback' that
3308 releases the latest clustered index record lock we set.
3309 @param[in,out]	prebuilt		prebuilt struct in MySQL handle
3310 @param[in]	has_latches_on_recs	TRUE if called so that we have the
3311 					latches on the records under pcur
3312 					and clust_pcur, and we do not need
3313 					to reposition the cursors. */
3314 void
row_unlock_for_mysql(row_prebuilt_t * prebuilt,ibool has_latches_on_recs)3315 row_unlock_for_mysql(
3316 	row_prebuilt_t*	prebuilt,
3317 	ibool		has_latches_on_recs)
3318 {
3319 	btr_pcur_t*	pcur		= prebuilt->pcur;
3320 	btr_pcur_t*	clust_pcur	= prebuilt->clust_pcur;
3321 	trx_t*		trx		= prebuilt->trx;
3322 
3323 	ut_ad(prebuilt != NULL);
3324 	ut_ad(trx != NULL);
3325 
3326 	if (UNIV_UNLIKELY
3327 	    (!srv_locks_unsafe_for_binlog
3328 	     && trx->isolation_level > TRX_ISO_READ_COMMITTED)) {
3329 
3330 		ib::error() << "Calling row_unlock_for_mysql though"
3331 			" innodb_locks_unsafe_for_binlog is FALSE and this"
3332 			" session is not using READ COMMITTED isolation"
3333 			" level.";
3334 		return;
3335 	}
3336 	if (dict_index_is_spatial(prebuilt->index)) {
3337 		return;
3338 	}
3339 
3340 	trx->op_info = "unlock_row";
3341 
3342 	if (prebuilt->new_rec_locks >= 1) {
3343 
3344 		const rec_t*	rec;
3345 		dict_index_t*	index;
3346 		trx_id_t	rec_trx_id;
3347 		mtr_t		mtr;
3348 
3349 		mtr_start(&mtr);
3350 
3351 		/* Restore the cursor position and find the record */
3352 
3353 		if (!has_latches_on_recs) {
3354 			btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, &mtr);
3355 		}
3356 
3357 		rec = btr_pcur_get_rec(pcur);
3358 		index = btr_pcur_get_btr_cur(pcur)->index;
3359 
3360 		if (prebuilt->new_rec_locks >= 2) {
3361 			/* Restore the cursor position and find the record
3362 			in the clustered index. */
3363 
3364 			if (!has_latches_on_recs) {
3365 				btr_pcur_restore_position(BTR_SEARCH_LEAF,
3366 							  clust_pcur, &mtr);
3367 			}
3368 
3369 			rec = btr_pcur_get_rec(clust_pcur);
3370 			index = btr_pcur_get_btr_cur(clust_pcur)->index;
3371 		}
3372 
3373 		if (!dict_index_is_clust(index)) {
3374 			/* This is not a clustered index record.  We
3375 			do not know how to unlock the record. */
3376 			goto no_unlock;
3377 		}
3378 
3379 		/* If the record has been modified by this
3380 		transaction, do not unlock it. */
3381 
3382 		if (index->trx_id_offset) {
3383 			rec_trx_id = trx_read_trx_id(rec
3384 						     + index->trx_id_offset);
3385 		} else {
3386 			mem_heap_t*	heap			= NULL;
3387 			ulint	offsets_[REC_OFFS_NORMAL_SIZE];
3388 			ulint*	offsets				= offsets_;
3389 
3390 			rec_offs_init(offsets_);
3391 			offsets = rec_get_offsets(rec, index, offsets,
3392 						  ULINT_UNDEFINED, &heap);
3393 
3394 			rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
3395 
3396 			if (UNIV_LIKELY_NULL(heap)) {
3397 				mem_heap_free(heap);
3398 			}
3399 		}
3400 
3401 		if (rec_trx_id != trx->id) {
3402 			/* We did not update the record: unlock it */
3403 
3404 			rec = btr_pcur_get_rec(pcur);
3405 
3406 			lock_rec_unlock(
3407 				trx,
3408 				btr_pcur_get_block(pcur),
3409 				rec,
3410 				static_cast<enum lock_mode>(
3411 					prebuilt->select_lock_type));
3412 
3413 			if (prebuilt->new_rec_locks >= 2) {
3414 				rec = btr_pcur_get_rec(clust_pcur);
3415 
3416 				lock_rec_unlock(
3417 					trx,
3418 					btr_pcur_get_block(clust_pcur),
3419 					rec,
3420 					static_cast<enum lock_mode>(
3421 						prebuilt->select_lock_type));
3422 			}
3423 		}
3424 no_unlock:
3425 		mtr_commit(&mtr);
3426 	}
3427 
3428 	trx->op_info = "";
3429 }
3430 
3431 /**********************************************************************//**
3432 Does a cascaded delete or set null in a foreign key operation.
3433 @return error code or DB_SUCCESS */
3434 dberr_t
row_update_cascade_for_mysql(que_thr_t * thr,upd_node_t * node,dict_table_t * table)3435 row_update_cascade_for_mysql(
3436 /*=========================*/
3437         que_thr_t*      thr,    /*!< in: query thread */
3438         upd_node_t*     node,   /*!< in: update node used in the cascade
3439                                 or set null operation */
3440         dict_table_t*   table)  /*!< in: table where we do the operation */
3441 {
3442         dberr_t err;
3443         trx_t*  trx;
3444 
3445         trx = thr_get_trx(thr);
3446 
3447         /* Increment fk_cascade_depth to record the recursive call depth on
3448         a single update/delete that affects multiple tables chained
3449         together with foreign key relations. */
3450         thr->fk_cascade_depth++;
3451 
3452         if (thr->fk_cascade_depth > FK_MAX_CASCADE_DEL) {
3453                 return(DB_FOREIGN_EXCEED_MAX_CASCADE);
3454         }
3455 run_again:
3456         thr->run_node = node;
3457         thr->prev_node = node;
3458 
3459         DEBUG_SYNC_C("foreign_constraint_update_cascade");
3460 	TABLE *temp = thr->prebuilt->m_mysql_table;
3461 	thr->prebuilt->m_mysql_table = NULL ;
3462         row_upd_step(thr);
3463 	thr->prebuilt->m_mysql_table = temp;
3464         /* The recursive call for cascading update/delete happens
3465         in above row_upd_step(), reset the counter once we come
3466         out of the recursive call, so it does not accumulate for
3467         different row deletes */
3468         thr->fk_cascade_depth = 0;
3469 
3470         err = trx->error_state;
3471 
3472 
3473         /* Note that the cascade node is a subnode of another InnoDB
3474         query graph node. We do a normal lock wait in this node, but
3475         all errors are handled by the parent node. */
3476 
3477         if (err == DB_LOCK_WAIT) {
3478                 /* Handle lock wait here */
3479 
3480                 que_thr_stop_for_mysql(thr);
3481 
3482                 lock_wait_suspend_thread(thr);
3483 
3484                 /* Note that a lock wait may also end in a lock wait timeout,
3485                 or this transaction is picked as a victim in selective
3486                 deadlock resolution */
3487 
3488                 if (trx->error_state != DB_SUCCESS) {
3489 
3490                         return(trx->error_state);
3491                 }
3492 
3493                 /* Retry operation after a normal lock wait */
3494 
3495                 goto run_again;
3496         }
3497 
3498         if (err != DB_SUCCESS) {
3499 
3500                 return(err);
3501         }
3502 
3503         if (node->is_delete) {
3504                 /* Not protected by dict_table_stats_lock() for performance
3505                 reasons, we would rather get garbage in stat_n_rows (which is
3506                 just an estimate anyway) than protecting the following code
3507                 with a latch. */
3508                 dict_table_n_rows_dec(table);
3509 
3510                 srv_stats.n_rows_deleted.add((size_t)trx->id, 1);
3511         } else {
3512                 srv_stats.n_rows_updated.add((size_t)trx->id, 1);
3513         }
3514 
3515         row_update_statistics_if_needed(table);
3516 
3517         return(err);
3518 }
3519 
3520 /*********************************************************************//**
3521 Checks if a table is such that we automatically created a clustered
3522 index on it (on row id).
3523 @return TRUE if the clustered index was generated automatically */
3524 ibool
row_table_got_default_clust_index(const dict_table_t * table)3525 row_table_got_default_clust_index(
3526 /*==============================*/
3527 	const dict_table_t*	table)	/*!< in: table */
3528 {
3529 	const dict_index_t*	clust_index;
3530 
3531 	clust_index = dict_table_get_first_index(table);
3532 
3533 	return(dict_index_get_nth_col(clust_index, 0)->mtype == DATA_SYS);
3534 }
3535 
/*********************************************************************//**
Locks the data dictionary in shared mode from modifications, for performing
foreign key check, rollback, or other operation invisible to MySQL.
The transaction must not already hold the dictionary operation lock in any
mode; on return trx->dict_operation_lock_mode is RW_S_LATCH. */
void
row_mysql_freeze_data_dictionary_func(
/*==================================*/
	trx_t*		trx,	/*!< in/out: transaction */
	const char*	file,	/*!< in: file name */
	ulint		line)	/*!< in: line number */
{
	/* The caller must not be holding the dictionary lock yet. */
	ut_a(trx->dict_operation_lock_mode == 0);

	/* Take the shared lock, passing on the caller's file and line. */
	rw_lock_s_lock_inline(dict_operation_lock, 0, file, line);

	/* Remember in the transaction which mode is now held. */
	trx->dict_operation_lock_mode = RW_S_LATCH;
}
3552 
/*********************************************************************//**
Unlocks the data dictionary shared lock. The transaction must currently
hold it in RW_S_LATCH mode; on return trx->dict_operation_lock_mode is 0. */
void
row_mysql_unfreeze_data_dictionary(
/*===============================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	/* Debug check: lock_trx_has_sys_table_locks() must report nothing
	for this transaction at this point. */
	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);

	/* Only a lock taken in shared mode may be released here. */
	ut_a(trx->dict_operation_lock_mode == RW_S_LATCH);

	rw_lock_s_unlock(dict_operation_lock);

	/* Record that the transaction no longer holds the lock. */
	trx->dict_operation_lock_mode = 0;
}
3568 
/*********************************************************************//**
Locks the data dictionary exclusively for performing a table create or other
data dictionary modification operation. Acquires dict_operation_lock in
exclusive mode first and the dictionary mutex (dict_sys->mutex) second; on
return trx->dict_operation_lock_mode is RW_X_LATCH. */
void
row_mysql_lock_data_dictionary_func(
/*================================*/
	trx_t*		trx,	/*!< in/out: transaction */
	const char*	file,	/*!< in: file name */
	ulint		line)	/*!< in: line number */
{
	/* The transaction either holds no dictionary lock or is already
	flagged as holding it exclusively; a shared hold is not allowed. */
	ut_a(trx->dict_operation_lock_mode == 0
	     || trx->dict_operation_lock_mode == RW_X_LATCH);

	/* Serialize data dictionary operations with dictionary mutex:
	no deadlocks or lock waits can occur then in these operations */

	rw_lock_x_lock_inline(dict_operation_lock, 0, file, line);
	trx->dict_operation_lock_mode = RW_X_LATCH;

	/* The mutex is taken after the rw-lock; the unlock function
	releases the two in the opposite order. */
	mutex_enter(&dict_sys->mutex);
}
3590 
/*********************************************************************//**
Unlocks the data dictionary exclusive lock. Releases the dictionary mutex
(dict_sys->mutex) first and dict_operation_lock second; on return
trx->dict_operation_lock_mode is 0. */
void
row_mysql_unlock_data_dictionary(
/*=============================*/
	trx_t*	trx)	/*!< in/out: transaction */
{
	/* Debug check: lock_trx_has_sys_table_locks() must report nothing
	for this transaction at this point. */
	ut_ad(lock_trx_has_sys_table_locks(trx) == NULL);

	/* Only a lock taken in exclusive mode may be released here. */
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);

	/* Serialize data dictionary operations with dictionary mutex:
	no deadlocks can occur then in these operations */

	mutex_exit(&dict_sys->mutex);
	rw_lock_x_unlock(dict_operation_lock);

	/* Record that the transaction no longer holds the lock. */
	trx->dict_operation_lock_mode = 0;
}
3610 
/*********************************************************************//**
Creates a table for MySQL. The caller must hold dict_operation_lock in
exclusive mode and the dictionary mutex.
On most failures the transaction is rolled back and the 'table' object is
freed. Exceptions visible in this function: when the name is that of a MySQL
system table, the object is freed and DB_ERROR is returned without a
rollback; DB_IO_NO_PUNCH_HOLE_FS is returned without rollback or free.
@return error code or DB_SUCCESS */
dberr_t
row_create_table_for_mysql(
/*=======================*/
	dict_table_t*	table,	/*!< in, own: table definition
				(will be freed, or on DB_SUCCESS
				added to the data dictionary cache) */
	const char*	compression,
				/*!< in: compression algorithm to use,
				can be NULL */
	trx_t*		trx,	/*!< in/out: transaction */
	bool		commit, /*!< in: if true, the transaction is
				committed on the system-table and
				out-of-file-space error paths; the flag is
				also passed on to
				dict_replace_tablespace_in_dictionary() */
	fil_encryption_t mode,	/*!< in: encryption mode */
	const CreateInfoEncryptionKeyId &create_info_encryption_key_id) { /*!< in: encryption key_id */

	tab_node_t*	node;
	mem_heap_t*	heap;
	que_thr_t*	thr;
	dberr_t		err;

	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);

	/* Debug-only failure injection: jump straight into the error exit
	that lives inside the system-table branch below. */
	DBUG_EXECUTE_IF(
		"ib_create_table_fail_at_start_of_row_create_table_for_mysql",
		goto err_exit;
	);

	trx->op_info = "creating table";

	if (row_mysql_is_system_table(table->name.m_name)) {

		ib::error() << "Trying to create a MySQL system table "
			<< table->name << " of type InnoDB. MySQL system"
			" tables must be of the MyISAM type!";
#ifndef NDEBUG
err_exit:
#endif /* !NDEBUG */
		/* Nothing has been started yet: just release the table
		object, commit if requested, and fail. */
		dict_mem_table_free(table);

		if (commit) {
			trx_commit_for_mysql(trx);
		}

		trx->op_info = "";

		return(DB_ERROR);
	}

	trx_start_if_not_started_xa(trx, true);

	heap = mem_heap_create(512);

	switch (trx_get_dict_operation(trx)) {
	case TRX_DICT_OP_NONE:
		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
		/* fall through */
	case TRX_DICT_OP_TABLE:
		break;
	case TRX_DICT_OP_INDEX:
		/* If the transaction was previously flagged as
		TRX_DICT_OP_INDEX, we should be creating auxiliary
		tables for full-text indexes. */
		ut_ad(strstr(table->name.m_name, "/FTS_") != NULL);
	}

	/* Build the table-creation query graph and run it to completion. */
	node = tab_create_graph_create(table, heap, mode, create_info_encryption_key_id);

	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);

	ut_a(thr == que_fork_start_command(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

	que_run_threads(thr);

	err = trx->error_state;

	/* Update SYS_TABLESPACES and SYS_DATAFILES if a new file-per-table
	tablespace was created. */
	if (err == DB_SUCCESS && dict_table_is_file_per_table(table)) {

		ut_ad(dict_table_is_file_per_table(table));

		char*	path;
		path = fil_space_get_first_path(table->space);

		err = dict_replace_tablespace_in_dictionary(
			table->space, table->name.m_name,
			fil_space_get_flags(table->space),
			path, trx, commit);

			/* The path string is released here with ut_free(). */
			ut_free(path);

		if (err != DB_SUCCESS) {

			/* We must delete the link file. */
			RemoteDatafile::delete_link_file(table->name.m_name);

		} else if (compression != NULL && compression[0] != '\0') {

			ut_ad(!dict_table_in_shared_tablespace(table));

			ut_ad(Compression::validate(compression) == DB_SUCCESS);

			err = fil_set_compression(table, compression);

			switch (err) {
			case DB_SUCCESS:
				break;
			case DB_NOT_FOUND:
			case DB_UNSUPPORTED:
			case DB_IO_NO_PUNCH_HOLE_FS:
				/* Return these errors */
				break;
			case DB_IO_NO_PUNCH_HOLE_TABLESPACE:
				/* Page Compression will not be used. */
				err = DB_SUCCESS;
				break;
			default:
				ut_error;
			}

			/* We can check for file system punch hole support
			only after creating the tablespace. On Windows
			we can query that information but not on Linux. */
			ut_ad(err == DB_SUCCESS
				|| err == DB_IO_NO_PUNCH_HOLE_FS);

			/* In non-strict mode we ignore dodgy compression
			settings. */
		}
	}

	/* Final error handling: decide between keeping the table, dropping
	it, or rolling back and freeing the in-memory object. */
	switch (err) {
	case DB_SUCCESS:
	case DB_IO_NO_PUNCH_HOLE_FS:
		/* No rollback and no free for these two results. */
		break;
	case DB_OUT_OF_FILE_SPACE:
		trx->error_state = DB_SUCCESS;
		trx_rollback_to_savepoint(trx, NULL);

		ib::warn() << "Cannot create table "
			<< table->name
			<< " because tablespace full";

		/* If the table can be opened by name, close and drop it;
		otherwise only the in-memory object has to be freed. */
		if (dict_table_open_on_name(table->name.m_name, TRUE, FALSE,
					    DICT_ERR_IGNORE_NONE)) {

			dict_table_close_and_drop(trx, table);

			if (commit) {
				trx_commit_for_mysql(trx);
			}
		} else {
			dict_mem_table_free(table);
		}

		break;

	case DB_UNSUPPORTED:
	case DB_TOO_MANY_CONCURRENT_TRXS:
		/* We already have .ibd file here. it should be deleted. */

		if (dict_table_is_file_per_table(table)
		    && fil_delete_tablespace(
			    table->space,
			    BUF_REMOVE_FLUSH_NO_WRITE)
		    != DB_SUCCESS) {

			ib::error() << "Not able to delete tablespace "
				<< table->space << " of table "
				<< table->name << "!";
		}
		/* fall through */

	case DB_DUPLICATE_KEY:
	case DB_TABLESPACE_EXISTS:
	default:
		/* Roll back and free the in-memory table object. */
		trx->error_state = DB_SUCCESS;
		trx_rollback_to_savepoint(trx, NULL);
		dict_mem_table_free(table);
		break;
	}

	/* Free the query graph built above. */
	que_graph_free((que_t*) que_node_get_parent(thr));

	trx->op_info = "";

	return(err);
}
3804 
3805 /*********************************************************************//**
3806 Does an index creation operation for MySQL. TODO: currently failure
3807 to create an index results in dropping the whole table! This is no problem
3808 currently as all indexes must be created at the same time as the table.
3809 @return error number or DB_SUCCESS */
3810 dberr_t
row_create_index_for_mysql(dict_index_t * index,trx_t * trx,const ulint * field_lengths,dict_table_t * handler)3811 row_create_index_for_mysql(
3812 /*=======================*/
3813 	dict_index_t*	index,		/*!< in, own: index definition
3814 					(will be freed) */
3815 	trx_t*		trx,		/*!< in: transaction handle */
3816 	const ulint*	field_lengths,	/*!< in: if not NULL, must contain
3817 					dict_index_get_n_fields(index)
3818 					actual field lengths for the
3819 					index columns, which are
3820 					then checked for not being too
3821 					large. */
3822 	dict_table_t*	handler)	/*!< in/out: table handler. */
3823 {
3824 	ind_node_t*	node;
3825 	mem_heap_t*	heap;
3826 	que_thr_t*	thr;
3827 	dberr_t		err;
3828 	ulint		i;
3829 	ulint		len;
3830 	char*		table_name;
3831 	char*		index_name;
3832 	dict_table_t*	table = NULL;
3833 	ibool		is_fts;
3834 
3835 	trx->op_info = "creating index";
3836 
3837 	/* Copy the table name because we may want to drop the
3838 	table later, after the index object is freed (inside
3839 	que_run_threads()) and thus index->table_name is not available. */
3840 	table_name = mem_strdup(index->table_name);
3841 	index_name = mem_strdup(index->name);
3842 
3843 	is_fts = (index->type == DICT_FTS);
3844 
3845 	if (handler != NULL && dict_table_is_intrinsic(handler)) {
3846 		table = handler;
3847 	}
3848 
3849 	if (table == NULL) {
3850 
3851 		ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3852 		ut_ad(mutex_own(&dict_sys->mutex));
3853 
3854 		table = dict_table_open_on_name(table_name, TRUE, TRUE,
3855 						DICT_ERR_IGNORE_NONE);
3856 
3857 	} else {
3858 		table->acquire();
3859 		ut_ad(dict_table_is_intrinsic(table));
3860 	}
3861 
3862 	if (!dict_table_is_temporary(table)) {
3863 		trx_start_if_not_started_xa(trx, true);
3864 	}
3865 
3866 	for (i = 0; i < index->n_def; i++) {
3867 		/* Check that prefix_len and actual length
3868 		< DICT_MAX_INDEX_COL_LEN */
3869 
3870 		len = dict_index_get_nth_field(index, i)->prefix_len;
3871 
3872 		if (field_lengths && field_lengths[i]) {
3873 			len = ut_max(len, field_lengths[i]);
3874 		}
3875 
3876 		DBUG_EXECUTE_IF(
3877 			"ib_create_table_fail_at_create_index",
3878 			len = DICT_MAX_FIELD_LEN_BY_FORMAT(table) + 1;
3879 		);
3880 
3881 		/* Column or prefix length exceeds maximum column length */
3882 		if (len > (ulint) DICT_MAX_FIELD_LEN_BY_FORMAT(table)) {
3883 			err = DB_TOO_BIG_INDEX_COL;
3884 
3885 			dict_mem_index_free(index);
3886 			goto error_handling;
3887 		}
3888 	}
3889 
3890 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
3891 
3892 	/* For temp-table we avoid insertion into SYSTEM TABLES to
3893 	maintain performance and so we have separate path that directly
3894 	just updates dictonary cache. */
3895 	if (!dict_table_is_temporary(table)) {
3896 		/* Note that the space id where we store the index is
3897 		inherited from the table in dict_build_index_def_step()
3898 		in dict0crea.cc. */
3899 
3900 		heap = mem_heap_create(512);
3901 
3902 		node = ind_create_graph_create(index, heap, NULL);
3903 
3904 		thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
3905 
3906 		ut_a(thr == que_fork_start_command(
3907 				static_cast<que_fork_t*>(
3908 					que_node_get_parent(thr))));
3909 
3910 		que_run_threads(thr);
3911 
3912 		err = trx->error_state;
3913 
3914 		que_graph_free((que_t*) que_node_get_parent(thr));
3915 	} else {
3916 		dict_build_index_def(table, index, trx);
3917 
3918 		index_id_t index_id = index->id;
3919 
3920 		/* add index to dictionary cache and also free index object.
3921 		We allow instrinsic table to violate the size limits because
3922 		they are used by optimizer for all record formats. */
3923 		err = dict_index_add_to_cache(
3924 			table, index, FIL_NULL,
3925 			!dict_table_is_intrinsic(table)
3926 			&& trx_is_strict(trx));
3927 
3928 		if (err != DB_SUCCESS) {
3929 			goto error_handling;
3930 		}
3931 
3932 		/* as above function has freed index object re-load it
3933 		now from dictionary cache using index_id */
3934 		if (!dict_table_is_intrinsic(table)) {
3935 			index = dict_index_get_if_in_cache_low(index_id);
3936 		} else {
3937 			index = dict_table_find_index_on_id(table, index_id);
3938 
3939 			/* trx_id field is used for tracking which transaction
3940 			created the index. For intrinsic table this is
3941 			ir-relevant and so re-use it for tracking consistent
3942 			view while processing SELECT as part of UPDATE. */
3943 			index->trx_id = ULINT_UNDEFINED;
3944 		}
3945 		ut_a(index != NULL);
3946 		index->table = table;
3947 
3948 		err = dict_create_index_tree_in_mem(index, trx);
3949 
3950 		if (err != DB_SUCCESS && !dict_table_is_intrinsic(table)) {
3951 			dict_index_remove_from_cache(table, index);
3952 		}
3953 	}
3954 
3955 	/* Create the index specific FTS auxiliary tables. */
3956 	if (err == DB_SUCCESS && is_fts) {
3957 		dict_index_t*	idx;
3958 
3959 		idx = dict_table_get_index_on_name(table, index_name);
3960 
3961 		ut_ad(idx);
3962 		err = fts_create_index_tables_low(
3963 			trx, idx, table->name.m_name, table->id);
3964 	}
3965 
3966 error_handling:
3967 	dict_table_close(table, TRUE, FALSE);
3968 
3969 	if (err != DB_SUCCESS) {
3970 		/* We have special error handling here */
3971 
3972 		trx->error_state = DB_SUCCESS;
3973 
3974 		if (trx_is_started(trx)) {
3975 
3976 			trx_rollback_to_savepoint(trx, NULL);
3977 		}
3978 
3979 		row_drop_table_for_mysql(table_name, trx, FALSE, true, handler);
3980 
3981 		if (trx_is_started(trx)) {
3982 
3983 			trx_commit_for_mysql(trx);
3984 		}
3985 
3986 		trx->error_state = DB_SUCCESS;
3987 	}
3988 
3989 	trx->op_info = "";
3990 
3991 	ut_free(table_name);
3992 	ut_free(index_name);
3993 
3994 	return(err);
3995 }
3996 
3997 /*********************************************************************//**
3998 Scans a table create SQL string and adds to the data dictionary
3999 the foreign key constraints declared in the string. This function
4000 should be called after the indexes for a table have been created.
4001 Each foreign key constraint must be accompanied with indexes in
4002 bot participating tables. The indexes are allowed to contain more
4003 fields than mentioned in the constraint.
4004 
4005 @param[in]	trx		transaction
4006 @param[in]	sql_string	table create statement where
4007 				foreign keys are declared like:
4008 				FOREIGN KEY (a, b) REFERENCES table2(c, d),
4009 				table2 can be written also with the database
4010 				name before it: test.table2; the default
4011 				database id the database of parameter name
4012 @param[in]	sql_length	length of sql_string
4013 @param[in]	name		table full name in normalized form
4014 @param[in]	reject_fks	if TRUE, fail with error code
4015 				DB_CANNOT_ADD_CONSTRAINT if any
4016 				foreign keys are found.
4017 @return error code or DB_SUCCESS */
4018 dberr_t
row_table_add_foreign_constraints(trx_t * trx,const char * sql_string,size_t sql_length,const char * name,ibool reject_fks)4019 row_table_add_foreign_constraints(
4020 	trx_t*			trx,
4021 	const char*		sql_string,
4022 	size_t			sql_length,
4023 	const char*		name,
4024 	ibool			reject_fks)
4025 {
4026 	dberr_t	err;
4027 
4028 	DBUG_ENTER("row_table_add_foreign_constraints");
4029 
4030 	ut_ad(mutex_own(&dict_sys->mutex));
4031 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
4032 	ut_a(sql_string);
4033 
4034 	trx->op_info = "adding foreign keys";
4035 
4036 	trx_start_if_not_started_xa(trx, true);
4037 
4038 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
4039 
4040 	err = dict_create_foreign_constraints(
4041 		trx, sql_string, sql_length, name, reject_fks);
4042 
4043 	DBUG_EXECUTE_IF("ib_table_add_foreign_fail",
4044 			err = DB_DUPLICATE_KEY;);
4045 
4046 	DEBUG_SYNC_C("table_add_foreign_constraints");
4047 
4048 	/* Check like this shouldn't be done for table that doesn't
4049 	have foreign keys but code still continues to run with void action.
4050 	Disable it for intrinsic table at-least */
4051 	if (err == DB_SUCCESS) {
4052 		/* Check that also referencing constraints are ok */
4053 		dict_names_t	fk_tables;
4054 		err = dict_load_foreigns(name, NULL, false, true,
4055 					 DICT_ERR_IGNORE_NONE, fk_tables);
4056 
4057 		while (err == DB_SUCCESS && !fk_tables.empty()) {
4058 			dict_load_table(fk_tables.front(), true,
4059 					DICT_ERR_IGNORE_NONE);
4060 			fk_tables.pop_front();
4061 		}
4062 	}
4063 
4064 	if (err != DB_SUCCESS) {
4065 		/* We have special error handling here */
4066 
4067 		trx->error_state = DB_SUCCESS;
4068 
4069 		if (trx_is_started(trx)) {
4070 
4071 			trx_rollback_to_savepoint(trx, NULL);
4072 		}
4073 
4074 		row_drop_table_for_mysql(name, trx, FALSE, true);
4075 
4076 		if (trx_is_started(trx)) {
4077 
4078 			trx_commit_for_mysql(trx);
4079 		}
4080 
4081 		trx->error_state = DB_SUCCESS;
4082 	}
4083 
4084 	DBUG_RETURN(err);
4085 }
4086 
4087 /*********************************************************************//**
4088 Drops a table for MySQL as a background operation. MySQL relies on Unix
4089 in ALTER TABLE to the fact that the table handler does not remove the
4090 table before all handles to it has been removed. Furhermore, the MySQL's
4091 call to drop table must be non-blocking. Therefore we do the drop table
4092 as a background operation, which is taken care of by the master thread
4093 in srv0srv.cc.
4094 @return error code or DB_SUCCESS */
4095 static
4096 dberr_t
row_drop_table_for_mysql_in_background(const char * name)4097 row_drop_table_for_mysql_in_background(
4098 /*===================================*/
4099 	const char*	name)	/*!< in: table name */
4100 {
4101 	dberr_t	error;
4102 	trx_t*	trx;
4103 
4104 	trx = trx_allocate_for_background();
4105 
4106 	/* If the original transaction was dropping a table referenced by
4107 	foreign keys, we must set the following to be able to drop the
4108 	table: */
4109 
4110 	trx->check_foreigns = false;
4111 
4112 	/* Try to drop the table in InnoDB */
4113 
4114 	error = row_drop_table_for_mysql(name, trx, FALSE);
4115 
4116 	/* Flush the log to reduce probability that the .frm files and
4117 	the InnoDB data dictionary get out-of-sync if the user runs
4118 	with innodb_flush_log_at_trx_commit = 0 */
4119 
4120 	log_buffer_flush_to_disk();
4121 
4122 	trx_commit_for_mysql(trx);
4123 
4124 	trx_free_for_background(trx);
4125 
4126 	return(error);
4127 }
4128 
4129 /*********************************************************************//**
4130 The master thread in srv0srv.cc calls this regularly to drop tables which
4131 we must drop in background after queries to them have ended. Such lazy
4132 dropping of tables is needed in ALTER TABLE on Unix.
4133 @return how many tables dropped + remaining tables in list */
4134 ulint
row_drop_tables_for_mysql_in_background(void)4135 row_drop_tables_for_mysql_in_background(void)
4136 /*=========================================*/
4137 {
4138 	row_mysql_drop_t*	drop;
4139 	dict_table_t*		table;
4140 	ulint			n_tables;
4141 	ulint			n_tables_dropped = 0;
4142 loop:
4143 	mutex_enter(&row_drop_list_mutex);
4144 
4145 	ut_a(row_mysql_drop_list_inited);
4146 
4147 	drop = UT_LIST_GET_FIRST(row_mysql_drop_list);
4148 
4149 	n_tables = UT_LIST_GET_LEN(row_mysql_drop_list);
4150 
4151 	mutex_exit(&row_drop_list_mutex);
4152 
4153 	if (drop == NULL) {
4154 		/* All tables dropped */
4155 
4156 		return(n_tables + n_tables_dropped);
4157 	}
4158 
4159 	DBUG_EXECUTE_IF("row_drop_tables_in_background_sleep",
4160 		os_thread_sleep(5000000);
4161 	);
4162 
4163 	table = dict_table_open_on_name(drop->table_name, FALSE, FALSE,
4164 					DICT_ERR_IGNORE_NONE);
4165 
4166 	if (table == NULL) {
4167 		/* If for some reason the table has already been dropped
4168 		through some other mechanism, do not try to drop it */
4169 
4170 		goto already_dropped;
4171 	}
4172 
4173 	if (!table->to_be_dropped) {
4174 		/* There is a scenario: the old table is dropped
4175 		just after it's added into drop list, and new
4176 		table with the same name is created, then we try
4177 		to drop the new table in background. */
4178 		dict_table_close(table, FALSE, FALSE);
4179 
4180 		goto already_dropped;
4181 	}
4182 
4183 	ut_a(!table->can_be_evicted);
4184 
4185 	dict_table_close(table, FALSE, FALSE);
4186 
4187 	if (DB_SUCCESS != row_drop_table_for_mysql_in_background(
4188 		    drop->table_name)) {
4189 		/* If the DROP fails for some table, we return, and let the
4190 		main thread retry later */
4191 
4192 		return(n_tables + n_tables_dropped);
4193 	}
4194 
4195 	n_tables_dropped++;
4196 
4197 already_dropped:
4198 	mutex_enter(&row_drop_list_mutex);
4199 
4200 	UT_LIST_REMOVE(row_mysql_drop_list, drop);
4201 
4202 	MONITOR_DEC(MONITOR_BACKGROUND_DROP_TABLE);
4203 
4204 	ib::info() << "Dropped table "
4205 		<< ut_get_name(NULL, drop->table_name)
4206 		<< " in background drop queue.",
4207 
4208 	ut_free(drop->table_name);
4209 
4210 	ut_free(drop);
4211 
4212 	mutex_exit(&row_drop_list_mutex);
4213 
4214 	goto loop;
4215 }
4216 
4217 /*********************************************************************//**
4218 Get the background drop list length. NOTE: the caller must own the
4219 drop list mutex!
4220 @return how many tables in list */
4221 ulint
row_get_background_drop_list_len_low(void)4222 row_get_background_drop_list_len_low(void)
4223 /*======================================*/
4224 {
4225 	ulint	len;
4226 
4227 	mutex_enter(&row_drop_list_mutex);
4228 
4229 	ut_a(row_mysql_drop_list_inited);
4230 
4231 	len = UT_LIST_GET_LEN(row_mysql_drop_list);
4232 
4233 	mutex_exit(&row_drop_list_mutex);
4234 
4235 	return(len);
4236 }
4237 
4238 /*********************************************************************//**
4239 If a table is not yet in the drop list, adds the table to the list of tables
4240 which the master thread drops in background. We need this on Unix because in
4241 ALTER TABLE MySQL may call drop table even if the table has running queries on
4242 it. Also, if there are running foreign key checks on the table, we drop the
4243 table lazily.
4244 @return TRUE if the table was not yet in the drop list, and was added there */
4245 static
4246 ibool
row_add_table_to_background_drop_list(const char * name)4247 row_add_table_to_background_drop_list(
4248 /*==================================*/
4249 	const char*	name)	/*!< in: table name */
4250 {
4251 	row_mysql_drop_t*	drop;
4252 
4253 	mutex_enter(&row_drop_list_mutex);
4254 
4255 	ut_a(row_mysql_drop_list_inited);
4256 
4257 	/* Look if the table already is in the drop list */
4258 	for (drop = UT_LIST_GET_FIRST(row_mysql_drop_list);
4259 	     drop != NULL;
4260 	     drop = UT_LIST_GET_NEXT(row_mysql_drop_list, drop)) {
4261 
4262 		if (strcmp(drop->table_name, name) == 0) {
4263 			/* Already in the list */
4264 
4265 			mutex_exit(&row_drop_list_mutex);
4266 
4267 			return(FALSE);
4268 		}
4269 	}
4270 
4271 	drop = static_cast<row_mysql_drop_t*>(
4272 		ut_malloc_nokey(sizeof(row_mysql_drop_t)));
4273 
4274 	drop->table_name = mem_strdup(name);
4275 
4276 	UT_LIST_ADD_LAST(row_mysql_drop_list, drop);
4277 
4278 	MONITOR_INC(MONITOR_BACKGROUND_DROP_TABLE);
4279 
4280 	mutex_exit(&row_drop_list_mutex);
4281 
4282 	return(TRUE);
4283 }
4284 
4285 /** Reassigns the table identifier of a table.
4286 @param[in,out]	table	table
4287 @param[in,out]	trx	transaction
4288 @param[out]	new_id	new table id
4289 @return error code or DB_SUCCESS */
4290 dberr_t
row_mysql_table_id_reassign(dict_table_t * table,trx_t * trx,table_id_t * new_id)4291 row_mysql_table_id_reassign(
4292 	dict_table_t*	table,
4293 	trx_t*		trx,
4294 	table_id_t*	new_id)
4295 {
4296 	dberr_t		err;
4297 	pars_info_t*	info	= pars_info_create();
4298 
4299 	dict_hdr_get_new_id(new_id, NULL, NULL, table, false);
4300 
4301 	/* Remove all locks except the table-level S and X locks. */
4302 	lock_remove_all_on_table(table, FALSE);
4303 
4304 	pars_info_add_ull_literal(info, "old_id", table->id);
4305 	pars_info_add_ull_literal(info, "new_id", *new_id);
4306 
4307 	/* As micro-SQL does not support int4 == int8 comparisons,
4308 	old and new IDs are added again under different names as
4309 	int4 values */
4310 	pars_info_add_int4_literal(info, "old_id_narrow", table->id);
4311 	pars_info_add_int4_literal(info, "new_id_narrow", *new_id);
4312 
4313 	err = que_eval_sql(
4314 		info,
4315 		"PROCEDURE RENUMBER_TABLE_PROC () IS\n"
4316 		"BEGIN\n"
4317 		"UPDATE SYS_TABLES SET ID = :new_id\n"
4318 		" WHERE ID = :old_id;\n"
4319 		"UPDATE SYS_COLUMNS SET TABLE_ID = :new_id\n"
4320 		" WHERE TABLE_ID = :old_id;\n"
4321 		"UPDATE SYS_INDEXES SET TABLE_ID = :new_id\n"
4322 		" WHERE TABLE_ID = :old_id;\n"
4323 		"UPDATE SYS_VIRTUAL SET TABLE_ID = :new_id\n"
4324 		" WHERE TABLE_ID = :old_id;\n"
4325 		"UPDATE SYS_ZIP_DICT_COLS SET TABLE_ID = :new_id_narrow\n"
4326 		" WHERE TABLE_ID = :old_id_narrow;\n"
4327 		"END;\n", FALSE, trx);
4328 
4329 	return(err);
4330 }
4331 
4332 /*********************************************************************//**
4333 Setup the pre-requisites for DISCARD TABLESPACE. It will start the transaction,
4334 acquire the data dictionary lock in X mode and open the table.
4335 @return table instance or 0 if not found. */
4336 static
4337 dict_table_t*
row_discard_tablespace_begin(const char * name,trx_t * trx)4338 row_discard_tablespace_begin(
4339 /*=========================*/
4340 	const char*	name,	/*!< in: table name */
4341 	trx_t*		trx)	/*!< in: transaction handle */
4342 {
4343 	trx->op_info = "discarding tablespace";
4344 
4345 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
4346 
4347 	trx_start_if_not_started_xa(trx, true);
4348 
4349 	/* Serialize data dictionary operations with dictionary mutex:
4350 	this is to avoid deadlocks during data dictionary operations */
4351 
4352 	row_mysql_lock_data_dictionary(trx);
4353 
4354 	dict_table_t*	table;
4355 
4356 	table = dict_table_open_on_name(
4357 		name, TRUE, FALSE, DICT_ERR_IGNORE_NONE);
4358 
4359 	if (table) {
4360 		dict_stats_wait_bg_to_stop_using_table(table, trx);
4361 		ut_a(!is_system_tablespace(table->space));
4362 		ut_a(table->n_foreign_key_checks_running == 0);
4363 	}
4364 
4365 	return(table);
4366 }
4367 
4368 /*********************************************************************//**
4369 Do the foreign key constraint checks.
4370 @return DB_SUCCESS or error code. */
4371 static
4372 dberr_t
row_discard_tablespace_foreign_key_checks(const trx_t * trx,const dict_table_t * table)4373 row_discard_tablespace_foreign_key_checks(
4374 /*======================================*/
4375 	const trx_t*		trx,	/*!< in: transaction handle */
4376 	const dict_table_t*	table)	/*!< in: table to be discarded */
4377 {
4378 
4379 	if (srv_read_only_mode || !trx->check_foreigns) {
4380 		return(DB_SUCCESS);
4381 	}
4382 
4383 	/* Check if the table is referenced by foreign key constraints from
4384 	some other table (not the table itself) */
4385 	dict_foreign_set::iterator	it
4386 		= std::find_if(table->referenced_set.begin(),
4387 			       table->referenced_set.end(),
4388 			       dict_foreign_different_tables());
4389 
4390 	if (it == table->referenced_set.end()) {
4391 		return(DB_SUCCESS);
4392 	}
4393 
4394 	const dict_foreign_t*	foreign	= *it;
4395 	FILE*			ef	= dict_foreign_err_file;
4396 
4397 	ut_ad(foreign->foreign_table != table);
4398 	ut_ad(foreign->referenced_table == table);
4399 
4400 	/* We only allow discarding a referenced table if
4401 	FOREIGN_KEY_CHECKS is set to 0 */
4402 
4403 	mutex_enter(&dict_foreign_err_mutex);
4404 
4405 	rewind(ef);
4406 
4407 	ut_print_timestamp(ef);
4408 
4409 	fputs("  Cannot DISCARD table ", ef);
4410 	ut_print_name(ef, trx, table->name.m_name);
4411 	fputs("\n"
4412 	      "because it is referenced by ", ef);
4413 	ut_print_name(ef, trx, foreign->foreign_table_name);
4414 	putc('\n', ef);
4415 
4416 	mutex_exit(&dict_foreign_err_mutex);
4417 
4418 	return(DB_CANNOT_DROP_CONSTRAINT);
4419 }
4420 
4421 /*********************************************************************//**
4422 Cleanup after the DISCARD TABLESPACE operation.
4423 @return error code. */
4424 static
4425 dberr_t
row_discard_tablespace_end(trx_t * trx,dict_table_t * table,dberr_t err)4426 row_discard_tablespace_end(
4427 /*=======================*/
4428 	trx_t*		trx,	/*!< in/out: transaction handle */
4429 	dict_table_t*	table,	/*!< in/out: table to be discarded */
4430 	dberr_t		err)	/*!< in: error code */
4431 {
4432 	if (table != 0) {
4433 		dict_table_close(table, TRUE, FALSE);
4434 	}
4435 
4436 	DBUG_EXECUTE_IF("ib_discard_before_commit_crash",
4437 			log_make_checkpoint_at(LSN_MAX, TRUE);
4438 			DBUG_SUICIDE(););
4439 
4440 	trx_commit_for_mysql(trx);
4441 
4442 	DBUG_EXECUTE_IF("ib_discard_after_commit_crash",
4443 			log_make_checkpoint_at(LSN_MAX, TRUE);
4444 			DBUG_SUICIDE(););
4445 
4446 	row_mysql_unlock_data_dictionary(trx);
4447 
4448 	trx->op_info = "";
4449 
4450 	return(err);
4451 }
4452 
4453 /*********************************************************************//**
4454 Do the DISCARD TABLESPACE operation.
4455 @return DB_SUCCESS or error code. */
4456 static
4457 dberr_t
row_discard_tablespace(trx_t * trx,dict_table_t * table)4458 row_discard_tablespace(
4459 /*===================*/
4460 	trx_t*		trx,	/*!< in/out: transaction handle */
4461 	dict_table_t*	table)	/*!< in/out: table to be discarded */
4462 {
4463 	dberr_t		err;
4464 
4465 	/* How do we prevent crashes caused by ongoing operations on
4466 	the table? Old operations could try to access non-existent
4467 	pages. MySQL will block all DML on the table using MDL and a
4468 	DISCARD will not start unless all existing operations on the
4469 	table to be discarded are completed.
4470 
4471 	1) Acquire the data dictionary latch in X mode. To prevent any
4472 	internal operations that MySQL is not aware off and also for
4473 	the internal SQL parser.
4474 
4475 	2) Purge and rollback: we assign a new table id for the
4476 	table. Since purge and rollback look for the table based on
4477 	the table id, they see the table as 'dropped' and discard
4478 	their operations.
4479 
4480 	3) Insert buffer: we remove all entries for the tablespace in
4481 	the insert buffer tree.
4482 
4483 	4) FOREIGN KEY operations: if table->n_foreign_key_checks_running > 0,
4484 	we do not allow the discard. */
4485 
4486 	/* Play safe and remove all insert buffer entries, though we should
4487 	have removed them already when DISCARD TABLESPACE was called */
4488 
4489 	ibuf_delete_for_discarded_space(table->space);
4490 
4491 	table_id_t	new_id;
4492 
4493 	/* Set the TABLESPACE DISCARD flag in the table definition
4494 	on disk. */
4495 	err = row_import_update_discarded_flag(
4496 		trx, table->id, true, true);
4497 
4498 	if (err != DB_SUCCESS) {
4499 		return(err);
4500 	}
4501 
4502 	/* Update the index root pages in the system tables, on disk */
4503 	err = row_import_update_index_root(trx, table, true, true);
4504 
4505 	if (err != DB_SUCCESS) {
4506 		return(err);
4507 	}
4508 
4509 	/* Drop all the FTS auxiliary tables. */
4510 	if (dict_table_has_fts_index(table)
4511 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
4512 
4513 		fts_drop_tables(trx, table);
4514 	}
4515 
4516 	/* Assign a new space ID to the table definition so that purge
4517 	can ignore the changes. Update the system table on disk. */
4518 
4519 	err = row_mysql_table_id_reassign(table, trx, &new_id);
4520 
4521 	if (err != DB_SUCCESS) {
4522 		return(err);
4523 	}
4524 
4525 	/* Discard the physical file that is used for the tablespace. */
4526 
4527 	err = fil_discard_tablespace(table->space);
4528 
4529 	switch (err) {
4530 	case DB_SUCCESS:
4531 	case DB_IO_ERROR:
4532 	case DB_TABLESPACE_NOT_FOUND:
4533 		/* All persistent operations successful, update the
4534 		data dictionary memory cache. */
4535 
4536 		table->set_file_unreadable();
4537 
4538 		table->flags2 |= DICT_TF2_DISCARDED;
4539 
4540 		dict_table_change_id_in_cache(table, new_id);
4541 
4542 		/* Reset the root page numbers. */
4543 
4544 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
4545 		     index != 0;
4546 		     index = UT_LIST_GET_NEXT(indexes, index)) {
4547 
4548 			index->page = FIL_NULL;
4549 			index->space = FIL_NULL;
4550 		}
4551 
4552 		/* If the tablespace did not already exist or we couldn't
4553 		write to it, we treat that as a successful DISCARD. It is
4554 		unusable anyway. */
4555 
4556 		err = DB_SUCCESS;
4557 		break;
4558 
4559 	default:
4560 		/* We need to rollback the disk changes, something failed. */
4561 
4562 		trx->error_state = DB_SUCCESS;
4563 
4564 		trx_rollback_to_savepoint(trx, NULL);
4565 
4566 		trx->error_state = DB_SUCCESS;
4567 	}
4568 
4569 	return(err);
4570 }
4571 
4572 /*********************************************************************//**
4573 Discards the tablespace of a table which stored in an .ibd file. Discarding
4574 means that this function renames the .ibd file and assigns a new table id for
4575 the table. Also the flag table->file_unreadable is set to TRUE.
4576 @return error code or DB_SUCCESS */
4577 dberr_t
row_discard_tablespace_for_mysql(const char * name,trx_t * trx)4578 row_discard_tablespace_for_mysql(
4579 /*=============================*/
4580 	const char*	name,	/*!< in: table name */
4581 	trx_t*		trx)	/*!< in: transaction handle */
4582 {
4583 	dberr_t		err;
4584 	dict_table_t*	table;
4585 
4586 	/* Open the table and start the transaction if not started. */
4587 
4588 	table = row_discard_tablespace_begin(name, trx);
4589 
4590 	if (table == 0) {
4591 		err = DB_TABLE_NOT_FOUND;
4592 	} else if (dict_table_is_temporary(table)) {
4593 
4594 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4595 			    ER_CANNOT_DISCARD_TEMPORARY_TABLE);
4596 
4597 		err = DB_ERROR;
4598 
4599 	} else if (table->space == srv_sys_space.space_id()) {
4600 		char	table_name[MAX_FULL_NAME_LEN + 1];
4601 
4602 		innobase_format_name(
4603 			table_name, sizeof(table_name),
4604 			table->name.m_name);
4605 
4606 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4607 			    ER_TABLE_IN_SYSTEM_TABLESPACE, table_name);
4608 
4609 		err = DB_ERROR;
4610 
4611 	} else if (table->n_foreign_key_checks_running > 0) {
4612 		char	table_name[MAX_FULL_NAME_LEN + 1];
4613 
4614 		innobase_format_name(
4615 			table_name, sizeof(table_name),
4616 			table->name.m_name);
4617 
4618 		ib_senderrf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
4619 			    ER_DISCARD_FK_CHECKS_RUNNING, table_name);
4620 
4621 		err = DB_ERROR;
4622 
4623 	} else {
4624 		/* Do foreign key constraint checks. */
4625 
4626 		err = row_discard_tablespace_foreign_key_checks(trx, table);
4627 
4628 		if (err == DB_SUCCESS) {
4629 			err = row_discard_tablespace(trx, table);
4630 		}
4631 	}
4632 
4633 	return(row_discard_tablespace_end(trx, table, err));
4634 }
4635 
4636 /*********************************************************************//**
4637 Sets an exclusive lock on a table.
4638 @return error code or DB_SUCCESS */
4639 dberr_t
row_mysql_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode,const char * op_info)4640 row_mysql_lock_table(
4641 /*=================*/
4642 	trx_t*		trx,		/*!< in/out: transaction */
4643 	dict_table_t*	table,		/*!< in: table to lock */
4644 	enum lock_mode	mode,		/*!< in: LOCK_X or LOCK_S */
4645 	const char*	op_info)	/*!< in: string for trx->op_info */
4646 {
4647 	mem_heap_t*	heap;
4648 	que_thr_t*	thr;
4649 	dberr_t		err;
4650 	sel_node_t*	node;
4651 
4652 	ut_ad(trx);
4653 	ut_ad(mode == LOCK_X || mode == LOCK_S);
4654 
4655 	heap = mem_heap_create(512);
4656 
4657 	trx->op_info = op_info;
4658 
4659 	node = sel_node_create(heap);
4660 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
4661 	thr->graph->state = QUE_FORK_ACTIVE;
4662 
4663 	/* We use the select query graph as the dummy graph needed
4664 	in the lock module call */
4665 
4666 	thr = que_fork_get_first_thr(
4667 		static_cast<que_fork_t*>(que_node_get_parent(thr)));
4668 
4669 	que_thr_move_to_run_state_for_mysql(thr, trx);
4670 
4671 run_again:
4672 	thr->run_node = thr;
4673 	thr->prev_node = thr->common.parent;
4674 
4675 	err = lock_table(0, table, mode, thr);
4676 
4677 	trx->error_state = err;
4678 
4679 	if (err == DB_SUCCESS) {
4680 		que_thr_stop_for_mysql_no_error(thr, trx);
4681 	} else {
4682 		que_thr_stop_for_mysql(thr);
4683 
4684 		if (err != DB_QUE_THR_SUSPENDED) {
4685 			ibool	was_lock_wait;
4686 
4687 			was_lock_wait = row_mysql_handle_errors(
4688 				&err, trx, thr, NULL);
4689 
4690 			if (was_lock_wait) {
4691 				goto run_again;
4692 			}
4693 		} else {
4694 			que_thr_t*	run_thr;
4695 			que_node_t*	parent;
4696 
4697 			parent = que_node_get_parent(thr);
4698 
4699 			run_thr = que_fork_start_command(
4700 				static_cast<que_fork_t*>(parent));
4701 
4702 			ut_a(run_thr == thr);
4703 
4704 			/* There was a lock wait but the thread was not
4705 			in a ready to run or running state. */
4706 			trx->error_state = DB_LOCK_WAIT;
4707 
4708 			goto run_again;
4709 		}
4710 	}
4711 
4712 	que_graph_free(thr->graph);
4713 	trx->op_info = "";
4714 
4715 	return(err);
4716 }
4717 
4718 /** Drop ancillary FTS tables as part of dropping a table.
4719 @param[in,out]	table		Table cache entry
4720 @param[in,out]	trx		Transaction handle
4721 @return error code or DB_SUCCESS */
4722 UNIV_INLINE
4723 dberr_t
row_drop_ancillary_fts_tables(dict_table_t * table,trx_t * trx)4724 row_drop_ancillary_fts_tables(
4725 	dict_table_t*	table,
4726 	trx_t*		trx)
4727 {
4728 	/* Drop ancillary FTS tables */
4729 	if (dict_table_has_fts_index(table)
4730 	    || DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
4731 
4732 		ut_ad(table->get_ref_count() == 0);
4733 		ut_ad(trx_is_started(trx));
4734 
4735 		dberr_t err = fts_drop_tables(trx, table);
4736 
4737 		if (err != DB_SUCCESS) {
4738 			ib::error() << " Unable to remove ancillary FTS"
4739 				" tables for table "
4740 				<< table->name << " : " << ut_strerr(err);
4741 
4742 			return(err);
4743 		}
4744 	}
4745 
4746 	/* The table->fts flag can be set on the table for which
4747 	the cluster index is being rebuilt. Such table might not have
4748 	DICT_TF2_FTS flag set. So keep this out of above
4749 	dict_table_has_fts_index condition */
4750 	if (table->fts != NULL) {
4751 		/* Need to set TABLE_DICT_LOCKED bit, since
4752 		fts_que_graph_free_check_lock would try to acquire
4753 		dict mutex lock */
4754 		table->fts->fts_status |= TABLE_DICT_LOCKED;
4755 
4756 		fts_free(table);
4757 	}
4758 
4759 	return(DB_SUCCESS);
4760 }
4761 
4762 /** Drop a table from the memory cache as part of dropping a table.
4763 @param[in]	tablename	A copy of table->name. Used when table == null
4764 @param[in,out]	table		Table cache entry
4765 @param[in,out]	trx		Transaction handle
4766 @return error code or DB_SUCCESS */
4767 UNIV_INLINE
4768 dberr_t
row_drop_table_from_cache(const char * tablename,dict_table_t * table,trx_t * trx)4769 row_drop_table_from_cache(
4770 	const char*	tablename,
4771 	dict_table_t*	table,
4772 	trx_t*		trx)
4773 {
4774 	dberr_t	err = DB_SUCCESS;
4775 	bool	is_temp = dict_table_is_temporary(table);
4776 
4777 	/* Remove the pointer to this table object from the list
4778 	of modified tables by the transaction because the object
4779 	is going to be destroyed below. */
4780 	trx->mod_tables.erase(table);
4781 
4782 	if (!dict_table_is_intrinsic(table)) {
4783 		dict_table_remove_from_cache(table);
4784 	} else {
4785 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
4786 		     index != NULL;
4787 		     index = UT_LIST_GET_FIRST(table->indexes)) {
4788 
4789 			rw_lock_free(&index->lock);
4790 
4791 			UT_LIST_REMOVE(table->indexes, index);
4792 
4793 			dict_mem_index_free(index);
4794 		}
4795 
4796 		dict_mem_table_free(table);
4797 		table = NULL;
4798 	}
4799 
4800 	if (!is_temp
4801 	    && dict_load_table(tablename, true,
4802 			       DICT_ERR_IGNORE_NONE) != NULL) {
4803 		ib::error() << "Not able to remove table "
4804 			<< ut_get_name(trx, tablename)
4805 			<< " from the dictionary cache!";
4806 		err = DB_ERROR;
4807 	}
4808 
4809 	return(err);
4810 }
4811 
4812 /** Drop a single-table tablespace as part of dropping or renaming a table.
4813 This deletes the fil_space_t if found and the file on disk.
4814 @param[in]	space_id	Tablespace ID
4815 @param[in]	tablename	Table name, same as the tablespace name
4816 @param[in]	filepath	File path of tablespace to delete
4817 @param[in]	is_temp		Is this a temporary table/tablespace
4818 @param[in]	is_encrypted	Is this an encrypted table/tablespace
4819 @param[in]	trx		Transaction handle
4820 @return error code or DB_SUCCESS */
4821 UNIV_INLINE
4822 dberr_t
row_drop_single_table_tablespace(ulint space_id,const char * tablename,const char * filepath,bool is_temp,bool is_encrypted,trx_t * trx)4823 row_drop_single_table_tablespace(
4824 	ulint		space_id,
4825 	const char*	tablename,
4826 	const char*	filepath,
4827 	bool		is_temp,
4828 	bool		is_encrypted,
4829 	trx_t*		trx)
4830 {
4831 	dberr_t	err = DB_SUCCESS;
4832 
4833 	/* This might be a temporary single-table tablespace if the table
4834 	is compressed and temporary. If so, don't spam the log when we
4835 	delete one of these or if we can't find the tablespace. */
4836 	bool	print_msg = !is_temp && !is_encrypted;
4837 
4838 	/* If the tablespace is not in the cache, just delete the file. */
4839 	if (!fil_space_for_table_exists_in_mem(
4840 		    space_id, tablename, print_msg, false, NULL, 0)) {
4841 
4842 		/* Force a delete of any discarded or temporary files. */
4843 		fil_delete_file(filepath);
4844 
4845 		if (print_msg) {
4846 			ib::info() << "Removed datafile " << filepath
4847 				<< " for table " << tablename;
4848 		}
4849 
4850 	} else if (fil_delete_tablespace(space_id, BUF_REMOVE_FLUSH_NO_WRITE)
4851 		   != DB_SUCCESS) {
4852 
4853 		ib::error() << "We removed the InnoDB internal data"
4854 			" dictionary entry of table " << tablename
4855 			<< " but we are not able to delete the tablespace "
4856 			<< space_id << " file " << filepath << "!";
4857 
4858 		err = DB_ERROR;
4859 	}
4860 
4861 	return(err);
4862 }
4863 
4864 /** Drop a table for MySQL.
4865 If the data dictionary was not already locked by the transaction,
4866 the transaction will be committed.  Otherwise, the data dictionary
4867 will remain locked.
4868 @param[in]	name		Table name
4869 @param[in]	trx		Transaction handle
4870 @param[in]	drop_db		true=dropping whole database
4871 @param[in]	nonatomic	Whether it is permitted to release
4872 and reacquire dict_operation_lock
4873 @param[in,out]	handler		Table handler
4874 @return error code or DB_SUCCESS */
4875 dberr_t
row_drop_table_for_mysql(const char * name,trx_t * trx,bool drop_db,bool nonatomic,dict_table_t * handler)4876 row_drop_table_for_mysql(
4877 	const char*	name,
4878 	trx_t*		trx,
4879 	bool		drop_db,
4880 	bool		nonatomic,
4881 	dict_table_t*	handler)
4882 {
4883 	dberr_t		err;
4884 	dict_foreign_t*	foreign;
4885 	dict_table_t*	table			= NULL;
4886 	char*		filepath		= NULL;
4887 	char*		tablename		= NULL;
4888 	bool		locked_dictionary	= false;
4889 	pars_info_t*	info			= NULL;
4890 	mem_heap_t*	heap			= NULL;
4891 	bool		is_intrinsic_temp_table	= false;
4892 	bool		was_master_key_id_mutex_locked	= false;
4893 	bool		page0_has_crypt_data = false;
4894 
4895 	DBUG_ENTER("row_drop_table_for_mysql");
4896 	DBUG_PRINT("row_drop_table_for_mysql", ("table: '%s'", name));
4897 
4898 	ut_a(name != NULL);
4899 
4900 	/* Serialize data dictionary operations with dictionary mutex:
4901 	no deadlocks can occur then in these operations */
4902 
4903 	trx->op_info = "dropping table";
4904 
4905 	if (handler != NULL && dict_table_is_intrinsic(handler)) {
4906 		table = handler;
4907 		is_intrinsic_temp_table = true;
4908 	}
4909 
4910 	if (table == NULL) {
4911 
4912 		if (trx->dict_operation_lock_mode != RW_X_LATCH) {
4913 			/* Prevent foreign key checks etc. while we are
4914 			dropping the table */
4915 
4916 			row_mysql_lock_data_dictionary(trx);
4917 
4918 			locked_dictionary = true;
4919 			nonatomic = true;
4920 		}
4921 
4922 		ut_ad(mutex_own(&dict_sys->mutex));
4923 		ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
4924 
4925 		table = dict_table_open_on_name(
4926 			name, TRUE, FALSE,
4927 			static_cast<dict_err_ignore_t>(
4928 				DICT_ERR_IGNORE_INDEX_ROOT
4929 				| DICT_ERR_IGNORE_CORRUPT));
4930 	} else {
4931 		table->acquire();
4932 		ut_ad(dict_table_is_intrinsic(table));
4933 	}
4934 
4935 	if (!table) {
4936 		err = DB_TABLE_NOT_FOUND;
4937 		goto funct_exit;
4938 	}
4939 
4940 	/* This function is called recursively via fts_drop_tables(). */
4941 	if (!trx_is_started(trx)) {
4942 
4943 		if (!dict_table_is_temporary(table)) {
4944 			trx_start_for_ddl(trx, TRX_DICT_OP_TABLE);
4945 		} else {
4946 			trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
4947 		}
4948 	}
4949 
4950 	/* Turn on this drop bit before we could release the dictionary
4951 	latch */
4952 	table->to_be_dropped = true;
4953 
4954 	if (nonatomic) {
4955 		/* This trx did not acquire any locks on dictionary
4956 		table records yet. Thus it is safe to release and
4957 		reacquire the data dictionary latches. */
4958 		if (table->fts) {
4959 			ut_ad(!table->fts->add_wq);
4960 			ut_ad(lock_trx_has_sys_table_locks(trx) == 0);
4961 
4962 			for (;;) {
4963 				bool retry = false;
4964 				if (dict_fts_index_syncing(table)) {
4965 					retry = true;
4966 				}
4967 				if (!retry) {
4968 			        break;
4969 				}
4970 				DICT_BG_YIELD(trx);
4971 			}
4972 
4973 			row_mysql_unlock_data_dictionary(trx);
4974 			fts_optimize_remove_table(table);
4975 			row_mysql_lock_data_dictionary(trx);
4976 		}
4977 
4978 		/* Do not bother to deal with persistent stats for temp
4979 		tables since we know temp tables do not use persistent
4980 		stats. */
4981 		if (!dict_table_is_temporary(table)) {
4982 			dict_stats_wait_bg_to_stop_using_table(
4983 				table, trx);
4984 		}
4985 	}
4986 
4987 	/* make sure background stats thread is not running on the table */
4988 	ut_ad(!(table->stats_bg_flag & BG_STAT_IN_PROGRESS));
4989 
4990 	/* Delete the link file if used. */
4991 	if (DICT_TF_HAS_DATA_DIR(table->flags)) {
4992 		RemoteDatafile::delete_link_file(name);
4993 	}
4994 
4995 	if (!dict_table_is_temporary(table)) {
4996 
4997 		dict_stats_recalc_pool_del(table);
4998 
4999 		/* Remove stats for this table and all of its indexes from the
5000 		persistent storage if it exists and if there are stats for this
5001 		table in there. This function creates its own trx and commits
5002 		it. */
5003 		if (dict_stats_is_persistent_enabled(table)) {
5004 			char	errstr[1024];
5005 			err = dict_stats_drop_table(name, errstr, sizeof(errstr));
5006 			if (err != DB_SUCCESS) {
5007 				ib::warn() << errstr;
5008 			}
5009 		}
5010 	}
5011 
5012 	if (!dict_table_is_intrinsic(table)) {
5013 		dict_table_prevent_eviction(table);
5014 	}
5015 
5016 	dict_table_close(table, TRUE, FALSE);
5017 
5018 	/* Check if the table is referenced by foreign key constraints from
5019 	some other table (not the table itself) */
5020 
5021 	if (!srv_read_only_mode && trx->check_foreigns) {
5022 
5023 		for (dict_foreign_set::iterator it
5024 			= table->referenced_set.begin();
5025 		     it != table->referenced_set.end();
5026 		     ++it) {
5027 
5028 			foreign = *it;
5029 
5030 			const bool	ref_ok = drop_db
5031 				&& dict_tables_have_same_db(
5032 					name,
5033 					foreign->foreign_table_name_lookup);
5034 
5035 			if (foreign->foreign_table != table && !ref_ok) {
5036 
5037 				FILE*	ef	= dict_foreign_err_file;
5038 
5039 				/* We only allow dropping a referenced table
5040 				if FOREIGN_KEY_CHECKS is set to 0 */
5041 
5042 				err = DB_CANNOT_DROP_CONSTRAINT;
5043 
5044 				mutex_enter(&dict_foreign_err_mutex);
5045 				rewind(ef);
5046 				ut_print_timestamp(ef);
5047 
5048 				fputs("  Cannot drop table ", ef);
5049 				ut_print_name(ef, trx, name);
5050 				fputs("\n"
5051 				      "because it is referenced by ", ef);
5052 				ut_print_name(ef, trx,
5053 					      foreign->foreign_table_name);
5054 				putc('\n', ef);
5055 				mutex_exit(&dict_foreign_err_mutex);
5056 
5057 				goto funct_exit;
5058 			}
5059 		}
5060 	}
5061 
5062 
5063 	DBUG_EXECUTE_IF("row_drop_table_add_to_background",
5064 		row_add_table_to_background_drop_list(table->name.m_name);
5065 		err = DB_SUCCESS;
5066 		goto funct_exit;
5067 	);
5068 
5069 	/* TODO: could we replace the counter n_foreign_key_checks_running
5070 	with lock checks on the table? Acquire here an exclusive lock on the
5071 	table, and rewrite lock0lock.cc and the lock wait in srv0srv.cc so that
5072 	they can cope with the table having been dropped here? Foreign key
5073 	checks take an IS or IX lock on the table. */
5074 
5075 	if (table->n_foreign_key_checks_running > 0) {
5076 
5077 		const char*	save_tablename = table->name.m_name;
5078 		ibool		added;
5079 
5080 		added = row_add_table_to_background_drop_list(save_tablename);
5081 
5082 		if (added) {
5083 			ib::info() << "You are trying to drop table "
5084 				<< table->name
5085 				<< " though there is a foreign key check"
5086 				" running on it. Adding the table to the"
5087 				" background drop queue.";
5088 
5089 			/* We return DB_SUCCESS to MySQL though the drop will
5090 			happen lazily later */
5091 
5092 			err = DB_SUCCESS;
5093 		} else {
5094 			/* The table is already in the background drop list */
5095 			err = DB_ERROR;
5096 		}
5097 
5098 		goto funct_exit;
5099 	}
5100 
5101 	/* Remove all locks that are on the table or its records, if there
5102 	are no references to the table but it has record locks, we release
5103 	the record locks unconditionally. One use case is:
5104 
5105 		CREATE TABLE t2 (PRIMARY KEY (a)) SELECT * FROM t1;
5106 
5107 	If after the user transaction has done the SELECT and there is a
5108 	problem in completing the CREATE TABLE operation, MySQL will drop
5109 	the table. InnoDB will create a new background transaction to do the
5110 	actual drop, the trx instance that is passed to this function. To
5111 	preserve existing behaviour we remove the locks but ideally we
5112 	shouldn't have to. There should never be record locks on a table
5113 	that is going to be dropped. */
5114 
5115 	if (table->get_ref_count() == 0) {
5116 		/* We don't take lock on intrinsic table so nothing to remove.*/
5117 		if (!dict_table_is_intrinsic(table)) {
5118 			lock_remove_all_on_table(table, TRUE);
5119 		}
5120 		ut_a(table->n_rec_locks == 0);
5121 	} else if (table->get_ref_count() > 0 || table->n_rec_locks > 0) {
5122 		ibool	added;
5123 
5124 		ut_ad(!dict_table_is_intrinsic(table));
5125 
5126 		added = row_add_table_to_background_drop_list(
5127 			table->name.m_name);
5128 
5129 		if (added) {
5130 			ib::info() << "MySQL is trying to drop table "
5131 				<< table->name
5132 				<< " though there are still open handles to"
5133 				" it. Adding the table to the background drop"
5134 				" queue.";
5135 
5136 			/* We return DB_SUCCESS to MySQL though the drop will
5137 			happen lazily later */
5138 			err = DB_SUCCESS;
5139 		} else {
5140 			/* The table is already in the background drop list */
5141 			err = DB_ERROR;
5142 		}
5143 
5144 		goto funct_exit;
5145 	}
5146 
5147 	/* The "to_be_dropped" marks table that is to be dropped, but
5148 	has not been dropped, instead, was put in the background drop
5149 	list due to being used by concurrent DML operations. Clear it
5150 	here since there are no longer any concurrent activities on it,
5151 	and it is free to be dropped */
5152 	table->to_be_dropped = false;
5153 
5154 	/* If we get this far then the table to be dropped must not have
5155 	any table or record locks on it. */
5156 
5157 	ut_a(dict_table_is_intrinsic(table) || !lock_table_has_locks(table));
5158 
5159 	switch (trx_get_dict_operation(trx)) {
5160 	case TRX_DICT_OP_NONE:
5161 		trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
5162 		trx->table_id = table->id;
5163 	case TRX_DICT_OP_TABLE:
5164 		break;
5165 	case TRX_DICT_OP_INDEX:
5166 		/* If the transaction was previously flagged as
5167 		TRX_DICT_OP_INDEX, we should be dropping auxiliary
5168 		tables for full-text indexes or temp tables. */
5169 		ut_ad(strstr(table->name.m_name, "/FTS_") != NULL
5170 		      || strstr(table->name.m_name, TEMP_FILE_PREFIX_INNODB)
5171 		      != NULL);
5172 	}
5173 
5174 	/* Mark all indexes unavailable in the data dictionary cache
5175 	before starting to drop the table. */
5176 
5177 	unsigned*	page_no;
5178 	unsigned*	page_nos;
5179 	heap = mem_heap_create(
5180 		200 + UT_LIST_GET_LEN(table->indexes) * sizeof *page_nos);
5181 	tablename = mem_heap_strdup(heap, name);
5182 
5183 	page_no = page_nos = static_cast<unsigned*>(
5184 		mem_heap_alloc(
5185 			heap,
5186 			UT_LIST_GET_LEN(table->indexes) * sizeof *page_no));
5187 
5188 	for (dict_index_t* index = dict_table_get_first_index(table);
5189 	     index != NULL;
5190 	     index = dict_table_get_next_index(index)) {
5191 		rw_lock_x_lock(dict_index_get_lock(index));
5192 		/* Save the page numbers so that we can restore them
5193 		if the operation fails. */
5194 		*page_no++ = index->page;
5195 		/* Mark the index unusable. */
5196 		index->page = FIL_NULL;
5197 		rw_lock_x_unlock(dict_index_get_lock(index));
5198 	}
5199 
5200 	/* As we don't insert entries to SYSTEM TABLES for temp-tables
5201 	we need to avoid running removal of these entries. */
5202 	if (!dict_table_is_temporary(table)) {
5203 		/* We use the private SQL parser of Innobase to generate the
5204 		query graphs needed in deleting the dictionary data from system
5205 		tables in Innobase. Deleting a row from SYS_INDEXES table also
5206 		frees the file segments of the B-tree associated with the
5207 		index. */
5208 
5209 		info = pars_info_create();
5210 
5211 		pars_info_add_str_literal(info, "table_name", name);
5212 
5213 		std::basic_string<char, std::char_traits<char>,
5214 				  ut_allocator<char> > sql;
5215 		sql.reserve(2000);
5216 
5217 		sql =	"PROCEDURE DROP_TABLE_PROC () IS\n"
5218 			"sys_foreign_id CHAR;\n"
5219 			"table_id CHAR;\n"
5220 			"index_id CHAR;\n"
5221 			"foreign_id CHAR;\n"
5222 			"space_id INT;\n"
5223 			"found INT;\n";
5224 
5225 		sql +=	"DECLARE CURSOR cur_fk IS\n"
5226 			"SELECT ID FROM SYS_FOREIGN\n"
5227 			"WHERE FOR_NAME = :table_name\n"
5228 			"AND TO_BINARY(FOR_NAME)\n"
5229 			"  = TO_BINARY(:table_name)\n"
5230 			"LOCK IN SHARE MODE;\n";
5231 
5232 		sql +=	"DECLARE CURSOR cur_idx IS\n"
5233 			"SELECT ID FROM SYS_INDEXES\n"
5234 			"WHERE TABLE_ID = table_id\n"
5235 			"LOCK IN SHARE MODE;\n";
5236 
5237 		sql +=	"BEGIN\n";
5238 
5239 		sql +=	"SELECT ID INTO table_id\n"
5240 			"FROM SYS_TABLES\n"
5241 			"WHERE NAME = :table_name\n"
5242 			"LOCK IN SHARE MODE;\n"
5243 			"IF (SQL % NOTFOUND) THEN\n"
5244 			"       RETURN;\n"
5245 			"END IF;\n";
5246 
5247 		sql +=	"SELECT SPACE INTO space_id\n"
5248 			"FROM SYS_TABLES\n"
5249 			"WHERE NAME = :table_name;\n"
5250 			"IF (SQL % NOTFOUND) THEN\n"
5251 			"       RETURN;\n"
5252 			"END IF;\n";
5253 
5254 		sql +=	"found := 1;\n"
5255 			"SELECT ID INTO sys_foreign_id\n"
5256 			"FROM SYS_TABLES\n"
5257 			"WHERE NAME = 'SYS_FOREIGN'\n"
5258 			"LOCK IN SHARE MODE;\n"
5259 			"IF (SQL % NOTFOUND) THEN\n"
5260 			"       found := 0;\n"
5261 			"END IF;\n"
5262 			"IF (:table_name = 'SYS_FOREIGN') THEN\n"
5263 			"       found := 0;\n"
5264 			"END IF;\n"
5265 			"IF (:table_name = 'SYS_FOREIGN_COLS') \n"
5266 			"THEN\n"
5267 			"       found := 0;\n"
5268 			"END IF;\n";
5269 
5270 		sql +=	"OPEN cur_fk;\n"
5271 			"WHILE found = 1 LOOP\n"
5272 			"       FETCH cur_fk INTO foreign_id;\n"
5273 			"       IF (SQL % NOTFOUND) THEN\n"
5274 			"               found := 0;\n"
5275 			"       ELSE\n"
5276 			"               DELETE FROM \n"
5277 			"		   SYS_FOREIGN_COLS\n"
5278 			"               WHERE ID = foreign_id;\n"
5279 			"               DELETE FROM SYS_FOREIGN\n"
5280 			"               WHERE ID = foreign_id;\n"
5281 			"       END IF;\n"
5282 			"END LOOP;\n"
5283 			"CLOSE cur_fk;\n";
5284 
5285 		sql +=	"found := 1;\n"
5286 			"OPEN cur_idx;\n"
5287 			"WHILE found = 1 LOOP\n"
5288 			"       FETCH cur_idx INTO index_id;\n"
5289 			"       IF (SQL % NOTFOUND) THEN\n"
5290 			"               found := 0;\n"
5291 			"       ELSE\n"
5292 			"               DELETE FROM SYS_FIELDS\n"
5293 			"               WHERE INDEX_ID = index_id;\n"
5294 			"               DELETE FROM SYS_INDEXES\n"
5295 			"               WHERE ID = index_id\n"
5296 			"               AND TABLE_ID = table_id;\n"
5297 			"       END IF;\n"
5298 			"END LOOP;\n"
5299 			"CLOSE cur_idx;\n";
5300 
5301 		sql +=	"DELETE FROM SYS_COLUMNS\n"
5302 			"WHERE TABLE_ID = table_id;\n"
5303 			"DELETE FROM SYS_TABLES\n"
5304 			"WHERE NAME = :table_name;\n";
5305 
5306 		if (dict_table_is_file_per_table(table)) {
5307 			sql += "DELETE FROM SYS_TABLESPACES\n"
5308 				"WHERE SPACE = space_id;\n"
5309 				"DELETE FROM SYS_DATAFILES\n"
5310 				"WHERE SPACE = space_id;\n";
5311 		}
5312 
5313 		sql +=	"DELETE FROM SYS_VIRTUAL\n"
5314 			"WHERE TABLE_ID = table_id;\n";
5315 
5316 		sql += "END;\n";
5317 
5318 		err = que_eval_sql(info, sql.c_str(), FALSE, trx);
5319 	} else {
5320 		page_no = page_nos;
5321 		for (dict_index_t* index = dict_table_get_first_index(table);
5322 		     index != NULL;
5323 		     index = dict_table_get_next_index(index)) {
5324 			/* remove the index object associated. */
5325 			dict_drop_index_tree_in_mem(index, *page_no++);
5326 		}
5327 		err = DB_SUCCESS;
5328 	}
5329 
5330 	switch (err) {
5331 		ulint	space_id;
5332 		bool	is_temp;
5333 		bool	is_encrypted;
5334 		bool	file_unreadable;
5335 		bool	is_discarded;
5336 		bool	shared_tablespace;
5337 
5338 	case DB_SUCCESS:
5339 		space_id = table->space;
5340 		file_unreadable = table->file_unreadable;
5341 		is_discarded = dict_table_is_discarded(table);
5342 		is_temp = dict_table_is_temporary(table);
5343 		is_encrypted = dict_table_is_encrypted(table);
5344 		shared_tablespace = DICT_TF_HAS_SHARED_SPACE(table->flags);
5345 
5346 		/* If there is a temp path then the temp flag is set.
5347 		However, during recovery, we might have a temp flag but
5348 		not know the temp path */
5349 		ut_a(table->dir_path_of_temp_table == NULL || is_temp);
5350 
5351 		/* We do not allow temporary tables with a remote path. */
5352 		ut_a(!(is_temp && DICT_TF_HAS_DATA_DIR(table->flags)));
5353 
5354 		/* Make sure the data_dir_path is set if needed. */
5355 		dict_get_and_save_data_dir_path(table, true);
5356 
5357 		/* Remove all compression dictionary references for the
5358 		table */
5359 		if (table->id != ULINT_UNDEFINED) {
5360 			err =
5361 			dict_create_remove_zip_dict_references_for_table(
5362 				table->id, trx);
5363 			if (err != DB_SUCCESS) {
5364 				ib::error() << "Error: (" <<
5365 					ut_strerr(err) <<
5366 					") not able to remove compression "
5367 					"dictionary references for table " <<
5368 					tablename;
5369 
5370 				break;
5371 			}
5372 		}
5373 
5374 		err = row_drop_ancillary_fts_tables(table, trx);
5375 		if (err != DB_SUCCESS) {
5376 			break;
5377 		}
5378 
5379 		/* Determine the tablespace filename before we drop
5380 		dict_table_t.  Free this memory before returning. */
5381 		if (DICT_TF_HAS_DATA_DIR(table->flags)) {
5382 			ut_a(table->data_dir_path);
5383 
5384 			filepath = fil_make_filepath(
5385 				table->data_dir_path,
5386 				table->name.m_name, IBD, true);
5387 		} else if (table->dir_path_of_temp_table) {
5388 			filepath = fil_make_filepath(
5389 				table->dir_path_of_temp_table,
5390 				NULL, IBD, false);
5391 		} else if (!shared_tablespace) {
5392 			filepath = fil_make_filepath(
5393 				NULL, table->name.m_name, IBD, false);
5394 		}
5395 
5396 		page0_has_crypt_data =
5397 			table->keyring_encryption_info.page0_has_crypt_data;
5398 
5399 		/* Free the dict_table_t object. */
5400 		err = row_drop_table_from_cache(tablename, table, trx);
5401 		if (err != DB_SUCCESS) {
5402 			break;
5403 		}
5404 
5405 		/* Do not attempt to drop known-to-be-missing tablespaces,
5406 		nor system or shared general tablespaces. */
5407 		if (is_discarded || file_unreadable || shared_tablespace
5408 		    || is_system_tablespace(space_id)) {
5409 			/* For encrypted table, if ibd file can not be decrypt,
5410 			we also set file_unreadable. We still need to try to
5411 			remove the ibd file for this. */
5412 			if (is_discarded || !is_encrypted
5413 			    || !file_unreadable) {
5414 				break;
5415 			}
5416 		}
5417 
5418 		if (is_encrypted && !page0_has_crypt_data) {
5419 			/* Require the mutex to block key rotation. */
5420 			was_master_key_id_mutex_locked = true;
5421 			mutex_enter(&master_key_id_mutex);
5422 		}
5423 		/* We can now drop the single-table tablespace. */
5424 		err = row_drop_single_table_tablespace(
5425 			space_id, tablename, filepath,
5426 			is_temp, is_encrypted, trx);
5427 
5428 		if (was_master_key_id_mutex_locked) {
5429 			mutex_exit(&master_key_id_mutex);
5430 		}
5431 		break;
5432 
5433 	case DB_OUT_OF_FILE_SPACE:
5434 		err = DB_MUST_GET_MORE_FILE_SPACE;
5435 
5436 		row_mysql_handle_errors(&err, trx, NULL, NULL);
5437 
5438 		/* raise error */
5439 		ut_error;
5440 		break;
5441 
5442 	case DB_TOO_MANY_CONCURRENT_TRXS:
5443 		/* Cannot even find a free slot for the
5444 		the undo log. We can directly exit here
5445 		and return the DB_TOO_MANY_CONCURRENT_TRXS
5446 		error. */
5447 
5448 	default:
5449 		/* This is some error we do not expect. Print
5450 		the error number and rollback the transaction */
5451 		ib::error() << "Unknown error code " << err << " while"
5452 			" dropping table: "
5453 			<< ut_get_name(trx, tablename) << ".";
5454 
5455 		trx->error_state = DB_SUCCESS;
5456 		trx_rollback_to_savepoint(trx, NULL);
5457 		trx->error_state = DB_SUCCESS;
5458 
5459 		/* Mark all indexes available in the data dictionary
5460 		cache again. */
5461 
5462 		page_no = page_nos;
5463 
5464 		for (dict_index_t* index = dict_table_get_first_index(table);
5465 		     index != NULL;
5466 		     index = dict_table_get_next_index(index)) {
5467 			rw_lock_x_lock(dict_index_get_lock(index));
5468 			ut_a(index->page == FIL_NULL);
5469 			index->page = *page_no++;
5470 			rw_lock_x_unlock(dict_index_get_lock(index));
5471 		}
5472 	}
5473 
5474 	if (err != DB_SUCCESS && table != NULL) {
5475 		/* Drop table has failed with error but as drop table is not
5476 		transaction safe we should mark the table as corrupted to avoid
5477 		unwarranted follow-up action on this table that can result
5478 		in more serious issues. */
5479 
5480 		table->corrupted = true;
5481 		for (dict_index_t* index = UT_LIST_GET_FIRST(table->indexes);
5482 		     index != NULL;
5483 		     index = UT_LIST_GET_NEXT(indexes, index)) {
5484 			dict_set_corrupted(index, trx, "DROP TABLE");
5485 		}
5486 	}
5487 
5488 funct_exit:
5489 	if (heap) {
5490 		mem_heap_free(heap);
5491 	}
5492 
5493 	ut_free(filepath);
5494 
5495 	if (locked_dictionary) {
5496 
5497 		if (trx_is_started(trx)) {
5498 
5499 			trx_commit_for_mysql(trx);
5500 		}
5501 
5502 		row_mysql_unlock_data_dictionary(trx);
5503 	}
5504 
5505 	trx->op_info = "";
5506 
5507 	/* No need to immediately invoke master thread as there is no work
5508 	generated by intrinsic table operation that needs master thread
5509 	attention. */
5510 	if (!is_intrinsic_temp_table) {
5511 		srv_wake_master_thread();
5512 	}
5513 
5514 	DBUG_RETURN(err);
5515 }
5516 
5517 /*********************************************************************//**
5518 Drop all temporary tables during crash recovery. */
5519 void
row_mysql_drop_temp_tables(void)5520 row_mysql_drop_temp_tables(void)
5521 /*============================*/
5522 {
5523 	trx_t*		trx;
5524 	btr_pcur_t	pcur;
5525 	mtr_t		mtr;
5526 	mem_heap_t*	heap;
5527 
5528 	trx = trx_allocate_for_background();
5529 	trx->op_info = "dropping temporary tables";
5530 	row_mysql_lock_data_dictionary(trx);
5531 
5532 	heap = mem_heap_create(200);
5533 
5534 	mtr_start(&mtr);
5535 
5536 	btr_pcur_open_at_index_side(
5537 		true,
5538 		dict_table_get_first_index(dict_sys->sys_tables),
5539 		BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
5540 
5541 	for (;;) {
5542 		const rec_t*	rec;
5543 		const byte*	field;
5544 		ulint		len;
5545 		const char*	table_name;
5546 		dict_table_t*	table;
5547 
5548 		btr_pcur_move_to_next_user_rec(&pcur, &mtr);
5549 
5550 		if (!btr_pcur_is_on_user_rec(&pcur)) {
5551 			break;
5552 		}
5553 
5554 		/* The high order bit of N_COLS is set unless
5555 		ROW_FORMAT=REDUNDANT. */
5556 		rec = btr_pcur_get_rec(&pcur);
5557 		field = rec_get_nth_field_old(
5558 			rec, DICT_FLD__SYS_TABLES__NAME, &len);
5559 		field = rec_get_nth_field_old(
5560 			rec, DICT_FLD__SYS_TABLES__N_COLS, &len);
5561 		if (len != 4
5562 		    || !(mach_read_from_4(field) & DICT_N_COLS_COMPACT)) {
5563 			continue;
5564 		}
5565 
5566 		/* Older versions of InnoDB, which only supported tables
5567 		in ROW_FORMAT=REDUNDANT could write garbage to
5568 		SYS_TABLES.MIX_LEN, where we now store the is_temp flag.
5569 		Above, we assumed is_temp=0 if ROW_FORMAT=REDUNDANT. */
5570 		field = rec_get_nth_field_old(
5571 			rec, DICT_FLD__SYS_TABLES__MIX_LEN, &len);
5572 		if (len != 4
5573 		    || !(mach_read_from_4(field) & DICT_TF2_TEMPORARY)) {
5574 			continue;
5575 		}
5576 
5577 		/* This is a temporary table. */
5578 		field = rec_get_nth_field_old(
5579 			rec, DICT_FLD__SYS_TABLES__NAME, &len);
5580 		if (len == UNIV_SQL_NULL || len == 0) {
5581 			/* Corrupted SYS_TABLES.NAME */
5582 			continue;
5583 		}
5584 
5585 		table_name = mem_heap_strdupl(heap, (const char*) field, len);
5586 
5587 		btr_pcur_store_position(&pcur, &mtr);
5588 		btr_pcur_commit_specify_mtr(&pcur, &mtr);
5589 
5590 		table = dict_table_get_low(table_name);
5591 
5592 		if (table) {
5593 			row_drop_table_for_mysql(table_name, trx, FALSE);
5594 			trx_commit_for_mysql(trx);
5595 		}
5596 
5597 		mtr_start(&mtr);
5598 		btr_pcur_restore_position(BTR_SEARCH_LEAF,
5599 					  &pcur, &mtr);
5600 	}
5601 
5602 	btr_pcur_close(&pcur);
5603 	mtr_commit(&mtr);
5604 	mem_heap_free(heap);
5605 	row_mysql_unlock_data_dictionary(trx);
5606 	trx_free_for_background(trx);
5607 }
5608 
5609 /*******************************************************************//**
5610 Drop all foreign keys in a database, see Bug#18942.
5611 Called at the end of row_drop_database_for_mysql().
5612 @return error code or DB_SUCCESS */
5613 static MY_ATTRIBUTE((warn_unused_result))
5614 dberr_t
drop_all_foreign_keys_in_db(const char * name,trx_t * trx)5615 drop_all_foreign_keys_in_db(
5616 /*========================*/
5617 	const char*	name,	/*!< in: database name which ends to '/' */
5618 	trx_t*		trx)	/*!< in: transaction handle */
5619 {
5620 	pars_info_t*	pinfo;
5621 	dberr_t		err;
5622 
5623 	ut_a(name[strlen(name) - 1] == '/');
5624 
5625 	pinfo = pars_info_create();
5626 
5627 	pars_info_add_str_literal(pinfo, "dbname", name);
5628 
5629 /** true if for_name is not prefixed with dbname */
5630 #define TABLE_NOT_IN_THIS_DB \
5631 "SUBSTR(for_name, 0, LENGTH(:dbname)) <> :dbname"
5632 
5633 	err = que_eval_sql(pinfo,
5634 			   "PROCEDURE DROP_ALL_FOREIGN_KEYS_PROC () IS\n"
5635 			   "foreign_id CHAR;\n"
5636 			   "for_name CHAR;\n"
5637 			   "found INT;\n"
5638 			   "DECLARE CURSOR cur IS\n"
5639 			   "SELECT ID, FOR_NAME FROM SYS_FOREIGN\n"
5640 			   "WHERE FOR_NAME >= :dbname\n"
5641 			   "LOCK IN SHARE MODE\n"
5642 			   "ORDER BY FOR_NAME;\n"
5643 			   "BEGIN\n"
5644 			   "found := 1;\n"
5645 			   "OPEN cur;\n"
5646 			   "WHILE found = 1 LOOP\n"
5647 			   "        FETCH cur INTO foreign_id, for_name;\n"
5648 			   "        IF (SQL % NOTFOUND) THEN\n"
5649 			   "                found := 0;\n"
5650 			   "        ELSIF (" TABLE_NOT_IN_THIS_DB ") THEN\n"
5651 			   "                found := 0;\n"
5652 			   "        ELSIF (1=1) THEN\n"
5653 			   "                DELETE FROM SYS_FOREIGN_COLS\n"
5654 			   "                WHERE ID = foreign_id;\n"
5655 			   "                DELETE FROM SYS_FOREIGN\n"
5656 			   "                WHERE ID = foreign_id;\n"
5657 			   "        END IF;\n"
5658 			   "END LOOP;\n"
5659 			   "CLOSE cur;\n"
5660 			   "COMMIT WORK;\n"
5661 			   "END;\n",
5662 			   FALSE, /* do not reserve dict mutex,
5663 				  we are already holding it */
5664 			   trx);
5665 
5666 	return(err);
5667 }
5668 
5669 /** Drop a database for MySQL.
5670 @param[in]	name	database name which ends at '/'
5671 @param[in]	trx	transaction handle
5672 @param[out]	found	number of dropped tables/partitions
5673 @return error code or DB_SUCCESS */
5674 dberr_t
row_drop_database_for_mysql(const char * name,trx_t * trx,ulint * found)5675 row_drop_database_for_mysql(
5676 	const char*	name,
5677 	trx_t*		trx,
5678 	ulint*		found)
5679 {
5680 	dict_table_t*	table;
5681 	char*		table_name;
5682 	dberr_t		err	= DB_SUCCESS;
5683 	ulint		namelen	= strlen(name);
5684 	bool		is_partition = false;
5685 
5686 	ut_ad(found != NULL);
5687 
5688 	DBUG_ENTER("row_drop_database_for_mysql");
5689 
5690 	DBUG_PRINT("row_drop_database_for_mysql", ("db: '%s'", name));
5691 
5692 	ut_a(name != NULL);
5693 	/* Assert DB name or partition name. */
5694 	if (name[namelen - 1] == '#') {
5695 		ut_ad(name[namelen - 2] != '/');
5696 		is_partition = true;
5697 		trx->op_info = "dropping partitions";
5698 	} else {
5699 		ut_a(name[namelen - 1] == '/');
5700 		trx->op_info = "dropping database";
5701 	}
5702 
5703 	*found = 0;
5704 
5705 	trx_set_dict_operation(trx, TRX_DICT_OP_TABLE);
5706 
5707 	trx_start_if_not_started_xa(trx, true);
5708 
5709 loop:
5710 	row_mysql_lock_data_dictionary(trx);
5711 
5712 	while ((table_name = dict_get_first_table_name_in_db(name))) {
5713 		/* Drop parent table if it is a fts aux table, to
5714 		avoid accessing dropped fts aux tables in information
5715 		scheam when parent table still exists.
5716 		Note: Drop parent table will drop fts aux tables. */
5717 		char*	parent_table_name;
5718 		parent_table_name = fts_get_parent_table_name(
5719 				table_name, strlen(table_name));
5720 
5721 		if (parent_table_name != NULL) {
5722 			ut_free(table_name);
5723 			table_name = parent_table_name;
5724 		}
5725 
5726 		ut_a(memcmp(table_name, name, namelen) == 0);
5727 
5728 		table = dict_table_open_on_name(
5729 			table_name, TRUE, FALSE, static_cast<dict_err_ignore_t>(
5730 				DICT_ERR_IGNORE_INDEX_ROOT
5731 				| DICT_ERR_IGNORE_CORRUPT));
5732 
5733 		if (!table) {
5734 			ib::error() << "Cannot load table " << table_name
5735 				<< " from InnoDB internal data dictionary"
5736 				" during drop database";
5737 			ut_free(table_name);
5738 			err = DB_TABLE_NOT_FOUND;
5739 			break;
5740 
5741 		}
5742 
5743 		if (!row_is_mysql_tmp_table_name(table->name.m_name)) {
5744 			/* There could be orphan temp tables left from
5745 			interrupted alter table. Leave them, and handle
5746 			the rest.*/
5747 			if (table->can_be_evicted
5748 			    && (name[namelen - 1] != '#')) {
5749 				ib::warn() << "Orphan table encountered during"
5750 					" DROP DATABASE. This is possible if '"
5751 					<< table->name << ".frm' was lost.";
5752 			}
5753 
5754 			if (table->file_unreadable) {
5755 				ib::warn() << "Missing .ibd file for table "
5756 					<< table->name << ".";
5757 			}
5758 		}
5759 
5760 		dict_table_close(table, TRUE, FALSE);
5761 
5762 		/* The dict_table_t object must not be accessed before
5763 		dict_table_open() or after dict_table_close(). But this is OK
5764 		if we are holding, the dict_sys->mutex. */
5765 		ut_ad(mutex_own(&dict_sys->mutex));
5766 
5767 		/* Disable statistics on the found table. */
5768 		if (!dict_stats_stop_bg(table)) {
5769 			row_mysql_unlock_data_dictionary(trx);
5770 
5771 			os_thread_sleep(250000);
5772 
5773 			ut_free(table_name);
5774 
5775 			goto loop;
5776 		}
5777 
5778 		/* Wait until MySQL does not have any queries running on
5779 		the table */
5780 
5781 		if (table->get_ref_count() > 0) {
5782 			row_mysql_unlock_data_dictionary(trx);
5783 
5784 			ib::warn() << "MySQL is trying to drop database "
5785 				<< ut_get_name(trx, name) << " though"
5786 				" there are still open handles to table "
5787 				<< table->name << ".";
5788 
5789 			os_thread_sleep(1000000);
5790 
5791 			ut_free(table_name);
5792 
5793 			goto loop;
5794 		}
5795 
5796 		err = row_drop_table_for_mysql(table_name, trx, TRUE);
5797 		trx_commit_for_mysql(trx);
5798 
5799 		if (err != DB_SUCCESS) {
5800 			ib::error() << "DROP DATABASE "
5801 				<< ut_get_name(trx, name) << " failed"
5802 				" with error (" << ut_strerr(err) << ") for"
5803 				" table " << ut_get_name(trx, table_name);
5804 			ut_free(table_name);
5805 			break;
5806 		}
5807 
5808 		ut_free(table_name);
5809 		(*found)++;
5810 	}
5811 
5812 	/* Partitioning does not yet support foreign keys. */
5813 	if (err == DB_SUCCESS && !is_partition) {
5814 		/* after dropping all tables try to drop all leftover
5815 		foreign keys in case orphaned ones exist */
5816 		err = drop_all_foreign_keys_in_db(name, trx);
5817 
5818 		if (err != DB_SUCCESS) {
5819 			const std::string&	db = ut_get_name(trx, name);
5820 			ib::error() << "DROP DATABASE " << db << " failed with"
5821 				" error " << err << " while dropping all"
5822 				" foreign keys";
5823 		}
5824 	}
5825 
5826 	trx_commit_for_mysql(trx);
5827 
5828 	row_mysql_unlock_data_dictionary(trx);
5829 
5830 	trx->op_info = "";
5831 
5832 	DBUG_RETURN(err);
5833 }
5834 
5835 /*********************************************************************//**
5836 Checks if a table name contains the string "/#sql" which denotes temporary
5837 tables in MySQL.
5838 @return true if temporary table */
5839 MY_ATTRIBUTE((warn_unused_result))
5840 bool
row_is_mysql_tmp_table_name(const char * name)5841 row_is_mysql_tmp_table_name(
5842 /*========================*/
5843 	const char*	name)	/*!< in: table name in the form
5844 				'database/tablename' */
5845 {
5846 	return(strstr(name, "/" TEMP_FILE_PREFIX) != NULL);
5847 	/* return(strstr(name, "/@0023sql") != NULL); */
5848 }
5849 
5850 /****************************************************************//**
5851 Delete a single constraint.
5852 @return error code or DB_SUCCESS */
5853 static MY_ATTRIBUTE((nonnull, warn_unused_result))
5854 dberr_t
row_delete_constraint_low(const char * id,trx_t * trx)5855 row_delete_constraint_low(
5856 /*======================*/
5857 	const char*	id,		/*!< in: constraint id */
5858 	trx_t*		trx)		/*!< in: transaction handle */
5859 {
5860 	pars_info_t*	info = pars_info_create();
5861 
5862 	pars_info_add_str_literal(info, "id", id);
5863 
5864 	return(que_eval_sql(info,
5865 			    "PROCEDURE DELETE_CONSTRAINT () IS\n"
5866 			    "BEGIN\n"
5867 			    "DELETE FROM SYS_FOREIGN_COLS WHERE ID = :id;\n"
5868 			    "DELETE FROM SYS_FOREIGN WHERE ID = :id;\n"
5869 			    "END;\n"
5870 			    , FALSE, trx));
5871 }
5872 
5873 /****************************************************************//**
5874 Delete a single constraint.
5875 @return error code or DB_SUCCESS */
5876 static MY_ATTRIBUTE((nonnull, warn_unused_result))
5877 dberr_t
row_delete_constraint(const char * id,const char * database_name,mem_heap_t * heap,trx_t * trx)5878 row_delete_constraint(
5879 /*==================*/
5880 	const char*	id,		/*!< in: constraint id */
5881 	const char*	database_name,	/*!< in: database name, with the
5882 					trailing '/' */
5883 	mem_heap_t*	heap,		/*!< in: memory heap */
5884 	trx_t*		trx)		/*!< in: transaction handle */
5885 {
5886 	dberr_t	err;
5887 
5888 	/* New format constraints have ids <databasename>/<constraintname>. */
5889 	err = row_delete_constraint_low(
5890 		mem_heap_strcat(heap, database_name, id), trx);
5891 
5892 	if ((err == DB_SUCCESS) && !strchr(id, '/')) {
5893 		/* Old format < 4.0.18 constraints have constraint ids
5894 		NUMBER_NUMBER. We only try deleting them if the
5895 		constraint name does not contain a '/' character, otherwise
5896 		deleting a new format constraint named 'foo/bar' from
5897 		database 'baz' would remove constraint 'bar' from database
5898 		'foo', if it existed. */
5899 
5900 		err = row_delete_constraint_low(id, trx);
5901 	}
5902 
5903 	return(err);
5904 }
5905 
5906 /*********************************************************************//**
5907 Renames a table for MySQL.
5908 @return error code or DB_SUCCESS */
5909 dberr_t
row_rename_table_for_mysql(const char * old_name,const char * new_name,trx_t * trx,bool commit)5910 row_rename_table_for_mysql(
5911 /*=======================*/
5912 	const char*	old_name,	/*!< in: old table name */
5913 	const char*	new_name,	/*!< in: new table name */
5914 	trx_t*		trx,		/*!< in/out: transaction */
5915 	bool		commit)		/*!< in: whether to commit trx */
5916 {
5917 	dict_table_t*	table			= NULL;
5918 	ibool		dict_locked		= FALSE;
5919 	dberr_t		err			= DB_ERROR;
5920 	mem_heap_t*	heap			= NULL;
5921 	const char**	constraints_to_drop	= NULL;
5922 	ulint		n_constraints_to_drop	= 0;
5923 	ibool		old_is_tmp, new_is_tmp;
5924 	pars_info_t*	info			= NULL;
5925 	int		retry;
5926 	bool		aux_fts_rename		= false;
5927 	bool		is_new_part;
5928 	bool		is_old_part;
5929 	ut_a(old_name != NULL);
5930 	ut_a(new_name != NULL);
5931 	ut_ad(trx->state == TRX_STATE_ACTIVE);
5932 
5933 	if (srv_force_recovery) {
5934 		ib::info() << MODIFICATIONS_NOT_ALLOWED_MSG_FORCE_RECOVERY;
5935 		err = DB_READ_ONLY;
5936 		goto funct_exit;
5937 
5938 	} else if (row_mysql_is_system_table(new_name)) {
5939 
5940 		ib::error() << "Trying to create a MySQL system table "
5941 			<< new_name << " of type InnoDB. MySQL system tables"
5942 			" must be of the MyISAM type!";
5943 		goto funct_exit;
5944 	}
5945 
5946 	/* Check the table identifier length here. It is possible that when we
5947 	are renaming a temporary table back to original name (after alter)
5948 	the table identifier length can exceed the maximum file name limit */
5949 
5950 	if (strlen(strchr(new_name,'/') + 1) > FN_LEN ) {
5951 		my_error(ER_PATH_LENGTH, MYF(0),
5952 			 strchr(new_name,'/')+1);
5953 		err = DB_IDENTIFIER_TOO_LONG;
5954 		goto funct_exit;
5955 	}
5956 
5957 	trx->op_info = "renaming table";
5958 
5959 	old_is_tmp = row_is_mysql_tmp_table_name(old_name);
5960 	new_is_tmp = row_is_mysql_tmp_table_name(new_name);
5961 
5962 	dict_locked = trx->dict_operation_lock_mode == RW_X_LATCH;
5963 
5964 	table = dict_table_open_on_name(old_name, dict_locked, FALSE,
5965 					DICT_ERR_IGNORE_NONE);
5966 
5967 	is_old_part = strstr((char*)old_name, "#p#") ||
5968 	              strstr((char*)old_name, "#P");
5969 
5970 	is_new_part = strstr((char*)new_name, "#p#") ||
5971 	              strstr((char*)new_name, "#P");
5972 
5973 	if (!table) {
5974 		err = DB_TABLE_NOT_FOUND;
5975 		goto funct_exit;
5976 
5977 	} else if (table->file_unreadable
5978 		   && !dict_table_is_discarded(table)) {
5979 
5980 		err = DB_TABLE_NOT_FOUND;
5981 
5982 		ib::error() << "Table " << old_name << " does not have an .ibd"
5983 			" file in the database directory. "
5984 			<< TROUBLESHOOTING_MSG;
5985 
5986 		goto funct_exit;
5987 
5988 	} else if (new_is_tmp) {
5989 		/* MySQL is doing an ALTER TABLE command and it renames the
5990 		original table to a temporary table name. We want to preserve
5991 		the original foreign key constraint definitions despite the
5992 		name change. An exception is those constraints for which
5993 		the ALTER TABLE contained DROP FOREIGN KEY <foreign key id>.*/
5994 
5995 		heap = mem_heap_create(100);
5996 
5997 		err = dict_foreign_parse_drop_constraints(
5998 			heap, trx, table, &n_constraints_to_drop,
5999 			&constraints_to_drop);
6000 
6001 		if (err != DB_SUCCESS) {
6002 			goto funct_exit;
6003 		}
6004 	}
6005 
6006         /* To exchange a normal table(t1) with partition table (p1), the
6007            rename logic is something like: 1) t1 -> tmp table 2) p1-> t1
6008            3) tmp -> p1. And special handling of dict_table_t::data_dir_path
6009            is necessary if DATA DIRECTORY is specified.
6010            For example if DATA DIRECTORY Is '/tmp', the data directory for
6011            nomral table is '/tmp/t1', while for partition is '/tmp'. So during
6012            above rename step 2) and 3), the postfix table name 't1' should
6013            either be truncated or appended.*/
6014         if (old_is_tmp && is_new_part && table->data_dir_path != NULL) {
6015 		std::string str(table->data_dir_path);
6016 		size_t found = str.find_last_of("/\\");
6017 
6018 		ut_ad(found != std::string::npos);
6019 		found++;
6020 
6021 		table->data_dir_path[found] = '\0';
6022 
6023         } else if (is_old_part && !is_new_part &&
6024                    table->data_dir_path != NULL && !new_is_tmp) {
6025 
6026 		uint old_size = mem_heap_get_size(table->heap);
6027 
6028 		std::string str(table->data_dir_path);
6029 
6030 		/* new_name contains database/name but we require name */
6031                 const char *name = strchr(new_name, '/') + 1;
6032 
6033                 str.append(name);
6034 
6035 		table->data_dir_path =
6036 		    mem_heap_strdup(table->heap, str.c_str());
6037 
6038 		uint new_size = mem_heap_get_size(table->heap);
6039 
6040 		ut_ad(mutex_own(&dict_sys->mutex));
6041 
6042 		dict_sys->size += new_size - old_size;
6043         }
6044 
6045 	/* Is a foreign key check running on this table? */
6046 	for (retry = 0; retry < 100
6047 	     && table->n_foreign_key_checks_running > 0; ++retry) {
6048 		row_mysql_unlock_data_dictionary(trx);
6049 		os_thread_yield();
6050 		row_mysql_lock_data_dictionary(trx);
6051 	}
6052 
6053 	if (table->n_foreign_key_checks_running > 0) {
6054 		ib::error() << "In ALTER TABLE "
6055 			<< ut_get_name(trx, old_name)
6056 			<< " a FOREIGN KEY check is running. Cannot rename"
6057 			" table.";
6058 		err = DB_TABLE_IN_FK_CHECK;
6059 		goto funct_exit;
6060 	}
6061 
6062 	/* We use the private SQL parser of Innobase to generate the query
6063 	graphs needed in updating the dictionary data from system tables. */
6064 
6065 	info = pars_info_create();
6066 
6067 	pars_info_add_str_literal(info, "new_table_name", new_name);
6068 	pars_info_add_str_literal(info, "old_table_name", old_name);
6069 
6070 	DEBUG_SYNC_C("rename_table");
6071 	err = que_eval_sql(info,
6072 			   "PROCEDURE RENAME_TABLE () IS\n"
6073 			   "BEGIN\n"
6074 			   "UPDATE SYS_TABLES"
6075 			   " SET NAME = :new_table_name\n"
6076 			   " WHERE NAME = :old_table_name;\n"
6077 			   "END;\n"
6078 			   , FALSE, trx);
6079 
6080 	/* SYS_TABLESPACES and SYS_DATAFILES need to be updated if
6081 	the table is in a single-table tablespace. */
6082 	if (err == DB_SUCCESS
6083 	    && dict_table_is_file_per_table(table)
6084 	    && !table->file_unreadable) {
6085 		/* Make a new pathname to update SYS_DATAFILES. */
6086 		char*	new_path = row_make_new_pathname(table, new_name);
6087 		char*	old_path = fil_space_get_first_path(table->space);
6088 
6089 		/* If old path and new path are the same means tablename
6090 		has not changed and only the database name holding the table
6091 		has changed so we need to make the complete filepath again. */
6092 		if (!dict_tables_have_same_db(old_name, new_name)) {
6093 			ut_free(new_path);
6094 			new_path = fil_make_filepath(NULL, new_name, IBD, false);
6095 		}
6096 
6097 		info = pars_info_create();
6098 
6099 		pars_info_add_str_literal(info, "new_table_name", new_name);
6100 		pars_info_add_str_literal(info, "new_path_name", new_path);
6101 		pars_info_add_int4_literal(info, "space_id", table->space);
6102 
6103 		err = que_eval_sql(info,
6104 				   "PROCEDURE RENAME_SPACE () IS\n"
6105 				   "BEGIN\n"
6106 				   "UPDATE SYS_TABLESPACES"
6107 				   " SET NAME = :new_table_name\n"
6108 				   " WHERE SPACE = :space_id;\n"
6109 				   "UPDATE SYS_DATAFILES"
6110 				   " SET PATH = :new_path_name\n"
6111 				   " WHERE SPACE = :space_id;\n"
6112 				   "END;\n"
6113 				   , FALSE, trx);
6114 
6115 		ut_free(old_path);
6116 		ut_free(new_path);
6117 	}
6118 	if (err != DB_SUCCESS) {
6119 		goto end;
6120 	}
6121 
6122 	if (!new_is_tmp) {
6123 		/* Rename all constraints. */
6124 		char	new_table_name[MAX_TABLE_NAME_LEN + 1] = "";
6125 		char	old_table_utf8[MAX_TABLE_NAME_LEN + 1] = "";
6126 		uint	errors = 0;
6127 
6128 		strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
6129 		innobase_convert_to_system_charset(
6130 			strchr(old_table_utf8, '/') + 1,
6131 			strchr(old_name, '/') +1,
6132 			MAX_TABLE_NAME_LEN, &errors);
6133 
6134 		if (errors) {
6135 			/* Table name could not be converted from charset
6136 			my_charset_filename to UTF-8. This means that the
6137 			table name is already in UTF-8 (#mysql#50). */
6138 			strncpy(old_table_utf8, old_name, MAX_TABLE_NAME_LEN);
6139 		}
6140 
6141 		info = pars_info_create();
6142 
6143 		pars_info_add_str_literal(info, "new_table_name", new_name);
6144 		pars_info_add_str_literal(info, "old_table_name", old_name);
6145 		pars_info_add_str_literal(info, "old_table_name_utf8",
6146 					  old_table_utf8);
6147 
6148 		strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
6149 		innobase_convert_to_system_charset(
6150 			strchr(new_table_name, '/') + 1,
6151 			strchr(new_name, '/') +1,
6152 			MAX_TABLE_NAME_LEN, &errors);
6153 
6154 		if (errors) {
6155 			/* Table name could not be converted from charset
6156 			my_charset_filename to UTF-8. This means that the
6157 			table name is already in UTF-8 (#mysql#50). */
6158 			strncpy(new_table_name, new_name, MAX_TABLE_NAME_LEN);
6159 		}
6160 
6161 		pars_info_add_str_literal(info, "new_table_utf8", new_table_name);
6162 
6163 		err = que_eval_sql(
6164 			info,
6165 			"PROCEDURE RENAME_CONSTRAINT_IDS () IS\n"
6166 			"gen_constr_prefix CHAR;\n"
6167 			"new_db_name CHAR;\n"
6168 			"foreign_id CHAR;\n"
6169 			"new_foreign_id CHAR;\n"
6170 			"old_db_name_len INT;\n"
6171 			"old_t_name_len INT;\n"
6172 			"new_db_name_len INT;\n"
6173 			"id_len INT;\n"
6174 			"offset INT;\n"
6175 			"found INT;\n"
6176 			"BEGIN\n"
6177 			"found := 1;\n"
6178 			"old_db_name_len := INSTR(:old_table_name, '/')-1;\n"
6179 			"new_db_name_len := INSTR(:new_table_name, '/')-1;\n"
6180 			"new_db_name := SUBSTR(:new_table_name, 0,\n"
6181 			"                      new_db_name_len);\n"
6182 			"old_t_name_len := LENGTH(:old_table_name);\n"
6183 			"gen_constr_prefix := CONCAT(:old_table_name_utf8,\n"
6184 			"			     '_ibfk_');\n"
6185 			"WHILE found = 1 LOOP\n"
6186 			"       SELECT ID INTO foreign_id\n"
6187 			"        FROM SYS_FOREIGN\n"
6188 			"        WHERE FOR_NAME = :old_table_name\n"
6189 			"         AND TO_BINARY(FOR_NAME)\n"
6190 			"           = TO_BINARY(:old_table_name)\n"
6191 			"         LOCK IN SHARE MODE;\n"
6192 			"       IF (SQL % NOTFOUND) THEN\n"
6193 			"        found := 0;\n"
6194 			"       ELSE\n"
6195 			"        UPDATE SYS_FOREIGN\n"
6196 			"        SET FOR_NAME = :new_table_name\n"
6197 			"         WHERE ID = foreign_id;\n"
6198 			"        id_len := LENGTH(foreign_id);\n"
6199 			"        IF (INSTR(foreign_id, '/') > 0) THEN\n"
6200 			"               IF (INSTR(foreign_id,\n"
6201 			"                         gen_constr_prefix) > 0)\n"
6202 			"               THEN\n"
6203                         "                offset := INSTR(foreign_id, '_ibfk_') - 1;\n"
6204 			"                new_foreign_id :=\n"
6205 			"                CONCAT(:new_table_utf8,\n"
6206 			"                SUBSTR(foreign_id, offset,\n"
6207 			"                       id_len - offset));\n"
6208 			"               ELSE\n"
6209 			"                new_foreign_id :=\n"
6210 			"                CONCAT(new_db_name,\n"
6211 			"                SUBSTR(foreign_id,\n"
6212 			"                       old_db_name_len,\n"
6213 			"                       id_len - old_db_name_len));\n"
6214 			"               END IF;\n"
6215 			"               UPDATE SYS_FOREIGN\n"
6216 			"                SET ID = new_foreign_id\n"
6217 			"                WHERE ID = foreign_id;\n"
6218 			"               UPDATE SYS_FOREIGN_COLS\n"
6219 			"                SET ID = new_foreign_id\n"
6220 			"                WHERE ID = foreign_id;\n"
6221 			"        END IF;\n"
6222 			"       END IF;\n"
6223 			"END LOOP;\n"
6224 			"UPDATE SYS_FOREIGN SET REF_NAME = :new_table_name\n"
6225 			"WHERE REF_NAME = :old_table_name\n"
6226 			"  AND TO_BINARY(REF_NAME)\n"
6227 			"    = TO_BINARY(:old_table_name);\n"
6228 			"END;\n"
6229 			, FALSE, trx);
6230 		if (err != DB_SUCCESS) {
6231 			goto end;
6232 		}
6233 
6234 	} else if (n_constraints_to_drop > 0) {
6235 		/* Drop some constraints of tmp tables. */
6236 
6237 		ulint	db_name_len = dict_get_db_name_len(old_name) + 1;
6238 		char*	db_name = mem_heap_strdupl(heap, old_name,
6239 						   db_name_len);
6240 		ulint	i;
6241 
6242 		for (i = 0; i < n_constraints_to_drop; i++) {
6243 			err = row_delete_constraint(constraints_to_drop[i],
6244 						    db_name, heap, trx);
6245 
6246 			if (err != DB_SUCCESS) {
6247 				break;
6248 			}
6249 		}
6250 	}
6251 
6252 	if ((dict_table_has_fts_index(table)
6253 		|| DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID))
6254 		&& !dict_tables_have_same_db(old_name, new_name)) {
6255 		err = fts_rename_aux_tables(table, new_name, trx);
6256 		if (err != DB_TABLE_NOT_FOUND) {
6257 			aux_fts_rename = true;
6258 		}
6259 	}
6260 
6261 end:
6262 	if (err != DB_SUCCESS) {
6263 		if (err == DB_DUPLICATE_KEY) {
6264 			ib::error() << "Possible reasons:";
6265 			ib::error() << "(1) Table rename would cause two"
6266 				" FOREIGN KEY constraints to have the same"
6267 				" internal name in case-insensitive"
6268 				" comparison.";
6269 			ib::error() << "(2) Table "
6270 				<< ut_get_name(trx, new_name)
6271 				<< " exists in the InnoDB internal data"
6272 				" dictionary though MySQL is trying to rename"
6273 				" table " << ut_get_name(trx, old_name)
6274 				<< " to it. Have you deleted the .frm file and"
6275 				" not used DROP TABLE?";
6276 			ib::info() << TROUBLESHOOTING_MSG;
6277 			ib::error() << "If table "
6278 				<< ut_get_name(trx, new_name)
6279 				<< " is a temporary table #sql..., then"
6280 				" it can be that there are still queries"
6281 				" running on the table, and it will be dropped"
6282 				" automatically when the queries end. You can"
6283 				" drop the orphaned table inside InnoDB by"
6284 				" creating an InnoDB table with the same name"
6285 				" in another database and copying the .frm file"
6286 				" to the current database. Then MySQL thinks"
6287 				" the table exists, and DROP TABLE will"
6288 				" succeed.";
6289 		}
6290 		trx->error_state = DB_SUCCESS;
6291 		trx_rollback_to_savepoint(trx, NULL);
6292 		trx->error_state = DB_SUCCESS;
6293 	} else {
6294 		/* The following call will also rename the .ibd data file if
6295 		the table is stored in a single-table tablespace */
6296 
6297 		err = dict_table_rename_in_cache(
6298 			table, new_name, !new_is_tmp);
6299 		if (err != DB_SUCCESS) {
6300 			trx->error_state = DB_SUCCESS;
6301 			trx_rollback_to_savepoint(trx, NULL);
6302 			trx->error_state = DB_SUCCESS;
6303 			goto funct_exit;
6304 		}
6305 
6306 		/* In case of copy alter, template db_name and
6307 		table_name should be renamed only for newly
6308 		created table. */
6309 		if (table->vc_templ != NULL && !new_is_tmp) {
6310 			innobase_rename_vc_templ(table);
6311 		}
6312 
6313 		/* We only want to switch off some of the type checking in
6314 		an ALTER TABLE...ALGORITHM=COPY, not in a RENAME. */
6315 		dict_names_t	fk_tables;
6316 
6317 		err = dict_load_foreigns(
6318 			new_name, NULL,
6319 			false, !old_is_tmp || trx->check_foreigns,
6320 			DICT_ERR_IGNORE_NONE, fk_tables);
6321 
6322 		if (err != DB_SUCCESS) {
6323 
6324 			if (old_is_tmp) {
6325 				ib::error() << "In ALTER TABLE "
6326 					<< ut_get_name(trx, new_name)
6327 					<< " has or is referenced in foreign"
6328 					" key constraints which are not"
6329 					" compatible with the new table"
6330 					" definition.";
6331 			} else {
6332 				ib::error() << "In RENAME TABLE table "
6333 					<< ut_get_name(trx, new_name)
6334 					<< " is referenced in foreign key"
6335 					" constraints which are not compatible"
6336 					" with the new table definition.";
6337 			}
6338 
6339 			ut_a(DB_SUCCESS == dict_table_rename_in_cache(
6340 				table, old_name, FALSE));
6341 			trx->error_state = DB_SUCCESS;
6342 			trx_rollback_to_savepoint(trx, NULL);
6343 			trx->error_state = DB_SUCCESS;
6344 		}
6345 
6346 		/* Check whether virtual column or stored column affects
6347 		the foreign key constraint of the table. */
6348 		if (dict_foreigns_has_s_base_col(
6349 				table->foreign_set, table)) {
6350 			err = DB_NO_FK_ON_S_BASE_COL;
6351 			ut_a(DB_SUCCESS == dict_table_rename_in_cache(
6352 				table, old_name, FALSE));
6353 			trx->error_state = DB_SUCCESS;
6354 			trx_rollback_to_savepoint(trx, NULL);
6355 			trx->error_state = DB_SUCCESS;
6356 			goto funct_exit;
6357 		}
6358 
6359 		/* Fill the virtual column set in foreign when
6360 		the table undergoes copy alter operation. */
6361 		dict_mem_table_free_foreign_vcol_set(table);
6362 		dict_mem_table_fill_foreign_vcol_set(table);
6363 
6364 		while (!fk_tables.empty()) {
6365 			dict_load_table(fk_tables.front(), true,
6366 					DICT_ERR_IGNORE_NONE);
6367 			fk_tables.pop_front();
6368 		}
6369 	}
6370 
6371 funct_exit:
6372 	if (aux_fts_rename && err != DB_SUCCESS
6373 	    && table != NULL && (table->space != 0)) {
6374 
6375 		char*	orig_name = table->name.m_name;
6376 		trx_t*	trx_bg = trx_allocate_for_background();
6377 
6378 		/* If the first fts_rename fails, the trx would
6379 		be rolled back and committed, we can't use it any more,
6380 		so we have to start a new background trx here. */
6381 		ut_a(trx_state_eq(trx_bg, TRX_STATE_NOT_STARTED));
6382 		trx_bg->op_info = "Revert the failing rename "
6383 				  "for fts aux tables";
6384 		trx_bg->dict_operation_lock_mode = RW_X_LATCH;
6385 		trx_start_for_ddl(trx_bg, TRX_DICT_OP_TABLE);
6386 
6387 		/* If rename fails and table has its own tablespace,
6388 		we need to call fts_rename_aux_tables again to
6389 		revert the ibd file rename, which is not under the
6390 		control of trx. Also notice the parent table name
6391 		in cache is not changed yet. If the reverting fails,
6392 		the ibd data may be left in the new database, which
6393 		can be fixed only manually. */
6394 		table->name.m_name = const_cast<char*>(new_name);
6395 		fts_rename_aux_tables(table, old_name, trx_bg);
6396 		table->name.m_name = orig_name;
6397 
6398 		trx_bg->dict_operation_lock_mode = 0;
6399 		trx_commit_for_mysql(trx_bg);
6400 		trx_free_for_background(trx_bg);
6401 	}
6402 
6403 	if (table != NULL) {
6404 		dict_table_close(table, dict_locked, FALSE);
6405 	}
6406 
6407 	if (commit) {
6408 		trx_commit_for_mysql(trx);
6409 	}
6410 
6411 	if (UNIV_LIKELY_NULL(heap)) {
6412 		mem_heap_free(heap);
6413 	}
6414 
6415 	trx->op_info = "";
6416 
6417 	return(err);
6418 }
6419 
6420 /** Renames a partitioned table for MySQL.
6421 @parama[in]	thd		Connection thread handle
6422 @param[in]	old_name	Old table name.
6423 @param[in]	new_name	New table name.
6424 @param[in,out]	trx		Transaction.
6425 @return error code or DB_SUCCESS */
6426 dberr_t
row_rename_partitions_for_mysql(THD * thd,const char * old_name,const char * new_name,trx_t * trx)6427 row_rename_partitions_for_mysql(
6428 	THD*		thd,
6429 	const char*	old_name,
6430 	const char*	new_name,
6431 	trx_t*		trx)
6432 {
6433 	char		from_name[FN_REFLEN];
6434 	char		to_name[FN_REFLEN];
6435 	ulint		from_len = strlen(old_name);
6436 	ulint		to_len = strlen(new_name);
6437 	char*		table_name;
6438 	dberr_t		error = DB_TABLE_NOT_FOUND;
6439 
6440 	ut_a(from_len < (FN_REFLEN - 4));
6441 	ut_a(to_len < (FN_REFLEN - 4));
6442 	memcpy(from_name, old_name, from_len);
6443 	from_name[from_len] = '#';
6444 	from_name[from_len + 1] = 0;
6445 	typedef std::vector<std::pair<std::string, std::string> > partition_names;
6446 	partition_names store_name;
6447 	partition_names::iterator it;
6448 
6449 	while ((table_name = dict_get_first_table_name_in_db(from_name))) {
6450 		ut_a(memcmp(table_name, from_name, from_len) == 0);
6451 		/* Must match #[Pp]#<partition_name> */
6452 		if (strlen(table_name) <= (from_len + 3)
6453 		    || table_name[from_len] != '#'
6454 		    || table_name[from_len + 2] != '#'
6455 		    || (table_name[from_len + 1] != 'P'
6456 			&& table_name[from_len + 1] != 'p')) {
6457 
6458 			ut_ad(0);
6459 			ut_free(table_name);
6460 			continue;
6461 		}
6462 		memcpy(to_name, new_name, to_len);
6463 		memcpy(to_name + to_len, table_name + from_len,
6464 			strlen(table_name) - from_len + 1);
6465 		error = row_rename_table_for_mysql(table_name, to_name,
6466 						trx, false);
6467 		if (error == DB_SUCCESS) {
6468 			std::pair<std::string, std::string> pair_names;
6469 			pair_names.first = table_name;
6470 			pair_names.second = to_name;
6471 			store_name.push_back(pair_names);
6472 		} else {
6473 			store_name.clear();
6474 			/* Rollback and return. */
6475 			trx_rollback_for_mysql(trx);
6476 			ut_free(table_name);
6477 			return(error);
6478 		}
6479 		ut_free(table_name);
6480 	}
6481 	trx_commit_for_mysql(trx);
6482 
6483 	char    errstr[512];
6484 	for (it = store_name.begin(); it != store_name.end(); ++it) {
6485 		error = dict_stats_rename_table(
6486 			   true, it->first.c_str(), it->second.c_str(),
6487 			   errstr, sizeof(errstr));
6488 
6489 		if (error != DB_SUCCESS) {
6490 			ib::error() << errstr;
6491 			push_warning(thd, Sql_condition::SL_WARNING,
6492 				     ER_LOCK_WAIT_TIMEOUT, errstr);
6493 			break;
6494 		}
6495 	}
6496 
6497 	store_name.clear();
6498 	return(error);
6499 }
6500 
6501 /*********************************************************************//**
6502 Scans an index for either COUNT(*) or CHECK TABLE.
6503 If CHECK TABLE; Checks that the index contains entries in an ascending order,
6504 unique constraint is not broken, and calculates the number of index entries
6505 in the read view of the current transaction.
6506 @return DB_SUCCESS or other error */
6507 dberr_t
row_scan_index_for_mysql(row_prebuilt_t * prebuilt,const dict_index_t * index,bool check_keys,ulint * n_rows)6508 row_scan_index_for_mysql(
6509 /*=====================*/
6510 	row_prebuilt_t*		prebuilt,	/*!< in: prebuilt struct
6511 						in MySQL handle */
6512 	const dict_index_t*	index,		/*!< in: index */
6513 #ifdef WL6742
6514 	/* Removing WL6742 as part of Bug 23046302 */
6515 
6516 	bool			check_keys,	/*!< in: true=check for mis-
6517 						ordered or duplicate records,
6518 						false=count the rows only */
6519 #endif
6520 	ulint*			n_rows)		/*!< out: number of entries
6521 						seen in the consistent read */
6522 {
6523 	dtuple_t*	prev_entry	= NULL;
6524 	ulint		matched_fields;
6525 	byte*		buf;
6526 	dberr_t		ret;
6527 	rec_t*		rec;
6528 	int		cmp;
6529 	ibool		contains_null;
6530 	ulint		i;
6531 	ulint		cnt;
6532 	mem_heap_t*	heap		= NULL;
6533 	ulint		n_ext;
6534 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
6535 	ulint*		offsets;
6536 	rec_offs_init(offsets_);
6537 
6538 	*n_rows = 0;
6539 
6540 	/* Don't support RTree Leaf level scan */
6541 	ut_ad(!dict_index_is_spatial(index));
6542 
6543 	if (dict_index_is_clust(index)) {
6544 		/* The clustered index of a table is always available.
6545 		During online ALTER TABLE that rebuilds the table, the
6546 		clustered index in the old table will have
6547 		index->online_log pointing to the new table. All
6548 		indexes of the old table will remain valid and the new
6549 		table will be unaccessible to MySQL until the
6550 		completion of the ALTER TABLE. */
6551 	} else if (dict_index_is_online_ddl(index)
6552 		   || (index->type & DICT_FTS)) {
6553 		/* Full Text index are implemented by auxiliary tables,
6554 		not the B-tree. We also skip secondary indexes that are
6555 		being created online. */
6556 		return(DB_SUCCESS);
6557 	}
6558 
6559 	ulint bufsize = ut_max(UNIV_PAGE_SIZE, prebuilt->mysql_row_len);
6560 	buf = static_cast<byte*>(ut_malloc_nokey(bufsize));
6561 	heap = mem_heap_create(100);
6562 
6563 	cnt = 1000;
6564 
6565 	ret = row_search_for_mysql(buf, PAGE_CUR_G, prebuilt, 0, 0);
6566 loop:
6567 	/* Check thd->killed every 1,000 scanned rows */
6568 	if (--cnt == 0) {
6569 		if (trx_is_interrupted(prebuilt->trx)) {
6570 			ret = DB_INTERRUPTED;
6571 			goto func_exit;
6572 		}
6573 		cnt = 1000;
6574 	}
6575 
6576 	switch (ret) {
6577 	case DB_SUCCESS:
6578 		break;
6579 	case DB_DEADLOCK:
6580 	case DB_LOCK_TABLE_FULL:
6581 	case DB_LOCK_WAIT_TIMEOUT:
6582 	case DB_INTERRUPTED:
6583 		goto func_exit;
6584 	default:
6585 	{
6586 		const char* doing = "CHECK TABLE";
6587 		ib::warn() << doing << " on index " << index->name << " of"
6588 			" table " << index->table->name << " returned " << ret;
6589 	}
6590 	/* Fall through */
6591 	/* (this error is ignored by CHECK TABLE) */
6592 	case DB_END_OF_INDEX:
6593 		ret = DB_SUCCESS;
6594 func_exit:
6595 		ut_free(buf);
6596 		mem_heap_free(heap);
6597 
6598 		return(ret);
6599 	}
6600 
6601 	*n_rows = *n_rows + 1;
6602 
6603 #ifdef WL6742
6604 	/*Removing WL6742 as part of Bug 23046302 */
6605 	if (!check_keys) {
6606 		goto next_rec;
6607 	}
6608 #endif
6609 	/* else this code is doing handler::check() for CHECK TABLE */
6610 
6611 	/* row_search... returns the index record in buf, record origin offset
6612 	within buf stored in the first 4 bytes, because we have built a dummy
6613 	template */
6614 
6615 	rec = buf + mach_read_from_4(buf);
6616 
6617 	offsets = rec_get_offsets(rec, index, offsets_,
6618 				  ULINT_UNDEFINED, &heap);
6619 
6620 	if (prev_entry != NULL) {
6621 		matched_fields = 0;
6622 
6623 		cmp = cmp_dtuple_rec_with_match(prev_entry, rec, offsets,
6624 						&matched_fields);
6625 		contains_null = FALSE;
6626 
6627 		/* In a unique secondary index we allow equal key values if
6628 		they contain SQL NULLs */
6629 
6630 		for (i = 0;
6631 		     i < dict_index_get_n_ordering_defined_by_user(index);
6632 		     i++) {
6633 			if (UNIV_SQL_NULL == dfield_get_len(
6634 				    dtuple_get_nth_field(prev_entry, i))) {
6635 
6636 				contains_null = TRUE;
6637 				break;
6638 			}
6639 		}
6640 
6641 		const char* msg;
6642 
6643 		if (cmp > 0) {
6644 			ret = DB_INDEX_CORRUPT;
6645 			msg = "index records in a wrong order in ";
6646 not_ok:
6647 			ib::error()
6648 				<< msg << index->name
6649 				<< " of table " << index->table->name
6650 				<< ": " << *prev_entry << ", "
6651 				<< rec_offsets_print(rec, offsets);
6652 			/* Continue reading */
6653 		} else if (dict_index_is_unique(index)
6654 			   && !contains_null
6655 			   && matched_fields
6656 			   >= dict_index_get_n_ordering_defined_by_user(
6657 				   index)) {
6658 			ret = DB_DUPLICATE_KEY;
6659 			msg = "duplicate key in ";
6660 			goto not_ok;
6661 		}
6662 	}
6663 
6664 	{
6665 		mem_heap_t*	tmp_heap = NULL;
6666 
6667 		/* Empty the heap on each round.  But preserve offsets[]
6668 		for the row_rec_to_index_entry() call, by copying them
6669 		into a separate memory heap when needed. */
6670 		if (UNIV_UNLIKELY(offsets != offsets_)) {
6671 			ulint	size = rec_offs_get_n_alloc(offsets)
6672 				* sizeof *offsets;
6673 
6674 			tmp_heap = mem_heap_create(size);
6675 
6676 			offsets = static_cast<ulint*>(
6677 				mem_heap_dup(tmp_heap, offsets, size));
6678 		}
6679 
6680 		mem_heap_empty(heap);
6681 
6682 		prev_entry = row_rec_to_index_entry(
6683 			rec, index, offsets, &n_ext, heap);
6684 
6685 		if (UNIV_LIKELY_NULL(tmp_heap)) {
6686 			mem_heap_free(tmp_heap);
6687 		}
6688 	}
6689 #ifdef WL6742
6690 /* Removed WL6742 as part of Bug 23046302 */
6691 next_rec:
6692 #endif
6693 	ret = row_search_for_mysql(
6694 		buf, PAGE_CUR_G, prebuilt, 0, ROW_SEL_NEXT);
6695 
6696 	goto loop;
6697 }
6698 
6699 /*********************************************************************//**
6700 Initialize this module */
6701 void
row_mysql_init(void)6702 row_mysql_init(void)
6703 /*================*/
6704 {
6705 	mutex_create(LATCH_ID_ROW_DROP_LIST, &row_drop_list_mutex);
6706 
6707 	UT_LIST_INIT(
6708 		row_mysql_drop_list,
6709 		&row_mysql_drop_t::row_mysql_drop_list);
6710 
6711 	row_mysql_drop_list_inited = TRUE;
6712 }
6713 
6714 /*********************************************************************//**
6715 Close this module */
6716 void
row_mysql_close(void)6717 row_mysql_close(void)
6718 /*================*/
6719 {
6720 	ut_a(UT_LIST_GET_LEN(row_mysql_drop_list) == 0);
6721 
6722 	mutex_free(&row_drop_list_mutex);
6723 
6724 	row_mysql_drop_list_inited = FALSE;
6725 }
6726