1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30 
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "row0ins.h"
37 
38 #ifdef UNIV_NONINL
39 #include "row0ins.ic"
40 #endif
41 
42 #include "dict0dict.h"
43 #include "dict0boot.h"
44 #include "trx0rec.h"
45 #include "trx0undo.h"
46 #include "btr0btr.h"
47 #include "btr0cur.h"
48 #include "mach0data.h"
49 #include "que0que.h"
50 #include "row0upd.h"
51 #include "row0sel.h"
52 #include "row0row.h"
53 #include "row0log.h"
54 #include "rem0cmp.h"
55 #include "lock0lock.h"
56 #include "log0log.h"
57 #include "eval0eval.h"
58 #include "data0data.h"
59 #include "usr0sess.h"
60 #include "buf0lru.h"
61 #include "fts0fts.h"
62 #include "fts0types.h"
63 #include "m_string.h"
64 #include "gis0geo.h"
65 
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module, make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
75 
76 /*********************************************************************//**
77 Creates an insert node struct.
78 @return own: insert node struct */
79 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)80 ins_node_create(
81 /*============*/
82 	ulint		ins_type,	/*!< in: INS_VALUES, ... */
83 	dict_table_t*	table,		/*!< in: table where to insert */
84 	mem_heap_t*	heap)		/*!< in: mem heap where created */
85 {
86 	ins_node_t*	node;
87 
88 	node = static_cast<ins_node_t*>(
89 		mem_heap_alloc(heap, sizeof(ins_node_t)));
90 
91 	node->common.type = QUE_NODE_INSERT;
92 
93 	node->ins_type = ins_type;
94 
95 	node->state = INS_NODE_SET_IX_LOCK;
96 	node->table = table;
97 	node->index = NULL;
98 	node->entry = NULL;
99 
100 	node->select = NULL;
101 
102 	node->trx_id = 0;
103 
104 	node->entry_sys_heap = mem_heap_create(128);
105 
106 	node->magic_n = INS_NODE_MAGIC_N;
107 
108 	return(node);
109 }
110 
111 /***********************************************************//**
112 Creates an entry template for each index of a table. */
113 static
114 void
ins_node_create_entry_list(ins_node_t * node)115 ins_node_create_entry_list(
116 /*=======================*/
117 	ins_node_t*	node)	/*!< in: row insert node */
118 {
119 	dict_index_t*	index;
120 	dtuple_t*	entry;
121 
122 	ut_ad(node->entry_sys_heap);
123 
124 	UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
125 
126 	/* We will include all indexes (include those corrupted
127 	secondary indexes) in the entry list. Filteration of
128 	these corrupted index will be done in row_ins() */
129 
130 	for (index = dict_table_get_first_index(node->table);
131 	     index != 0;
132 	     index = dict_table_get_next_index(index)) {
133 
134 		entry = row_build_index_entry_low(
135 			node->row, NULL, index, node->entry_sys_heap,
136 			ROW_BUILD_FOR_INSERT);
137 
138 		UT_LIST_ADD_LAST(node->entry_list, entry);
139 	}
140 }
141 
142 /*****************************************************************//**
143 Adds system field buffers to a row. */
144 static
145 void
row_ins_alloc_sys_fields(ins_node_t * node)146 row_ins_alloc_sys_fields(
147 /*=====================*/
148 	ins_node_t*	node)	/*!< in: insert node */
149 {
150 	dtuple_t*		row;
151 	dict_table_t*		table;
152 	mem_heap_t*		heap;
153 	const dict_col_t*	col;
154 	dfield_t*		dfield;
155 	byte*			ptr;
156 
157 	row = node->row;
158 	table = node->table;
159 	heap = node->entry_sys_heap;
160 
161 	ut_ad(row && table && heap);
162 	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
163 
164 	/* allocate buffer to hold the needed system created hidden columns. */
165 	uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
166 	if (!dict_table_is_intrinsic(table)) {
167 		len += DATA_ROLL_PTR_LEN;
168 	}
169 	ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
170 
171 	/* 1. Populate row-id */
172 	col = dict_table_get_sys_col(table, DATA_ROW_ID);
173 
174 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
175 
176 	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
177 
178 	node->row_id_buf = ptr;
179 
180 	ptr += DATA_ROW_ID_LEN;
181 
182 	/* 2. Populate trx id */
183 	col = dict_table_get_sys_col(table, DATA_TRX_ID);
184 
185 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186 
187 	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
188 
189 	node->trx_id_buf = ptr;
190 
191 	ptr += DATA_TRX_ID_LEN;
192 
193 	if (!dict_table_is_intrinsic(table)) {
194 		col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
195 
196 		dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
197 
198 		dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
199 	}
200 }
201 
202 /*********************************************************************//**
203 Sets a new row to insert for an INS_DIRECT node. This function is only used
204 if we have constructed the row separately, which is a rare case; this
205 function is quite slow. */
206 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)207 ins_node_set_new_row(
208 /*=================*/
209 	ins_node_t*	node,	/*!< in: insert node */
210 	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
211 {
212 	node->state = INS_NODE_SET_IX_LOCK;
213 	node->index = NULL;
214 	node->entry = NULL;
215 
216 	node->row = row;
217 
218 	mem_heap_empty(node->entry_sys_heap);
219 
220 	/* Create templates for index entries */
221 
222 	ins_node_create_entry_list(node);
223 
224 	/* Allocate from entry_sys_heap buffers for sys fields */
225 
226 	row_ins_alloc_sys_fields(node);
227 
228 	/* As we allocated a new trx id buf, the trx id should be written
229 	there again: */
230 
231 	node->trx_id = 0;
232 }
233 
234 /*******************************************************************//**
235 Does an insert operation by updating a delete-marked existing record
236 in the index. This situation can occur if the delete-marked record is
237 kept in the index for consistent reads.
238 @return DB_SUCCESS or error code */
239 static MY_ATTRIBUTE((warn_unused_result))
240 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)241 row_ins_sec_index_entry_by_modify(
242 /*==============================*/
243 	ulint		flags,	/*!< in: undo logging and locking flags */
244 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
245 				depending on whether mtr holds just a leaf
246 				latch or also a tree latch */
247 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
248 	ulint**		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
249 	mem_heap_t*	offsets_heap,
250 				/*!< in/out: memory heap that can be emptied */
251 	mem_heap_t*	heap,	/*!< in/out: memory heap */
252 	const dtuple_t*	entry,	/*!< in: index entry to insert */
253 	que_thr_t*	thr,	/*!< in: query thread */
254 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
255 				latching any further pages */
256 {
257 	big_rec_t*	dummy_big_rec;
258 	upd_t*		update;
259 	rec_t*		rec;
260 	dberr_t		err;
261 
262 	rec = btr_cur_get_rec(cursor);
263 
264 	ut_ad(!dict_index_is_clust(cursor->index));
265 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
266 	ut_ad(!entry->info_bits);
267 
268 	/* We know that in the alphabetical ordering, entry and rec are
269 	identified. But in their binary form there may be differences if
270 	there are char fields in them. Therefore we have to calculate the
271 	difference. */
272 
273 	update = row_upd_build_sec_rec_difference_binary(
274 		rec, cursor->index, *offsets, entry, heap);
275 
276 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
277 		/* We should never insert in place of a record that
278 		has not been delete-marked. The only exception is when
279 		online CREATE INDEX copied the changes that we already
280 		made to the clustered index, and completed the
281 		secondary index creation before we got here. In this
282 		case, the change would already be there. The CREATE
283 		INDEX should be waiting for a MySQL meta-data lock
284 		upgrade at least until this INSERT or UPDATE
285 		returns. After that point, set_committed(true)
286 		would be invoked in commit_inplace_alter_table(). */
287 		ut_a(update->n_fields == 0);
288 		ut_a(!cursor->index->is_committed());
289 		ut_ad(!dict_index_is_online_ddl(cursor->index));
290 		return(DB_SUCCESS);
291 	}
292 
293 	if (mode == BTR_MODIFY_LEAF) {
294 		/* Try an optimistic updating of the record, keeping changes
295 		within the page */
296 
297 		/* TODO: pass only *offsets */
298 		err = btr_cur_optimistic_update(
299 			flags | BTR_KEEP_SYS_FLAG, cursor,
300 			offsets, &offsets_heap, update, 0, thr,
301 			thr_get_trx(thr)->id, mtr);
302 		switch (err) {
303 		case DB_OVERFLOW:
304 		case DB_UNDERFLOW:
305 		case DB_ZIP_OVERFLOW:
306 			err = DB_FAIL;
307 		default:
308 			break;
309 		}
310 	} else {
311 		ut_a(mode == BTR_MODIFY_TREE);
312 		if (buf_LRU_buf_pool_running_out()) {
313 
314 			return(DB_LOCK_TABLE_FULL);
315 		}
316 
317 		err = btr_cur_pessimistic_update(
318 			flags | BTR_KEEP_SYS_FLAG, cursor,
319 			offsets, &offsets_heap,
320 			heap, &dummy_big_rec, update, 0,
321 			thr, thr_get_trx(thr)->id, mtr);
322 		ut_ad(!dummy_big_rec);
323 	}
324 
325 	return(err);
326 }
327 
328 /*******************************************************************//**
329 Does an insert operation by delete unmarking and updating a delete marked
330 existing record in the index. This situation can occur if the delete marked
331 record is kept in the index for consistent reads.
332 @return DB_SUCCESS, DB_FAIL, or error code */
333 static MY_ATTRIBUTE((warn_unused_result))
334 dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)335 row_ins_clust_index_entry_by_modify(
336 /*================================*/
337 	btr_pcur_t*	pcur,	/*!< in/out: a persistent cursor pointing
338 				to the clust_rec that is being modified. */
339 	ulint		flags,	/*!< in: undo logging and locking flags */
340 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
341 				depending on whether mtr holds just a leaf
342 				latch or also a tree latch */
343 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
344 	mem_heap_t**	offsets_heap,
345 				/*!< in/out: pointer to memory heap that can
346 				be emptied, or NULL */
347 	mem_heap_t*	heap,	/*!< in/out: memory heap */
348 	const dtuple_t*	entry,	/*!< in: index entry to insert */
349 	que_thr_t*	thr,	/*!< in: query thread */
350 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
351 				latching any further pages */
352 {
353 	const rec_t*	rec;
354 	upd_t*		update;
355 	dberr_t		err = DB_SUCCESS;
356 	btr_cur_t*	cursor	= btr_pcur_get_btr_cur(pcur);
357 	TABLE*		mysql_table = NULL;
358 	ut_ad(dict_index_is_clust(cursor->index));
359 
360 	rec = btr_cur_get_rec(cursor);
361 
362 	ut_ad(rec_get_deleted_flag(rec,
363 				   dict_table_is_comp(cursor->index->table)));
364 
365 	/* Build an update vector containing all the fields to be modified;
366 	NOTE that this vector may NOT contain system columns trx_id or
367 	roll_ptr */
368 	if (thr->prebuilt != NULL) {
369 		mysql_table = thr->prebuilt->m_mysql_table;
370 		ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
371 	}
372 
373 	update = row_upd_build_difference_binary(
374 		cursor->index, entry, rec, NULL, true,
375 		thr_get_trx(thr), heap, mysql_table, &err);
376 	if (err != DB_SUCCESS) {
377 		return(err);
378 	}
379 
380 	if (mode != BTR_MODIFY_TREE) {
381 		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
382 
383 		/* Try optimistic updating of the record, keeping changes
384 		within the page */
385 
386 		err = btr_cur_optimistic_update(
387 			flags, cursor, offsets, offsets_heap, update, 0, thr,
388 			thr_get_trx(thr)->id, mtr);
389 		switch (err) {
390 		case DB_OVERFLOW:
391 		case DB_UNDERFLOW:
392 		case DB_ZIP_OVERFLOW:
393 			err = DB_FAIL;
394 		default:
395 			break;
396 		}
397 	} else {
398 		if (buf_LRU_buf_pool_running_out()) {
399 
400 			return(DB_LOCK_TABLE_FULL);
401 
402 		}
403 
404 		big_rec_t*	big_rec	= NULL;
405 
406 		err = btr_cur_pessimistic_update(
407 			flags | BTR_KEEP_POS_FLAG,
408 			cursor, offsets, offsets_heap, heap,
409 			&big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
410 
411 		if (big_rec) {
412 			ut_a(err == DB_SUCCESS);
413 
414 			DEBUG_SYNC_C("before_row_ins_upd_extern");
415 			err = btr_store_big_rec_extern_fields(
416 				pcur, update, *offsets, big_rec, mtr,
417 				BTR_STORE_INSERT_UPDATE);
418 			DEBUG_SYNC_C("after_row_ins_upd_extern");
419 			dtuple_big_rec_free(big_rec);
420 		}
421 	}
422 
423 	return(err);
424 }
425 
426 /*********************************************************************//**
427 Returns TRUE if in a cascaded update/delete an ancestor node of node
428 updates (not DELETE, but UPDATE) table.
429 @return TRUE if an ancestor updates table */
430 static
431 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)432 row_ins_cascade_ancestor_updates_table(
433 /*===================================*/
434 	que_node_t*	node,	/*!< in: node in a query graph */
435 	dict_table_t*	table)	/*!< in: table */
436 {
437 	que_node_t*	parent;
438 
439 	for (parent = que_node_get_parent(node);
440 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
441 	     parent = que_node_get_parent(parent)) {
442 
443 		upd_node_t*	upd_node;
444 
445 		upd_node = static_cast<upd_node_t*>(parent);
446 
447 		if (upd_node->table == table && upd_node->is_delete == FALSE) {
448 
449 			return(TRUE);
450 		}
451 	}
452 
453 	return(FALSE);
454 }
455 
456 /*********************************************************************//**
457 Returns the number of ancestor UPDATE or DELETE nodes of a
458 cascaded update/delete node.
459 @return number of ancestors */
460 static MY_ATTRIBUTE((warn_unused_result))
461 ulint
row_ins_cascade_n_ancestors(que_node_t * node)462 row_ins_cascade_n_ancestors(
463 /*========================*/
464 	que_node_t*	node)	/*!< in: node in a query graph */
465 {
466 	que_node_t*	parent;
467 	ulint		n_ancestors = 0;
468 
469 	for (parent = que_node_get_parent(node);
470 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
471 	     parent = que_node_get_parent(parent)) {
472 
473 		n_ancestors++;
474 	}
475 
476 	return(n_ancestors);
477 }
478 
479 /******************************************************************//**
480 Calculates the update vector node->cascade->update for a child table in
481 a cascaded update.
482 @return number of fields in the calculated update vector; the value
483 can also be 0 if no foreign key fields changed; the returned value is
484 ULINT_UNDEFINED if the column type in the child table is too short to
485 fit the new value in the parent table: that means the update fails */
486 static MY_ATTRIBUTE((warn_unused_result))
487 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)488 row_ins_cascade_calc_update_vec(
489 /*============================*/
490 	upd_node_t*	node,		/*!< in: update node of the parent
491 					table */
492 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
493 					type is != 0 */
494 	mem_heap_t*	heap,		/*!< in: memory heap to use as
495 					temporary storage */
496 	trx_t*		trx,		/*!< in: update transaction */
497 	ibool*		fts_col_affected)
498 					/*!< out: is FTS column affected */
499 {
500 	upd_node_t*     cascade         = node->cascade_node;
501 	dict_table_t*	table		= foreign->foreign_table;
502 	dict_index_t*	index		= foreign->foreign_index;
503 	upd_t*		update;
504 	dict_table_t*	parent_table;
505 	dict_index_t*	parent_index;
506 	upd_t*		parent_update;
507 	ulint		n_fields_updated;
508 	ulint		parent_field_no;
509 	ulint		i;
510 	ulint		j;
511 	ibool		doc_id_updated = FALSE;
512 	ulint		doc_id_pos = 0;
513 	doc_id_t	new_doc_id = FTS_NULL_DOC_ID;
514 
515 	ut_a(node);
516 	ut_a(foreign);
517 	ut_a(cascade);
518 	ut_a(table);
519 	ut_a(index);
520 
521 	/* Calculate the appropriate update vector which will set the fields
522 	in the child index record to the same value (possibly padded with
523 	spaces if the column is a fixed length CHAR or FIXBINARY column) as
524 	the referenced index record will get in the update. */
525 
526 	parent_table = node->table;
527 	ut_a(parent_table == foreign->referenced_table);
528 	parent_index = foreign->referenced_index;
529 	parent_update = node->update;
530 
531 	update = cascade->update;
532 
533 	update->info_bits = 0;
534 
535 	n_fields_updated = 0;
536 
537 	*fts_col_affected = FALSE;
538 
539 	if (table->fts) {
540 		doc_id_pos = dict_table_get_nth_col_pos(
541 			table, table->fts->doc_col);
542 	}
543 
544 	for (i = 0; i < foreign->n_fields; i++) {
545 
546 		parent_field_no = dict_table_get_nth_col_pos(
547 			parent_table,
548 			dict_index_get_nth_col_no(parent_index, i));
549 
550 		for (j = 0; j < parent_update->n_fields; j++) {
551 			const upd_field_t*	parent_ufield
552 				= &parent_update->fields[j];
553 
554 			if (parent_ufield->field_no == parent_field_no) {
555 
556 				ulint			min_size;
557 				const dict_col_t*	col;
558 				ulint			ufield_len;
559 				upd_field_t*		ufield;
560 
561 				col = dict_index_get_nth_col(index, i);
562 
563 				/* A field in the parent index record is
564 				updated. Let us make the update vector
565 				field for the child table. */
566 
567 				ufield = update->fields + n_fields_updated;
568 
569 				ufield->field_no
570 					= dict_table_get_nth_col_pos(
571 					table, dict_col_get_no(col));
572 
573 				ufield->orig_len = 0;
574 				ufield->exp = NULL;
575 
576 				ufield->new_val = parent_ufield->new_val;
577 				ufield_len = dfield_get_len(&ufield->new_val);
578 
579 				/* Clear the "external storage" flag */
580 				dfield_set_len(&ufield->new_val, ufield_len);
581 
582 				/* Do not allow a NOT NULL column to be
583 				updated as NULL */
584 
585 				if (dfield_is_null(&ufield->new_val)
586 				    && (col->prtype & DATA_NOT_NULL)) {
587 
588 					return(ULINT_UNDEFINED);
589 				}
590 
591 				/* If the new value would not fit in the
592 				column, do not allow the update */
593 
594 				if (!dfield_is_null(&ufield->new_val)
595 				    && dtype_get_at_most_n_mbchars(
596 					col->prtype, col->mbminmaxlen,
597 					col->len,
598 					ufield_len,
599 					static_cast<char*>(
600 						dfield_get_data(
601 							&ufield->new_val)))
602 				    < ufield_len) {
603 
604 					return(ULINT_UNDEFINED);
605 				}
606 
607 				/* If the parent column type has a different
608 				length than the child column type, we may
609 				need to pad with spaces the new value of the
610 				child column */
611 
612 				min_size = dict_col_get_min_size(col);
613 
614 				/* Because UNIV_SQL_NULL (the marker
615 				of SQL NULL values) exceeds all possible
616 				values of min_size, the test below will
617 				not hold for SQL NULL columns. */
618 
619 				if (min_size > ufield_len) {
620 
621 					byte*	pad;
622 					ulint	pad_len;
623 					byte*	padded_data;
624 					ulint	mbminlen;
625 
626 					padded_data = static_cast<byte*>(
627 						mem_heap_alloc(
628 							heap, min_size));
629 
630 					pad = padded_data + ufield_len;
631 					pad_len = min_size - ufield_len;
632 
633 					memcpy(padded_data,
634 					       dfield_get_data(&ufield
635 							       ->new_val),
636 					       ufield_len);
637 
638 					mbminlen = dict_col_get_mbminlen(col);
639 
640 					ut_ad(!(ufield_len % mbminlen));
641 					ut_ad(!(min_size % mbminlen));
642 
643 					if (mbminlen == 1
644 					    && dtype_get_charset_coll(
645 						    col->prtype)
646 					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
647 						/* Do not pad BINARY columns */
648 						return(ULINT_UNDEFINED);
649 					}
650 
651 					row_mysql_pad_col(mbminlen,
652 							  pad, pad_len);
653 					dfield_set_data(&ufield->new_val,
654 							padded_data, min_size);
655 				}
656 
657 				/* Check whether the current column has
658 				FTS index on it */
659 				if (table->fts
660 				    && dict_table_is_fts_column(
661 					table->fts->indexes,
662 					dict_col_get_no(col),
663 					dict_col_is_virtual(col))
664 					!= ULINT_UNDEFINED) {
665 					*fts_col_affected = TRUE;
666 				}
667 
668 				/* If Doc ID is updated, check whether the
669 				Doc ID is valid */
670 				if (table->fts
671 				    && ufield->field_no == doc_id_pos) {
672 					doc_id_t	n_doc_id;
673 
674 					n_doc_id =
675 						table->fts->cache->next_doc_id;
676 
677 					new_doc_id = fts_read_doc_id(
678 						static_cast<const byte*>(
679 							dfield_get_data(
680 							&ufield->new_val)));
681 
682 					if (new_doc_id <= 0) {
683 						ib::error() << "FTS Doc ID"
684 							" must be larger than"
685 							" 0";
686 						return(ULINT_UNDEFINED);
687 					}
688 
689 					if (new_doc_id < n_doc_id) {
690 						ib::error() << "FTS Doc ID"
691 							" must be larger than "
692 							<< n_doc_id - 1
693 							<< " for table "
694 							<< table->name;
695 
696 						return(ULINT_UNDEFINED);
697 					}
698 
699 					*fts_col_affected = TRUE;
700 					doc_id_updated = TRUE;
701 				}
702 
703 				n_fields_updated++;
704 			}
705 		}
706 	}
707 
708 	/* Generate a new Doc ID if FTS index columns get updated */
709 	if (table->fts && *fts_col_affected) {
710 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
711 			doc_id_t	doc_id;
712 			doc_id_t*	next_doc_id;
713 			upd_field_t*	ufield;
714 
715 			next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
716 				heap, sizeof(doc_id_t)));
717 
718 			ut_ad(!doc_id_updated);
719 			ufield = update->fields + n_fields_updated;
720 			fts_get_next_doc_id(table, next_doc_id);
721 			doc_id = fts_update_doc_id(table, ufield, next_doc_id);
722 			n_fields_updated++;
723 			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
724 		} else  {
725 			if (doc_id_updated) {
726 				ut_ad(new_doc_id);
727 				fts_trx_add_op(trx, table, new_doc_id,
728 					       FTS_INSERT, NULL);
729 			} else {
730 				ib::error() << "FTS Doc ID must be updated"
731 					" along with FTS indexed column for"
732 					" table " << table->name;
733 				return(ULINT_UNDEFINED);
734 			}
735 		}
736 	}
737 
738 	update->n_fields = n_fields_updated;
739 
740 	return(n_fields_updated);
741 }
742 
743 /*********************************************************************//**
744 Set detailed error message associated with foreign key errors for
745 the given transaction. */
746 static
747 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)748 row_ins_set_detailed(
749 /*=================*/
750 	trx_t*		trx,		/*!< in: transaction */
751 	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
752 {
753 	ut_ad(!srv_read_only_mode);
754 
755 	mutex_enter(&srv_misc_tmpfile_mutex);
756 	rewind(srv_misc_tmpfile);
757 
758 	if (os_file_set_eof(srv_misc_tmpfile)) {
759 		ut_print_name(srv_misc_tmpfile, trx,
760 			      foreign->foreign_table_name);
761 		dict_print_info_on_foreign_key_in_create_format(
762 			srv_misc_tmpfile, trx, foreign, FALSE);
763 		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
764 	} else {
765 		trx_set_detailed_error(trx, "temp file operation failed");
766 	}
767 
768 	mutex_exit(&srv_misc_tmpfile_mutex);
769 }
770 
771 /*********************************************************************//**
772 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
773 and displays information about the given transaction.
774 The caller must release dict_foreign_err_mutex. */
775 static
776 void
row_ins_foreign_trx_print(trx_t * trx)777 row_ins_foreign_trx_print(
778 /*======================*/
779 	trx_t*	trx)	/*!< in: transaction */
780 {
781 	ulint	n_rec_locks;
782 	ulint	n_trx_locks;
783 	ulint	heap_size;
784 
785 	if (srv_read_only_mode) {
786 		return;
787 	}
788 
789 	lock_mutex_enter();
790 	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
791 	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
792 	heap_size = mem_heap_get_size(trx->lock.lock_heap);
793 	lock_mutex_exit();
794 
795 	trx_sys_mutex_enter();
796 
797 	mutex_enter(&dict_foreign_err_mutex);
798 	rewind(dict_foreign_err_file);
799 	ut_print_timestamp(dict_foreign_err_file);
800 	fputs(" Transaction:\n", dict_foreign_err_file);
801 
802 	trx_print_low(dict_foreign_err_file, trx, 600,
803 		      n_rec_locks, n_trx_locks, heap_size);
804 
805 	trx_sys_mutex_exit();
806 
807 	ut_ad(mutex_own(&dict_foreign_err_mutex));
808 }
809 
810 /*********************************************************************//**
811 Reports a foreign key error associated with an update or a delete of a
812 parent table index entry. */
813 static
814 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)815 row_ins_foreign_report_err(
816 /*=======================*/
817 	const char*	errstr,		/*!< in: error string from the viewpoint
818 					of the parent table */
819 	que_thr_t*	thr,		/*!< in: query thread whose run_node
820 					is an update node */
821 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
822 	const rec_t*	rec,		/*!< in: a matching index record in the
823 					child table */
824 	const dtuple_t*	entry)		/*!< in: index entry in the parent
825 					table */
826 {
827 	if (srv_read_only_mode) {
828 		return;
829 	}
830 
831 	FILE*	ef	= dict_foreign_err_file;
832 	trx_t*	trx	= thr_get_trx(thr);
833 
834 	row_ins_set_detailed(trx, foreign);
835 
836 	row_ins_foreign_trx_print(trx);
837 
838 	fputs("Foreign key constraint fails for table ", ef);
839 	ut_print_name(ef, trx, foreign->foreign_table_name);
840 	fputs(":\n", ef);
841 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
842 							TRUE);
843 	putc('\n', ef);
844 	fputs(errstr, ef);
845 	fprintf(ef, " in parent table, in index %s",
846 		foreign->referenced_index->name());
847 	if (entry) {
848 		fputs(" tuple:\n", ef);
849 		dtuple_print(ef, entry);
850 	}
851 	fputs("\nBut in child table ", ef);
852 	ut_print_name(ef, trx, foreign->foreign_table_name);
853 	fprintf(ef, ", in index %s", foreign->foreign_index->name());
854 	if (rec) {
855 		fputs(", there is a record:\n", ef);
856 		rec_print(ef, rec, foreign->foreign_index);
857 	} else {
858 		fputs(", the record is not available\n", ef);
859 	}
860 	putc('\n', ef);
861 
862 	mutex_exit(&dict_foreign_err_mutex);
863 }
864 
865 /*********************************************************************//**
866 Reports a foreign key error to dict_foreign_err_file when we are trying
867 to add an index entry to a child table. Note that the adding may be the result
868 of an update, too. */
869 static
870 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)871 row_ins_foreign_report_add_err(
872 /*===========================*/
873 	trx_t*		trx,		/*!< in: transaction */
874 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
875 	const rec_t*	rec,		/*!< in: a record in the parent table:
876 					it does not match entry because we
877 					have an error! */
878 	const dtuple_t*	entry)		/*!< in: index entry to insert in the
879 					child table */
880 {
881 	if (srv_read_only_mode) {
882 		return;
883 	}
884 
885 	FILE*	ef	= dict_foreign_err_file;
886 
887 	row_ins_set_detailed(trx, foreign);
888 
889 	row_ins_foreign_trx_print(trx);
890 
891 	fputs("Foreign key constraint fails for table ", ef);
892 	ut_print_name(ef, trx, foreign->foreign_table_name);
893 	fputs(":\n", ef);
894 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
895 							TRUE);
896 	fprintf(ef, "\nTrying to add in child table, in index %s",
897 		foreign->foreign_index->name());
898 	if (entry) {
899 		fputs(" tuple:\n", ef);
900 		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
901 		It would be better to only display the user columns. */
902 		dtuple_print(ef, entry);
903 	}
904 	fputs("\nBut in parent table ", ef);
905 	ut_print_name(ef, trx, foreign->referenced_table_name);
906 	fprintf(ef, ", in index %s,\n"
907 		"the closest match we can find is record:\n",
908 		foreign->referenced_index->name());
909 	if (rec && page_rec_is_supremum(rec)) {
910 		/* If the cursor ended on a supremum record, it is better
911 		to report the previous record in the error message, so that
912 		the user gets a more descriptive error message. */
913 		rec = page_rec_get_prev_const(rec);
914 	}
915 
916 	if (rec) {
917 		rec_print(ef, rec, foreign->referenced_index);
918 	}
919 	putc('\n', ef);
920 
921 	mutex_exit(&dict_foreign_err_mutex);
922 }
923 
924 /*********************************************************************//**
925 Invalidate the query cache for the given table. */
926 static
927 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)928 row_ins_invalidate_query_cache(
929 /*===========================*/
930 	que_thr_t*	thr,		/*!< in: query thread whose run_node
931 					is an update node */
932 	const char*	name)		/*!< in: table name prefixed with
933 					database name and a '/' character */
934 {
935 	ulint	len = strlen(name) + 1;
936 	innobase_invalidate_query_cache(thr_get_trx(thr), name, len);
937 }
938 
939 /** Fill virtual column information in cascade node for the child table.
940 @param[out]	cascade		child update node
941 @param[in]	rec		clustered rec of child table
942 @param[in]	index		clustered index of child table
943 @param[in]	node		parent update node
944 @param[in]	foreign		foreign key information
945 @param[out]	err		error code. */
946 static
947 void
row_ins_foreign_fill_virtual(upd_node_t * cascade,const rec_t * rec,dict_index_t * index,upd_node_t * node,dict_foreign_t * foreign,dberr_t * err)948 row_ins_foreign_fill_virtual(
949 	upd_node_t*		cascade,
950 	const rec_t*		rec,
951 	dict_index_t*		index,
952 	upd_node_t*		node,
953 	dict_foreign_t*		foreign,
954 	dberr_t*		err)
955 {
956 	row_ext_t*	ext;
957 	THD*		thd = current_thd;
958 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
959 	rec_offs_init(offsets_);
960 	const ulint*	offsets =
961 		rec_get_offsets(rec, index, offsets_,
962 				ULINT_UNDEFINED, &cascade->heap);
963 	mem_heap_t*	v_heap = NULL;
964 	upd_t*		update = cascade->update;
965 	ulint		n_v_fld = index->table->n_v_def;
966 	ulint		n_diff;
967 	upd_field_t*	upd_field;
968 	dict_vcol_set*	v_cols = foreign->v_cols;
969 
970 	update->old_vrow = row_build(
971 		ROW_COPY_POINTERS, index, rec,
972 		offsets, index->table, NULL, NULL,
973 		&ext, cascade->heap);
974 
975 	n_diff = update->n_fields;
976 
977 	update->n_fields += n_v_fld;
978 
979 	if (index->table->vc_templ == NULL) {
980 		/** This can occur when there is a cascading
981 		delete or update after restart. */
982 		innobase_init_vc_templ(index->table);
983 	}
984 
985 	for (ulint i = 0; i < n_v_fld; i++) {
986 
987 		dict_v_col_t*     col = dict_table_get_nth_v_col(
988 				index->table, i);
989 
990 		dict_vcol_set::iterator it = v_cols->find(col);
991 
992 		if (it == v_cols->end()) {
993 			continue;
994 		}
995 
996 		dfield_t*	vfield = innobase_get_computed_value(
997 				update->old_vrow, col, index,
998 				&v_heap, update->heap, NULL, thd, NULL,
999 				NULL, NULL, NULL);
1000 
1001 		if (vfield == NULL) {
1002 			*err = DB_COMPUTE_VALUE_FAILED;
1003 			goto func_exit;
1004 		}
1005 
1006 		upd_field = upd_get_nth_field(update, n_diff);
1007 
1008 		upd_field->old_v_val = static_cast<dfield_t*>(
1009 				mem_heap_alloc(cascade->heap,
1010 					sizeof *upd_field->old_v_val));
1011 
1012 		dfield_copy(upd_field->old_v_val, vfield);
1013 
1014 		upd_field_set_v_field_no(upd_field, i, index);
1015 
1016 		if (node->is_delete
1017 		    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1018 		    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1019 
1020 			dfield_set_null(&upd_field->new_val);
1021 		}
1022 
1023 		if (!node->is_delete
1024 		    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1025 
1026 			dfield_t* new_vfield = innobase_get_computed_value(
1027 					update->old_vrow, col, index,
1028 					&v_heap, update->heap, NULL, thd,
1029 					NULL, NULL, node->update, foreign);
1030 
1031 			if (new_vfield == NULL) {
1032 				*err = DB_COMPUTE_VALUE_FAILED;
1033 				goto func_exit;
1034 			}
1035 
1036 			dfield_copy(&(upd_field->new_val), new_vfield);
1037 		}
1038 
1039 		n_diff++;
1040 	}
1041 
1042 	update->n_fields = n_diff;
1043 	*err = DB_SUCCESS;
1044 
1045 func_exit:
1046 	if (v_heap) {
1047 		mem_heap_free(v_heap);
1048 	}
1049 }
1050 
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
and the constraint had an ON DELETE or ON UPDATE condition which was not
RESTRICT.

Outline of what the function does:
1) refuses the operation (DB_ROW_IS_REFERENCED) if the constraint has no
CASCADE or SET NULL action for the kind of parent operation being run;
2) creates (on first use) and sets up node->cascade_node for the child table;
3) locates and X-locks the clustered index record of the child row;
4) builds the update vector (SET NULL or ON UPDATE CASCADE), or marks the
cascade node as a delete (ON DELETE CASCADE);
5) stores the cursor positions, commits mtr, runs the cascaded operation with
row_update_cascade_for_mysql(), briefly releases and re-acquires the data
dictionary latch, restarts mtr and restores pcur.

The two early returns in step 1 leave mtr and pcur untouched. Every other
exit, including the nonstandard_exit_func path, commits and restarts mtr and
restores the position of pcur.
@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_ins_foreign_check_on_constraint(
/*================================*/
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
					type is != 0 */
	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
					index record in the child table */
	dtuple_t*	entry,		/*!< in: index entry in the parent
					table */
	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
					page */
{
	upd_node_t*	node;
	upd_node_t*	cascade;
	dict_table_t*	table		= foreign->foreign_table;
	dict_index_t*	index;
	dict_index_t*	clust_index;
	dtuple_t*	ref;
	const rec_t*	rec;
	const rec_t*	clust_rec;
	const buf_block_t* clust_block;
	upd_t*		update;
	ulint		n_to_update;
	dberr_t		err;
	ulint		i;
	trx_t*		trx;
	mem_heap_t*	tmp_heap	= NULL;
	doc_id_t	doc_id = FTS_NULL_DOC_ID;
	ibool		fts_col_affacted = FALSE;

	DBUG_ENTER("row_ins_foreign_check_on_constraint");
	ut_a(thr);
	ut_a(foreign);
	ut_a(pcur);
	ut_a(mtr);

	trx = thr_get_trx(thr);

	/* Since we are going to delete or update a row, we have to invalidate
	the MySQL query cache for table. A deadlock of threads is not possible
	here because the caller of this function does not hold any latches with
	the mutex rank above the lock_sys_t::mutex. The query cache mutex
	has a rank just above the lock_sys_t::mutex. */

	row_ins_invalidate_query_cache(thr, table->name.m_name);

	node = static_cast<upd_node_t*>(thr->run_node);

	/* Parent DELETE, but the constraint has neither ON DELETE CASCADE
	nor ON DELETE SET NULL: report the violation and refuse. */
	if (node->is_delete && 0 == (foreign->type
				     & (DICT_FOREIGN_ON_DELETE_CASCADE
					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {

		row_ins_foreign_report_err("Trying to delete",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		DBUG_RETURN(DB_ROW_IS_REFERENCED);
	}

	/* Parent UPDATE, but the constraint has neither ON UPDATE CASCADE
	nor ON UPDATE SET NULL: report the violation and refuse. */
	if (!node->is_delete && 0 == (foreign->type
				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {

		/* This is an UPDATE */

		row_ins_foreign_report_err("Trying to update",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		DBUG_RETURN(DB_ROW_IS_REFERENCED);
	}

	/* The cascade update node and its heap are created only when
	node->cascade_node is still NULL; otherwise the existing node is
	set up again below for this table and constraint. */
	if (node->cascade_node == NULL) {
		node->cascade_heap = mem_heap_create(128);
		node->cascade_node = row_create_update_node_for_mysql(
			table, node->cascade_heap);
		que_node_set_parent(node->cascade_node, node);

	}
	cascade = node->cascade_node;
	cascade->table = table;
	cascade->foreign = foreign;

	/* Only ON DELETE CASCADE makes the child operation a delete. All
	other actions handled here (SET NULL, ON UPDATE CASCADE) are carried
	out as an update of the child row. */
	if (node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
		cascade->is_delete = TRUE;
	} else {
		cascade->is_delete = FALSE;

		if (foreign->n_fields > cascade->update_n_fields) {
			/* We have to make the update vector longer */

			cascade->update = upd_create(foreign->n_fields,
						     node->cascade_heap);
			cascade->update_n_fields = foreign->n_fields;
		}
	}

	/* We do not allow cyclic cascaded updating (DELETE is allowed,
	but not UPDATE) of the same table, as this can lead to an infinite
	cycle. Check that we are not updating the same table which is
	already being modified in this cascade chain. We have to check
	this also because the modification of the indexes of a 'parent'
	table may still be incomplete, and we must avoid seeing the indexes
	of the parent table in an inconsistent state! */

	if (!cascade->is_delete
	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {

		/* We do not know if this would break foreign key
		constraints, but play safe and return an error */

		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying an update, possibly causing a cyclic"
			" cascaded update\n"
			"in the child table,", thr, foreign,
			btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	/* Bound the depth of the cascade chain. */
	if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
		err = DB_FOREIGN_EXCEED_MAX_CASCADE;

		row_ins_foreign_report_err(
			"Trying a too deep cascaded delete or update\n",
			thr, foreign, btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	index = btr_pcur_get_btr_cur(pcur)->index;

	ut_a(index == foreign->foreign_index);

	rec = btr_pcur_get_rec(pcur);

	/* Scratch heap; freed on both exit paths at the end of the
	function. */
	tmp_heap = mem_heap_create(256);

	if (dict_index_is_clust(index)) {
		/* pcur is already positioned in the clustered index of
		the child table */

		clust_index = index;
		clust_rec = rec;
		clust_block = btr_pcur_get_block(pcur);
	} else {
		/* We have to look for the record in the clustered index
		in the child table */

		clust_index = dict_table_get_first_index(table);

		/* Build a row reference from the secondary index record and
		position cascade->pcur with it in the clustered index, under
		the same mtr. */
		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
					tmp_heap);
		btr_pcur_open_with_no_init(clust_index, ref,
					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
					   cascade->pcur, 0, mtr);

		clust_rec = btr_pcur_get_rec(cascade->pcur);
		clust_block = btr_pcur_get_block(cascade->pcur);

		/* The cursor must be on a user record that matches the
		reference on all unique fields of the clustered index.
		Otherwise the two indexes disagree: print diagnostics, trip
		the ut_ad(0) assertion, and leave with DB_SUCCESS without
		performing the cascade for this record. */
		if (!page_rec_is_user_rec(clust_rec)
		    || btr_pcur_get_low_match(cascade->pcur)
		    < dict_index_get_n_unique(clust_index)) {

			ib::error() << "In cascade of a foreign key op index "
				<< index->name
				<< " of table " << index->table->name;

			fputs("InnoDB: record ", stderr);
			rec_print(stderr, rec, index);
			fputs("\n"
			      "InnoDB: clustered record ", stderr);
			rec_print(stderr, clust_rec, clust_index);
			fputs("\n"
			      "InnoDB: Submit a detailed bug report to"
			      " http://bugs.mysql.com\n", stderr);
			ut_ad(0);
			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}
	}

	/* Set an X-lock on the row to delete or update in the child table */

	/* The table-level LOCK_IX request comes first; the record lock is
	only requested when that succeeded. */
	err = lock_table(0, table, LOCK_IX, thr);

	if (err == DB_SUCCESS) {
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
		we already have a normal shared lock on the appropriate
		gap if the search criterion was not unique */

		err = lock_clust_rec_read_check_and_lock_alt(
			0, clust_block, clust_rec, clust_index,
			LOCK_X, LOCK_REC_NOT_GAP, thr);
	}

	/* Any result other than DB_SUCCESS from either lock request
	(DB_LOCK_WAIT included) is handed back to the caller. */
	if (err != DB_SUCCESS) {

		goto nonstandard_exit_func;
	}

	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
		/* This can happen if there is a circular reference of
		rows such that cascading delete comes to delete a row
		already in the process of being delete marked */
		err = DB_SUCCESS;

		goto nonstandard_exit_func;
	}


	/* When the child table has FTS data, fetch the Doc ID of the row;
	it is passed to fts_trx_add_op() below if an FTS column is
	affected. */
	if (table->fts) {
		doc_id = fts_get_doc_id_from_rec(table, clust_rec,
						 clust_index, tmp_heap);
	}

	/* A cascaded delete does not go through
	row_ins_foreign_fill_virtual() (which initializes the template
	itself), so initialize the virtual column template here if the
	constraint involves virtual columns. */
	if (cascade->is_delete && foreign->v_cols != NULL
	    && foreign->v_cols->size() > 0
	    && table->vc_templ == NULL) {
		innobase_init_vc_templ(table);
	}
	if (node->is_delete
	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
		/* Build the appropriate update vector which sets
		foreign->n_fields first fields in rec to SQL NULL */

		update = cascade->update;

		update->info_bits = 0;
		update->n_fields = foreign->n_fields;
		UNIV_MEM_INVALID(update->fields,
				 update->n_fields * sizeof *update->fields);

		for (i = 0; i < foreign->n_fields; i++) {
			upd_field_t*	ufield = &update->fields[i];
			ulint		col_no = dict_index_get_nth_col_no(
						index, i);

			/* field_no is the position of the column in the
			table (dict_table_get_nth_col_pos), not in the
			foreign key index. */
			ufield->field_no = dict_table_get_nth_col_pos(
				table, col_no);
			dict_col_t*	col = dict_table_get_nth_col(
				table, col_no);
			/* Give the new value the type of the child column
			before it is set to SQL NULL. */
			dict_col_copy_type(col, dfield_get_type(&ufield->new_val));

			ufield->orig_len = 0;
			ufield->exp = NULL;
			dfield_set_null(&ufield->new_val);

			/* Remember whether any of the constrained columns
			is reported as an FTS column of the table. */
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i),
				dict_col_is_virtual(
					dict_index_get_nth_col(index, i)))
			    != ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}

		/* Append update fields for the virtual columns that the
		constraint affects. */
		if (foreign->v_cols != NULL
		    && foreign->v_cols->size() > 0) {
			row_ins_foreign_fill_virtual(
				cascade, clust_rec, clust_index,
				node, foreign, &err);

			if (err != DB_SUCCESS) {
				goto nonstandard_exit_func;
			}
		}

	} else if (table->fts && cascade->is_delete) {
		/* DICT_FOREIGN_ON_DELETE_CASCADE case */
		for (i = 0; i < foreign->n_fields; i++) {
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i),
				dict_col_is_virtual(
					dict_index_get_nth_col(index, i)))
			    != ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	if (!node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {

		/* Build the appropriate update vector which sets changing
		foreign->n_fields first fields in rec to new values */

		n_to_update = row_ins_cascade_calc_update_vec(
			node, foreign, tmp_heap,
			trx, &fts_col_affacted);


		/* NOTE(review): the virtual column fill runs before
		n_to_update is checked against ULINT_UNDEFINED below. */
		if (foreign->v_cols != NULL
		    && foreign->v_cols->size() > 0) {
			row_ins_foreign_fill_virtual(
				cascade, clust_rec, clust_index,
				node, foreign, &err);

			if (err != DB_SUCCESS) {
				goto nonstandard_exit_func;
			}
		}

		if (n_to_update == ULINT_UNDEFINED) {
			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying a cascaded update where the"
				" updated value in the child\n"
				"table would not fit in the length"
				" of the column, or the value would\n"
				"be NULL and the column is"
				" declared as not NULL in the child table,",
				thr, foreign, btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		}

		if (cascade->update->n_fields == 0) {

			/* The update does not change any columns referred
			to in this foreign key constraint: no need to do
			anything */

			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}

		/* Mark the old Doc ID as deleted */
		if (fts_col_affacted) {
			ut_ad(table->fts);
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	/* Store pcur position and initialize or store the cascade node
	pcur stored position */

	btr_pcur_store_position(pcur, mtr);

	/* If pcur is itself on the clustered index, cascade->pcur takes a
	copy of its stored position; otherwise cascade->pcur was opened
	above and stores its own position. */
	if (index == clust_index) {
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
	} else {
		btr_pcur_store_position(cascade->pcur, mtr);
	}

	/* The mini-transaction is committed before the cascaded operation
	runs and is restarted afterwards. */
	mtr_commit(mtr);

	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);

	cascade->state = UPD_NODE_UPDATE_CLUSTERED;

	err = row_update_cascade_for_mysql(thr, cascade,
                                           foreign->foreign_table);

	/* Release the data dictionary latch for a while, so that we do not
	starve other threads from doing CREATE TABLE etc. if we have a huge
	cascaded operation running. The counter n_foreign_key_checks_running
	will prevent other users from dropping or ALTERing the table when we
	release the latch. */

	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));

	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");

	row_mysql_freeze_data_dictionary(thr_get_trx(thr));

	mtr_start(mtr);

	/* Restore pcur position */

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	/* err is the result of row_update_cascade_for_mysql(). */
	DBUG_RETURN(err);

nonstandard_exit_func:

	/* Early exit: no cascaded operation is run. tmp_heap may still be
	NULL here when the jump happened before it was created. */
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	/* Commit and restart the mini-transaction around a store/restore
	of the cursor position, as on the normal path. */
	btr_pcur_store_position(pcur, mtr);

	mtr_commit(mtr);
	mtr_start(mtr);

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	DBUG_RETURN(err);
}
1469 
1470 /*********************************************************************//**
1471 Sets a shared lock on a record. Used in locking possible duplicate key
1472 records and also in checking foreign key constraints.
1473 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1474 static
1475 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1476 row_ins_set_shared_rec_lock(
1477 /*========================*/
1478 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1479 					LOCK_REC_NOT_GAP type lock */
1480 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1481 	const rec_t*		rec,	/*!< in: record */
1482 	dict_index_t*		index,	/*!< in: index */
1483 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1484 	que_thr_t*		thr)	/*!< in: query thread */
1485 {
1486 	dberr_t	err;
1487 
1488 	ut_ad(rec_offs_validate(rec, index, offsets));
1489 
1490 	if (dict_index_is_clust(index)) {
1491 		err = lock_clust_rec_read_check_and_lock(
1492 			0, block, rec, index, offsets, LOCK_S, type, thr);
1493 	} else {
1494 		err = lock_sec_rec_read_check_and_lock(
1495 			0, block, rec, index, offsets, LOCK_S, type, thr);
1496 	}
1497 
1498 	return(err);
1499 }
1500 
1501 /*********************************************************************//**
1502 Sets a exclusive lock on a record. Used in locking possible duplicate key
1503 records
1504 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1505 static
1506 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1507 row_ins_set_exclusive_rec_lock(
1508 /*===========================*/
1509 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1510 					LOCK_REC_NOT_GAP type lock */
1511 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1512 	const rec_t*		rec,	/*!< in: record */
1513 	dict_index_t*		index,	/*!< in: index */
1514 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1515 	que_thr_t*		thr)	/*!< in: query thread */
1516 {
1517 	dberr_t	err;
1518 
1519 	ut_ad(rec_offs_validate(rec, index, offsets));
1520 
1521 	if (dict_index_is_clust(index)) {
1522 		err = lock_clust_rec_read_check_and_lock(
1523 			0, block, rec, index, offsets, LOCK_X, type, thr);
1524 	} else {
1525 		err = lock_sec_rec_read_check_and_lock(
1526 			0, block, rec, index, offsets, LOCK_X, type, thr);
1527 	}
1528 
1529 	return(err);
1530 }
1531 
1532 /* Decrement a counter in the destructor. */
1533 class ib_dec_in_dtor {
1534 public:
ib_dec_in_dtor(ulint & c)1535 	ib_dec_in_dtor(ulint& c): counter(c) {}
~ib_dec_in_dtor()1536 	~ib_dec_in_dtor() {
1537 		os_atomic_decrement_ulint(&counter, 1);
1538 	}
1539 private:
1540 	ulint&		counter;
1541 };
1542 
/***************************************************************//**
Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_operation_lock.

The check is skipped (DB_SUCCESS) when foreign key checks are disabled for
the transaction, when any of the first foreign->n_fields fields of entry is
SQL NULL, or when the running node is the cascaded update of this very
constraint. Otherwise the index of the table to check is scanned from the
first record >= entry, and each examined record is locked in shared mode.

If a lock request has to wait, the thread is suspended here; see the block
at do_possible_lock_wait for what is returned afterwards.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED; the code
can also return other error codes, among them DB_FOREIGN_DUPLICATE_KEY,
DB_LOCK_WAIT, DB_LOCK_WAIT_TIMEOUT, errors from the lock requests or from
row_ins_foreign_check_on_constraint(), and trx->error_state after a lock
wait */
dberr_t
row_ins_check_foreign_constraint(
/*=============================*/
	ibool		check_ref,/*!< in: TRUE if we want to check that
				the referenced table is ok, FALSE if we
				want to check the foreign key table */
	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
				tables mentioned in it must be in the
				dictionary cache if they exist at all */
	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
				table, else the referenced table */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dberr_t		err;
	upd_node_t*	upd_node;
	dict_table_t*	check_table;
	dict_index_t*	check_index;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	int		cmp;
	mtr_t		mtr;
	trx_t*		trx		= thr_get_trx(thr);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;

	bool		skip_gap_lock;

	/* At READ COMMITTED or a lower isolation level the scan below
	does not request LOCK_GAP or supremum locks, and takes
	LOCK_REC_NOT_GAP instead of LOCK_ORDINARY on matching records. */
	skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);

	DBUG_ENTER("row_ins_check_foreign_constraint");

	rec_offs_init(offsets_);

	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_S));

	err = DB_SUCCESS;

	if (trx->check_foreigns == FALSE) {
		/* The user has suppressed foreign key checks currently for
		this session */
		goto exit_func;
	}

	/* If any of the foreign key fields in entry is SQL NULL, we
	suppress the foreign key check: this is compatible with Oracle,
	for example */
	for (ulint i = 0; i < foreign->n_fields; i++) {
		if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
			goto exit_func;
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
		upd_node = static_cast<upd_node_t*>(thr->run_node);

		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
			/* If a cascaded update is done as defined by a
			foreign key constraint, do not check that
			constraint for the child row. In ON UPDATE CASCADE
			the update of the parent row is only half done when
			we come here: if we would check the constraint here
			for the child row it would fail.

			A QUESTION remains: if in the child table there are
			several constraints which refer to the same parent
			table, we should merge all updates to the child as
			one update? And the updates can be contradictory!
			Currently we just perform the update associated
			with each foreign key constraint, one after
			another, and the user has problems predicting in
			which order they are performed. */

			goto exit_func;
		}
	}

	/* Pick the side of the constraint to search: the referenced
	(parent) table and index when check_ref is set, the foreign (child)
	table and index otherwise. */
	if (check_ref) {
		check_table = foreign->referenced_table;
		check_index = foreign->referenced_index;
	} else {
		check_table = foreign->foreign_table;
		check_index = foreign->foreign_index;
	}

	/* The table to check is not usable: it or its index is missing,
	its .ibd file is missing, or its tablespace is being truncated.
	This is an error (DB_NO_REFERENCED_ROW) only when checking the
	referenced table outside read-only mode; in every other case err
	keeps the DB_SUCCESS assigned above. */
	if (check_table == NULL
	    || check_table->ibd_file_missing
	    || check_index == NULL
	    || fil_space_is_being_truncated(check_table->space)) {

		if (!srv_read_only_mode && check_ref) {
			FILE*	ef = dict_foreign_err_file;

			row_ins_set_detailed(trx, foreign);

			/* NOTE(review): dict_foreign_err_mutex is released
			at the end of this block, so it is presumably
			acquired inside row_ins_foreign_trx_print() -
			confirm. */
			row_ins_foreign_trx_print(trx);

			fputs("Foreign key constraint fails for table ", ef);
			ut_print_name(ef, trx,
				      foreign->foreign_table_name);
			fputs(":\n", ef);
			dict_print_info_on_foreign_key_in_create_format(
				ef, trx, foreign, TRUE);
			fprintf(ef, "\nTrying to add to index %s tuple:\n",
				foreign->foreign_index->name());
			dtuple_print(ef, entry);
			fputs("\nBut the parent table ", ef);
			ut_print_name(ef, trx,
				      foreign->referenced_table_name);
			fputs("\nor its .ibd file does"
			      " not currently exist!, or"
			      " is undergoing truncate!\n", ef);
			mutex_exit(&dict_foreign_err_mutex);

			err = DB_NO_REFERENCED_ROW;
		}

		goto exit_func;
	}

	if (check_table != table) {
		/* We already have a LOCK_IX on table, but not necessarily
		on check_table */

		err = lock_table(0, check_table, LOCK_IS, thr);

		if (err != DB_SUCCESS) {

			/* The mtr has not been started yet, so the
			end_scan cleanup is bypassed. */
			goto do_possible_lock_wait;
		}
	}

	mtr_start(&mtr);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	/* Compare only on the columns of the constraint while scanning. */
	dtuple_set_n_fields_cmp(entry, foreign->n_fields);

	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
		      BTR_SEARCH_LEAF, &pcur, &mtr);

	/* Scan index records and check if there is a matching record */

	do {
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
		const buf_block_t*	block = btr_pcur_get_block(&pcur);

		if (page_rec_is_infimum(rec)) {

			continue;
		}

		offsets = rec_get_offsets(rec, check_index,
					  offsets, ULINT_UNDEFINED, &heap);

		if (page_rec_is_supremum(rec)) {

			if (skip_gap_lock) {

				continue;
			}

			/* Lock the supremum record and, if that succeeded,
			move on to the next record. */
			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
							  rec, check_index,
							  offsets, thr);
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				continue;
			default:
				goto end_scan;
			}
		}

		cmp = cmp_dtuple_rec(entry, rec, offsets);

		if (cmp == 0) {

			ulint	lock_type;

			lock_type = skip_gap_lock
				? LOCK_REC_NOT_GAP
				: LOCK_ORDINARY;

			if (rec_get_deleted_flag(rec,
						 rec_offs_comp(offsets))) {
				/* A delete-marked match is locked with
				lock_type and the scan goes on to the next
				record. */
				err = row_ins_set_shared_rec_lock(
					lock_type, block,
					rec, check_index, offsets, thr);
				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}
			} else {
				/* Found a matching record. Lock only
				a record because we can allow inserts
				into gaps */

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP, block,
					rec, check_index, offsets, thr);

				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}

				if (check_ref) {
					/* The referenced row exists: the
					constraint is satisfied. */
					err = DB_SUCCESS;

					goto end_scan;
				} else if (foreign->type != 0) {
					/* There is an ON UPDATE or ON DELETE
					condition: check them in a separate
					function */

					err = row_ins_foreign_check_on_constraint(
						thr, foreign, &pcur, entry,
						&mtr);
					if (err != DB_SUCCESS) {
						/* Since reporting a plain
						"duplicate key" error
						message to the user in
						cases where a long CASCADE
						operation would lead to a
						duplicate key in some
						other table is very
						confusing, map duplicate
						key errors resulting from
						FK constraints to a
						separate error code. */

						if (err == DB_DUPLICATE_KEY) {
							err = DB_FOREIGN_DUPLICATE_KEY;
						}

						goto end_scan;
					}

					/* row_ins_foreign_check_on_constraint
					may have repositioned pcur on a
					different block */
					block = btr_pcur_get_block(&pcur);
				} else {
					/* A child row references the parent
					row and the constraint has no
					referential action (type == 0). */
					row_ins_foreign_report_err(
						"Trying to delete or update",
						thr, foreign, rec, entry);

					err = DB_ROW_IS_REFERENCED;
					goto end_scan;
				}
			}
		} else {
			/* The cursor was opened with PAGE_CUR_GE, so a
			non-matching record must compare greater than
			entry. */
			ut_a(cmp < 0);

			err = DB_SUCCESS;

			if (!skip_gap_lock) {
				err = row_ins_set_shared_rec_lock(
					LOCK_GAP, block,
					rec, check_index, offsets, thr);
			}

			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				if (check_ref) {
					err = DB_NO_REFERENCED_ROW;
					row_ins_foreign_report_add_err(
						trx, foreign, rec, entry);
				} else {
					err = DB_SUCCESS;
				}
				/* fall through */
			default:
				break;
			}

			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, &mtr));

	/* The scan ran off the end of the index without a decision. */
	if (check_ref) {
		row_ins_foreign_report_add_err(
			trx, foreign, btr_pcur_get_rec(&pcur), entry);
		err = DB_NO_REFERENCED_ROW;
	} else {
		err = DB_SUCCESS;
	}

end_scan:
	btr_pcur_close(&pcur);

	mtr_commit(&mtr);

	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
	if (err == DB_LOCK_WAIT) {
		/* An object that will correctly decrement the FK check counter
		when it goes out of this scope. */
		/* The guard is constructed before the increment further
		down; its destructor provides the matching decrement on
		every way out of this block, the gotos included. */
		ib_dec_in_dtor	dec(check_table->n_foreign_key_checks_running);

		trx->error_state = err;

		que_thr_stop_for_mysql(thr);

		thr->lock_state = QUE_THR_LOCK_ROW;

		/* To avoid check_table being dropped, increment counter */
		os_atomic_increment_ulint(
			&check_table->n_foreign_key_checks_running, 1);

		trx_kill_blocking(trx);

		lock_wait_suspend_thread(thr);

		thr->lock_state = QUE_THR_LOCK_NOLOCK;

		/* The wait ended with an error recorded on the
		transaction: return that error. */
		if(trx->error_state != DB_SUCCESS) {
		    err = trx->error_state;
		    goto exit_func;
		}

		DBUG_PRINT("to_be_dropped",
			   ("table: %s", check_table->name.m_name));
		if (check_table->to_be_dropped) {
			/* The table is being dropped. We shall timeout
			this operation */
			err = DB_LOCK_WAIT_TIMEOUT;

			goto exit_func;
		}

		/* NOTE(review): when neither branch above is taken, err is
		still DB_LOCK_WAIT on return; presumably the caller
		retries the operation - verify against the callers. */
	}


exit_func:
	if (heap != NULL) {
		mem_heap_free(heap);
	}

	DEBUG_SYNC_C("finished_scanning_index");
	DBUG_RETURN(err);
}
1901 
1902 /***************************************************************//**
1903 Checks if foreign key constraints fail for an index entry. If index
1904 is not mentioned in any constraint, this function does nothing,
1905 Otherwise does searches to the indexes of referenced tables and
1906 sets shared locks which lock either the success or the failure of
1907 a constraint.
1908 @return DB_SUCCESS or error code */
1909 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1910 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1911 row_ins_check_foreign_constraints(
1912 /*==============================*/
1913 	dict_table_t*	table,	/*!< in: table */
1914 	dict_index_t*	index,	/*!< in: index */
1915 	dtuple_t*	entry,	/*!< in: index entry for index */
1916 	que_thr_t*	thr)	/*!< in: query thread */
1917 {
1918 	dict_foreign_t*	foreign;
1919 	dberr_t		err;
1920 	trx_t*		trx;
1921 	ibool		got_s_lock	= FALSE;
1922 
1923 	trx = thr_get_trx(thr);
1924 
1925 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1926 			    "foreign_constraint_check_for_ins");
1927 
1928 	for (dict_foreign_set::iterator it = table->foreign_set.begin();
1929 	     it != table->foreign_set.end();
1930 	     ++it) {
1931 
1932 		foreign = *it;
1933 
1934 		if (foreign->foreign_index == index) {
1935 			dict_table_t*	ref_table = NULL;
1936 			dict_table_t*   foreign_table = foreign->foreign_table;
1937 			dict_table_t*	referenced_table
1938 						= foreign->referenced_table;
1939 
1940 			if (referenced_table == NULL) {
1941 
1942 				ref_table = dict_table_open_on_name(
1943 					foreign->referenced_table_name_lookup,
1944 					FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1945 			}
1946 
1947 			if (0 == trx->dict_operation_lock_mode) {
1948 				got_s_lock = TRUE;
1949 
1950 				row_mysql_freeze_data_dictionary(trx);
1951 			}
1952 
1953                         if (referenced_table) {
1954 				os_atomic_increment_ulint(
1955 					&foreign_table->n_foreign_key_checks_running, 1);
1956                         }
1957 
1958 			/* NOTE that if the thread ends up waiting for a lock
1959 			we will release dict_operation_lock temporarily!
1960 			But the counter on the table protects the referenced
1961 			table from being dropped while the check is running. */
1962 
1963 			err = row_ins_check_foreign_constraint(
1964 				TRUE, foreign, table, entry, thr);
1965 
1966                         if (referenced_table) {
1967 				os_atomic_decrement_ulint(
1968 					&foreign_table->n_foreign_key_checks_running, 1);
1969                         }
1970 
1971 			if (got_s_lock) {
1972 				row_mysql_unfreeze_data_dictionary(trx);
1973 			}
1974 
1975 			if (ref_table != NULL) {
1976 				dict_table_close(ref_table, FALSE, FALSE);
1977 			}
1978 
1979 			if (err != DB_SUCCESS) {
1980 
1981 				return(err);
1982 			}
1983 		}
1984 	}
1985 
1986 	return(DB_SUCCESS);
1987 }
1988 
1989 /***************************************************************//**
1990 Checks if a unique key violation to rec would occur at the index entry
1991 insert.
1992 @return TRUE if error */
1993 static
1994 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1995 row_ins_dupl_error_with_rec(
1996 /*========================*/
1997 	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
1998 				that the caller already has a record lock on
1999 				the record! */
2000 	const dtuple_t*	entry,	/*!< in: entry to insert */
2001 	dict_index_t*	index,	/*!< in: index */
2002 	const ulint*	offsets)/*!< in: rec_get_offsets(rec, index) */
2003 {
2004 	ulint	matched_fields;
2005 	ulint	n_unique;
2006 	ulint	i;
2007 
2008 	ut_ad(rec_offs_validate(rec, index, offsets));
2009 
2010 	n_unique = dict_index_get_n_unique(index);
2011 
2012 	matched_fields = 0;
2013 
2014 	cmp_dtuple_rec_with_match(entry, rec, offsets, &matched_fields);
2015 
2016 	if (matched_fields < n_unique) {
2017 
2018 		return(FALSE);
2019 	}
2020 
2021 	/* In a unique secondary index we allow equal key values if they
2022 	contain SQL NULLs */
2023 
2024 	if (!dict_index_is_clust(index) && !index->nulls_equal) {
2025 
2026 		for (i = 0; i < n_unique; i++) {
2027 			if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
2028 
2029 				return(FALSE);
2030 			}
2031 		}
2032 	}
2033 
2034 	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2035 }
2036 
2037 /***************************************************************//**
2038 Scans a unique non-clustered index at a given index entry to determine
2039 whether a uniqueness violation has occurred for the key value of the entry.
2040 Set shared locks on possible duplicate records.
2041 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
2042 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2043 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)2044 row_ins_scan_sec_index_for_duplicate(
2045 /*=================================*/
2046 	ulint		flags,	/*!< in: undo logging and locking flags */
2047 	dict_index_t*	index,	/*!< in: non-clustered unique index */
2048 	dtuple_t*	entry,	/*!< in: index entry */
2049 	que_thr_t*	thr,	/*!< in: query thread */
2050 	bool		s_latch,/*!< in: whether index->lock is being held */
2051 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
2052 	mem_heap_t*	offsets_heap)
2053 				/*!< in/out: memory heap that can be emptied */
2054 {
2055 	ulint		n_unique;
2056 	int		cmp;
2057 	ulint		n_fields_cmp;
2058 	btr_pcur_t	pcur;
2059 	dberr_t		err		= DB_SUCCESS;
2060 	ulint		allow_duplicates;
2061 	ulint*		offsets		= NULL;
2062 	DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
2063 
2064 
2065 	ut_ad(s_latch == rw_lock_own_flagged(
2066 			&index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
2067 
2068 	n_unique = dict_index_get_n_unique(index);
2069 
2070 	/* If the secondary index is unique, but one of the fields in the
2071 	n_unique first fields is NULL, a unique key violation cannot occur,
2072 	since we define NULL != NULL in this case */
2073 
2074 	if (!index->nulls_equal) {
2075 		for (ulint i = 0; i < n_unique; i++) {
2076 			if (UNIV_SQL_NULL == dfield_get_len(
2077 					dtuple_get_nth_field(entry, i))) {
2078 
2079 				DBUG_RETURN(DB_SUCCESS);
2080 			}
2081 		}
2082 	}
2083 
2084 	/* Store old value on n_fields_cmp */
2085 
2086 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
2087 
2088 	dtuple_set_n_fields_cmp(entry, n_unique);
2089 
2090 	btr_pcur_open(index, entry, PAGE_CUR_GE,
2091 		      s_latch
2092 		      ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
2093 		      : BTR_SEARCH_LEAF,
2094 		      &pcur, mtr);
2095 
2096 	allow_duplicates = thr_get_trx(thr)->duplicates;
2097 
2098 	/* Scan index records and check if there is a duplicate */
2099 
2100 	do {
2101 		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
2102 		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
2103 		const ulint		lock_type = LOCK_ORDINARY;
2104 
2105 		if (page_rec_is_infimum(rec)) {
2106 
2107 			continue;
2108 		}
2109 
2110 		offsets = rec_get_offsets(rec, index, offsets,
2111 					  ULINT_UNDEFINED, &offsets_heap);
2112 
2113 		if (flags & BTR_NO_LOCKING_FLAG) {
2114 			/* Set no locks when applying log
2115 			in online table rebuild. */
2116 		} else if (allow_duplicates) {
2117 
2118 			/* If the SQL-query will update or replace
2119 			duplicate key we will take X-lock for
2120 			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2121 			INSERT ON DUPLICATE KEY UPDATE). */
2122 
2123 			err = row_ins_set_exclusive_rec_lock(
2124 				lock_type, block, rec, index, offsets, thr);
2125 		} else {
2126 
2127 			err = row_ins_set_shared_rec_lock(
2128 				lock_type, block, rec, index, offsets, thr);
2129 		}
2130 
2131 		switch (err) {
2132 		case DB_SUCCESS_LOCKED_REC:
2133 			err = DB_SUCCESS;
2134 		case DB_SUCCESS:
2135 			break;
2136 		default:
2137 			goto end_scan;
2138 		}
2139 
2140 		if (page_rec_is_supremum(rec)) {
2141 
2142 			continue;
2143 		}
2144 
2145 		cmp = cmp_dtuple_rec(entry, rec, offsets);
2146 
2147 		if (cmp == 0 && !index->allow_duplicates) {
2148 			if (row_ins_dupl_error_with_rec(rec, entry,
2149 							index, offsets)) {
2150 				err = DB_DUPLICATE_KEY;
2151 
2152 				thr_get_trx(thr)->error_index = index;
2153 
2154 				/* If the duplicate is on hidden FTS_DOC_ID,
2155 				state so in the error log */
2156 				if (index == index->table->fts_doc_id_index
2157 				    && DICT_TF2_FLAG_IS_SET(
2158 					index->table,
2159 					DICT_TF2_FTS_HAS_DOC_ID)) {
2160 
2161 					ib::error() << "Duplicate FTS_DOC_ID"
2162 						" value on table "
2163 						<< index->table->name;
2164 				}
2165 
2166 				goto end_scan;
2167 			}
2168 		} else {
2169 			ut_a(cmp < 0 || index->allow_duplicates);
2170 			goto end_scan;
2171 		}
2172 	} while (btr_pcur_move_to_next(&pcur, mtr));
2173 
2174 end_scan:
2175 	/* Restore old value */
2176 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2177 
2178 	DBUG_RETURN(err);
2179 }
2180 
2181 /** Checks for a duplicate when the table is being rebuilt online.
2182 @retval DB_SUCCESS when no duplicate is detected
2183 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2184 a newer version of entry (the entry should not be inserted)
2185 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2186 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2187 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2188 row_ins_duplicate_online(
2189 /*=====================*/
2190 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2191 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2192 	const rec_t*	rec,	/*!< in: clustered index record */
2193 	ulint*		offsets)/*!< in/out: rec_get_offsets(rec) */
2194 {
2195 	ulint	fields	= 0;
2196 
2197 	/* During rebuild, there should not be any delete-marked rows
2198 	in the new table. */
2199 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2200 	ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2201 
2202 	/* Compare the PRIMARY KEY fields and the
2203 	DB_TRX_ID, DB_ROLL_PTR. */
2204 	cmp_dtuple_rec_with_match_low(
2205 		entry, rec, offsets, n_uniq + 2, &fields);
2206 
2207 	if (fields < n_uniq) {
2208 		/* Not a duplicate. */
2209 		return(DB_SUCCESS);
2210 	}
2211 
2212 	if (fields == n_uniq + 2) {
2213 		/* rec is an exact match of entry. */
2214 		return(DB_SUCCESS_LOCKED_REC);
2215 	}
2216 
2217 	return(DB_DUPLICATE_KEY);
2218 }
2219 
2220 /** Checks for a duplicate when the table is being rebuilt online.
2221 @retval DB_SUCCESS when no duplicate is detected
2222 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2223 a newer version of entry (the entry should not be inserted)
2224 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2225 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2226 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2227 row_ins_duplicate_error_in_clust_online(
2228 /*====================================*/
2229 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2230 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2231 	const btr_cur_t*cursor,	/*!< in: cursor on insert position */
2232 	ulint**		offsets,/*!< in/out: rec_get_offsets(rec) */
2233 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2234 {
2235 	dberr_t		err	= DB_SUCCESS;
2236 	const rec_t*	rec	= btr_cur_get_rec(cursor);
2237 
2238 	if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2239 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2240 					   ULINT_UNDEFINED, heap);
2241 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2242 		if (err != DB_SUCCESS) {
2243 			return(err);
2244 		}
2245 	}
2246 
2247 	rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2248 
2249 	if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2250 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2251 					   ULINT_UNDEFINED, heap);
2252 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2253 	}
2254 
2255 	return(err);
2256 }
2257 
2258 /***************************************************************//**
2259 Checks if a unique key violation error would occur at an index entry
2260 insert. Sets shared locks on possible duplicate records. Works only
2261 for a clustered index!
2262 @retval DB_SUCCESS if no error
2263 @retval DB_DUPLICATE_KEY if error,
2264 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2265 record
2266 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2267 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2268 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2269 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2270 row_ins_duplicate_error_in_clust(
2271 /*=============================*/
2272 	ulint		flags,	/*!< in: undo logging and locking flags */
2273 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
2274 	const dtuple_t*	entry,	/*!< in: entry to insert */
2275 	que_thr_t*	thr,	/*!< in: query thread */
2276 	mtr_t*		mtr)	/*!< in: mtr */
2277 {
2278 	dberr_t	err;
2279 	rec_t*	rec;
2280 	ulint	n_unique;
2281 	trx_t*	trx		= thr_get_trx(thr);
2282 	mem_heap_t*heap		= NULL;
2283 	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
2284 	ulint*	offsets		= offsets_;
2285 	rec_offs_init(offsets_);
2286 
2287 	UT_NOT_USED(mtr);
2288 
2289 	ut_ad(dict_index_is_clust(cursor->index));
2290 
2291 	/* NOTE: For unique non-clustered indexes there may be any number
2292 	of delete marked records with the same value for the non-clustered
2293 	index key (remember multiversioning), and which differ only in
2294 	the row refererence part of the index record, containing the
2295 	clustered index key fields. For such a secondary index record,
2296 	to avoid race condition, we must FIRST do the insertion and after
2297 	that check that the uniqueness condition is not breached! */
2298 
2299 	/* NOTE: A problem is that in the B-tree node pointers on an
2300 	upper level may match more to the entry than the actual existing
2301 	user records on the leaf level. So, even if low_match would suggest
2302 	that a duplicate key violation may occur, this may not be the case. */
2303 
2304 	n_unique = dict_index_get_n_unique(cursor->index);
2305 
2306 	if (cursor->low_match >= n_unique) {
2307 
2308 		rec = btr_cur_get_rec(cursor);
2309 
2310 		if (!page_rec_is_infimum(rec)) {
2311 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2312 						  ULINT_UNDEFINED, &heap);
2313 
2314 			/* We set a lock on the possible duplicate: this
2315 			is needed in logical logging of MySQL to make
2316 			sure that in roll-forward we get the same duplicate
2317 			errors as in original execution */
2318 
2319 			if (flags & BTR_NO_LOCKING_FLAG) {
2320 				/* Do nothing if no-locking is set */
2321 				err = DB_SUCCESS;
2322 			} else if (trx->duplicates) {
2323 
2324 				/* If the SQL-query will update or replace
2325 				duplicate key we will take X-lock for
2326 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2327 				INSERT ON DUPLICATE KEY UPDATE). */
2328 
2329 				err = row_ins_set_exclusive_rec_lock(
2330 					LOCK_REC_NOT_GAP,
2331 					btr_cur_get_block(cursor),
2332 					rec, cursor->index, offsets, thr);
2333 			} else {
2334 
2335 				err = row_ins_set_shared_rec_lock(
2336 					LOCK_REC_NOT_GAP,
2337 					btr_cur_get_block(cursor), rec,
2338 					cursor->index, offsets, thr);
2339 			}
2340 
2341 			switch (err) {
2342 			case DB_SUCCESS_LOCKED_REC:
2343 			case DB_SUCCESS:
2344 				break;
2345 			default:
2346 				goto func_exit;
2347 			}
2348 
2349 			if (row_ins_dupl_error_with_rec(
2350 				    rec, entry, cursor->index, offsets)) {
2351 duplicate:
2352 				trx->error_index = cursor->index;
2353 				err = DB_DUPLICATE_KEY;
2354 				goto func_exit;
2355 			}
2356 		}
2357 	}
2358 
2359 	if (cursor->up_match >= n_unique) {
2360 
2361 		rec = page_rec_get_next(btr_cur_get_rec(cursor));
2362 
2363 		if (!page_rec_is_supremum(rec)) {
2364 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2365 						  ULINT_UNDEFINED, &heap);
2366 
2367 			if (trx->duplicates) {
2368 
2369 				/* If the SQL-query will update or replace
2370 				duplicate key we will take X-lock for
2371 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2372 				INSERT ON DUPLICATE KEY UPDATE). */
2373 
2374 				err = row_ins_set_exclusive_rec_lock(
2375 					LOCK_REC_NOT_GAP,
2376 					btr_cur_get_block(cursor),
2377 					rec, cursor->index, offsets, thr);
2378 			} else {
2379 
2380 				err = row_ins_set_shared_rec_lock(
2381 					LOCK_REC_NOT_GAP,
2382 					btr_cur_get_block(cursor),
2383 					rec, cursor->index, offsets, thr);
2384 			}
2385 
2386 			switch (err) {
2387 			case DB_SUCCESS_LOCKED_REC:
2388 			case DB_SUCCESS:
2389 				break;
2390 			default:
2391 				goto func_exit;
2392 			}
2393 
2394 			if (row_ins_dupl_error_with_rec(
2395 				    rec, entry, cursor->index, offsets)) {
2396 				goto duplicate;
2397 			}
2398 		}
2399 
2400 		/* This should never happen */
2401 		ut_error;
2402 	}
2403 
2404 	err = DB_SUCCESS;
2405 func_exit:
2406 	if (UNIV_LIKELY_NULL(heap)) {
2407 		mem_heap_free(heap);
2408 	}
2409 	return(err);
2410 }
2411 
2412 /***************************************************************//**
2413 Checks if an index entry has long enough common prefix with an
2414 existing record so that the intended insert of the entry must be
2415 changed to a modify of the existing record. In the case of a clustered
2416 index, the prefix must be n_unique fields long. In the case of a
2417 secondary index, all fields must be equal.  InnoDB never updates
2418 secondary index records in place, other than clearing or setting the
2419 delete-mark flag. We could be able to update the non-unique fields
2420 of a unique secondary index record by checking the cursor->up_match,
2421 but we do not do so, because it could have some locking implications.
2422 @return TRUE if the existing record should be updated; FALSE if not */
2423 UNIV_INLINE
2424 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2425 row_ins_must_modify_rec(
2426 /*====================*/
2427 	const btr_cur_t*	cursor)	/*!< in: B-tree cursor */
2428 {
2429 	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2430 	Because node pointers on upper levels of the B-tree may match more
2431 	to entry than to actual user records on the leaf level, we
2432 	have to check if the candidate record is actually a user record.
2433 	A clustered index node pointer contains index->n_unique first fields,
2434 	and a secondary index node pointer contains all index fields. */
2435 
2436 	return(cursor->low_match
2437 	       >= dict_index_get_n_unique_in_tree(cursor->index)
2438 	       && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2439 }
2440 
2441 /***************************************************************//**
2442 Tries to insert an entry into a clustered index, ignoring foreign key
2443 constraints. If a record with the same unique key is found, the other
2444 record is necessarily marked deleted by a committed transaction, or a
2445 unique key violation error occurs. The delete marked record is then
2446 updated to an existing record, and we must write an undo log record on
2447 the delete marked record.
2448 @retval DB_SUCCESS on success
2449 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2450 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2451 @return error code */
2452 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr,bool dup_chk_only)2453 row_ins_clust_index_entry_low(
2454 /*==========================*/
2455 	ulint		flags,	/*!< in: undo logging and locking flags */
2456 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2457 				depending on whether we wish optimistic or
2458 				pessimistic descent down the index tree */
2459 	dict_index_t*	index,	/*!< in: clustered index */
2460 	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
2461 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2462 	ulint		n_ext,	/*!< in: number of externally stored columns */
2463 	que_thr_t*	thr,	/*!< in: query thread */
2464 	bool		dup_chk_only)
2465 				/*!< in: if true, just do duplicate check
2466 				and return. don't execute actual insert. */
2467 {
2468 	btr_pcur_t	pcur;
2469 	btr_cur_t*	cursor;
2470 	dberr_t		err		= DB_SUCCESS;
2471 	big_rec_t*	big_rec		= NULL;
2472 	mtr_t		mtr;
2473 	mem_heap_t*	offsets_heap	= NULL;
2474 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2475 	ulint*          offsets         = offsets_;
2476 	rec_offs_init(offsets_);
2477 
2478 	DBUG_ENTER("row_ins_clust_index_entry_low");
2479 
2480 	ut_ad(dict_index_is_clust(index));
2481 	ut_ad(!dict_index_is_unique(index)
2482 	      || n_uniq == dict_index_get_n_unique(index));
2483 	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2484 	ut_ad(!thr_get_trx(thr)->in_rollback);
2485 
2486 	mtr_start(&mtr);
2487 	mtr.set_named_space(index->space);
2488 
2489 	if (dict_table_is_temporary(index->table)) {
2490 		/* Disable REDO logging as the lifetime of temp-tables is
2491 		limited to server or connection lifetime and so REDO
2492 		information is not needed on restart for recovery.
2493 		Disable locking as temp-tables are local to a connection. */
2494 
2495 		ut_ad(flags & BTR_NO_LOCKING_FLAG);
2496 		ut_ad(!dict_table_is_intrinsic(index->table)
2497 		      || (flags & BTR_NO_UNDO_LOG_FLAG));
2498 
2499 		mtr.set_log_mode(MTR_LOG_NO_REDO);
2500 	}
2501 
2502 	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2503 		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
2504 		mtr_s_lock(dict_index_get_lock(index), &mtr);
2505 	}
2506 
2507 	/* Note that we use PAGE_CUR_LE as the search mode, because then
2508 	the function will return in both low_match and up_match of the
2509 	cursor sensible values */
2510 	btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);
2511 	cursor = btr_pcur_get_btr_cur(&pcur);
2512 	cursor->thr = thr;
2513 
2514 	ut_ad(!dict_table_is_intrinsic(index->table)
2515 	      || cursor->page_cur.block->made_dirty_with_no_latch);
2516 
2517 #ifdef UNIV_DEBUG
2518 	{
2519 		page_t*	page = btr_cur_get_page(cursor);
2520 		rec_t*	first_rec = page_rec_get_next(
2521 			page_get_infimum_rec(page));
2522 
2523 		ut_ad(page_rec_is_supremum(first_rec)
2524 		      || rec_n_fields_is_sane(index, first_rec, entry));
2525 	}
2526 #endif /* UNIV_DEBUG */
2527 
2528 	/* Allowing duplicates in clustered index is currently enabled
2529 	only for intrinsic table and caller understand the limited
2530 	operation that can be done in this case. */
2531 	ut_ad(!index->allow_duplicates
2532 	      || (index->allow_duplicates
2533 		  && dict_table_is_intrinsic(index->table)));
2534 
2535 	if (!index->allow_duplicates
2536 	    && n_uniq
2537 	    && (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {
2538 
2539 		if (flags
2540 		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2541 			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2542 			/* Set no locks when applying log
2543 			in online table rebuild. Only check for duplicates. */
2544 			err = row_ins_duplicate_error_in_clust_online(
2545 				n_uniq, entry, cursor,
2546 				&offsets, &offsets_heap);
2547 
2548 			switch (err) {
2549 			case DB_SUCCESS:
2550 				break;
2551 			default:
2552 				ut_ad(0);
2553 				/* fall through */
2554 			case DB_SUCCESS_LOCKED_REC:
2555 			case DB_DUPLICATE_KEY:
2556 				thr_get_trx(thr)->error_index = cursor->index;
2557 			}
2558 		} else {
2559 			/* Note that the following may return also
2560 			DB_LOCK_WAIT */
2561 
2562 			err = row_ins_duplicate_error_in_clust(
2563 				flags, cursor, entry, thr, &mtr);
2564 		}
2565 
2566 		if (err != DB_SUCCESS) {
2567 err_exit:
2568 			mtr_commit(&mtr);
2569 			goto func_exit;
2570 		}
2571 	}
2572 
2573 	if (dup_chk_only) {
2574 		mtr_commit(&mtr);
2575 		goto func_exit;
2576 	}
2577 
2578 	/* Note: Allowing duplicates would qualify for modification of
2579 	an existing record as the new entry is exactly same as old entry.
2580 	Avoid this check if allow duplicates is enabled. */
2581 	if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
2582 		/* There is already an index entry with a long enough common
2583 		prefix, we must convert the insert into a modify of an
2584 		existing record */
2585 		mem_heap_t*	entry_heap	= mem_heap_create(1024);
2586 
2587 		/* If the existing record is being modified and the new record
2588 		doesn't fit the provided slot then existing record is added
2589 		to free list and new record is inserted. This also means
2590 		cursor that we have cached for SELECT is now invalid. */
2591 		if(index->last_sel_cur) {
2592 			index->last_sel_cur->invalid = true;
2593 		}
2594 
2595 		err = row_ins_clust_index_entry_by_modify(
2596 			&pcur, flags, mode, &offsets, &offsets_heap,
2597 			entry_heap, entry, thr, &mtr);
2598 
2599 		if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2600 			row_log_table_insert(btr_cur_get_rec(cursor), entry,
2601 					     index, offsets);
2602 		}
2603 
2604 		mtr_commit(&mtr);
2605 		mem_heap_free(entry_heap);
2606 	} else {
2607 		rec_t*	insert_rec;
2608 
2609 		if (mode != BTR_MODIFY_TREE) {
2610 			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2611 			      == BTR_MODIFY_LEAF);
2612 			err = btr_cur_optimistic_insert(
2613 				flags, cursor, &offsets, &offsets_heap,
2614 				entry, &insert_rec, &big_rec,
2615 				n_ext, thr, &mtr);
2616 		} else {
2617 			if (buf_LRU_buf_pool_running_out()) {
2618 
2619 				err = DB_LOCK_TABLE_FULL;
2620 				goto err_exit;
2621 			}
2622 
2623 			DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");
2624 
2625 			err = btr_cur_optimistic_insert(
2626 				flags, cursor,
2627 				&offsets, &offsets_heap,
2628 				entry, &insert_rec, &big_rec,
2629 				n_ext, thr, &mtr);
2630 
2631 			if (err == DB_FAIL) {
2632 				err = btr_cur_pessimistic_insert(
2633 					flags, cursor,
2634 					&offsets, &offsets_heap,
2635 					entry, &insert_rec, &big_rec,
2636 					n_ext, thr, &mtr);
2637 			}
2638 		}
2639 
2640 		if (big_rec != NULL) {
2641 			mtr_commit(&mtr);
2642 
2643 			/* Online table rebuild could read (and
2644 			ignore) the incomplete record at this point.
2645 			If online rebuild is in progress, the
2646 			row_ins_index_entry_big_rec() will write log. */
2647 
2648 			DBUG_EXECUTE_IF(
2649 				"row_ins_extern_checkpoint",
2650 				log_make_checkpoint_at(
2651 					LSN_MAX, TRUE););
2652 			err = row_ins_index_entry_big_rec(
2653 				entry, big_rec, offsets, &offsets_heap, index,
2654 				thr_get_trx(thr)->mysql_thd,
2655 				__FILE__, __LINE__);
2656 			dtuple_convert_back_big_rec(index, entry, big_rec);
2657 		} else {
2658 			if (err == DB_SUCCESS
2659 			    && dict_index_is_online_ddl(index)) {
2660 				row_log_table_insert(
2661 					insert_rec, entry, index, offsets);
2662 			}
2663 
2664 			mtr_commit(&mtr);
2665 		}
2666 	}
2667 
2668 func_exit:
2669 	if (offsets_heap != NULL) {
2670 		mem_heap_free(offsets_heap);
2671 	}
2672 
2673 	btr_pcur_close(&pcur);
2674 
2675 	DBUG_RETURN(err);
2676 }
2677 
2678 /** This is a specialized function meant for direct insertion to
2679 auto-generated clustered index based on cached position from
2680 last successful insert. To be used when data is sorted.
2681 
2682 @param[in]	mode	BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
2683 			depending on whether we wish optimistic or
2684 			pessimistic descent down the index tree
2685 @param[in,out]	index	clustered index
2686 @param[in,out]	entry	index entry to insert
2687 @param[in]	thr	query thread
2688 
2689 @return error code */
2690 static
2691 dberr_t
row_ins_sorted_clust_index_entry(ulint mode,dict_index_t * index,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2692 row_ins_sorted_clust_index_entry(
2693 	ulint		mode,
2694 	dict_index_t*	index,
2695 	dtuple_t*	entry,
2696 	ulint		n_ext,
2697 	que_thr_t*	thr)
2698 {
2699 	dberr_t		err;
2700 	mtr_t*		mtr;
2701 	const bool	commit_mtr	= mode == BTR_MODIFY_TREE;
2702 
2703 	mem_heap_t*	offsets_heap	= NULL;
2704 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2705 	ulint*          offsets         = offsets_;
2706 	rec_offs_init(offsets_);
2707 
2708 	DBUG_ENTER("row_ins_sorted_clust_index_entry");
2709 
2710 	ut_ad(index->last_ins_cur != NULL);
2711 	ut_ad(dict_index_is_clust(index));
2712 	ut_ad(dict_table_is_intrinsic(index->table));
2713 	ut_ad(dict_index_is_auto_gen_clust(index));
2714 
2715 	btr_cur_t	cursor;
2716 	cursor.thr = thr;
2717 	mtr = &index->last_ins_cur->mtr;
2718 
2719 	/* Search for position if tree needs to be split or if last position
2720 	is not cached. */
2721 	if (mode == BTR_MODIFY_TREE
2722 	    || index->last_ins_cur->rec == NULL
2723 	    || index->last_ins_cur->disable_caching) {
2724 
2725 		/* Commit the previous mtr. */
2726 		index->last_ins_cur->release();
2727 
2728 		mtr_start(mtr);
2729 		mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
2730 
2731 		btr_cur_search_to_nth_level_with_no_latch(
2732 			index, 0, entry, PAGE_CUR_LE, &cursor,
2733 			__FILE__, __LINE__, mtr);
2734 		ut_ad(cursor.page_cur.block != NULL);
2735 		ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2736 	} else {
2737 		cursor.index = index;
2738 
2739 		cursor.page_cur.index = index;
2740 
2741 		cursor.page_cur.rec = index->last_ins_cur->rec;
2742 
2743 		cursor.page_cur.block = index->last_ins_cur->block;
2744 	}
2745 
2746 	const ulint	flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
2747 
2748 	for (;;) {
2749 		rec_t*		insert_rec;
2750 		big_rec_t*	big_rec		= NULL;
2751 
2752 		if (mode != BTR_MODIFY_TREE) {
2753 			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2754 				== BTR_MODIFY_LEAF);
2755 
2756 			err = btr_cur_optimistic_insert(
2757 				flags, &cursor, &offsets, &offsets_heap, entry,
2758 				&insert_rec, &big_rec, n_ext, thr, mtr);
2759 			if (err != DB_SUCCESS) {
2760 				break;
2761 			}
2762 		} else {
2763 			/* TODO: Check if this is needed for intrinsic table. */
2764 			if (buf_LRU_buf_pool_running_out()) {
2765 				err = DB_LOCK_TABLE_FULL;
2766 				break;
2767 			}
2768 
2769 			err = btr_cur_optimistic_insert(
2770 				flags, &cursor, &offsets, &offsets_heap, entry,
2771 				&insert_rec, &big_rec, n_ext, thr, mtr);
2772 
2773 			if (err == DB_FAIL) {
2774 				err = btr_cur_pessimistic_insert(
2775 					flags, &cursor, &offsets, &offsets_heap,
2776 					entry, &insert_rec, &big_rec, n_ext,
2777 					thr, mtr);
2778 			}
2779 		}
2780 
2781 		if (big_rec != NULL) {
2782 			/* If index involves big-record optimization is
2783 			turned-off. */
2784 			index->last_ins_cur->release();
2785 			index->last_ins_cur->disable_caching = true;
2786 
2787 			err = row_ins_index_entry_big_rec(
2788 				entry, big_rec, offsets, &offsets_heap, index,
2789 				thr_get_trx(thr)->mysql_thd, __FILE__, __LINE__);
2790 
2791 			dtuple_convert_back_big_rec(index, entry, big_rec);
2792 
2793 		} else if (err == DB_SUCCESS ) {
2794 			if (!commit_mtr
2795 			    && !index->last_ins_cur->disable_caching) {
2796 				index->last_ins_cur->rec = insert_rec;
2797 
2798 				index->last_ins_cur->block
2799 					= cursor.page_cur.block;
2800 			} else {
2801 				index->last_ins_cur->release();
2802 			}
2803 		}
2804 
2805 		break;
2806 	}
2807 
2808 	if (err != DB_SUCCESS) {
2809 		index->last_ins_cur->release();
2810 	}
2811 
2812 	if (offsets_heap != NULL) {
2813 		mem_heap_free(offsets_heap);
2814 	}
2815 
2816 	DBUG_RETURN(err);
2817 }
2818 
2819 /** Start a mini-transaction and check if the index will be dropped.
2820 @param[in,out]	mtr		mini-transaction
2821 @param[in,out]	index		secondary index
2822 @param[in]	check		whether to check
2823 @param[in]	search_mode	flags
2824 @return true if the index is to be dropped */
2825 static MY_ATTRIBUTE((warn_unused_result))
2826 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2827 row_ins_sec_mtr_start_and_check_if_aborted(
2828 	mtr_t*		mtr,
2829 	dict_index_t*	index,
2830 	bool		check,
2831 	ulint		search_mode)
2832 {
2833 	ut_ad(!dict_index_is_clust(index));
2834 	ut_ad(mtr->is_named_space(index->space));
2835 
2836 	const mtr_log_t	log_mode = mtr->get_log_mode();
2837 
2838 	mtr_start(mtr);
2839 	mtr->set_named_space(index->space);
2840 	mtr->set_log_mode(log_mode);
2841 
2842 	if (!check) {
2843 		return(false);
2844 	}
2845 
2846 	if (search_mode & BTR_ALREADY_S_LATCHED) {
2847 		mtr_s_lock(dict_index_get_lock(index), mtr);
2848 	} else {
2849 		mtr_sx_lock(dict_index_get_lock(index), mtr);
2850 	}
2851 
2852 	switch (index->online_status) {
2853 	case ONLINE_INDEX_ABORTED:
2854 	case ONLINE_INDEX_ABORTED_DROPPED:
2855 		ut_ad(!index->is_committed());
2856 		return(true);
2857 	case ONLINE_INDEX_COMPLETE:
2858 		return(false);
2859 	case ONLINE_INDEX_CREATION:
2860 		break;
2861 	}
2862 
2863 	ut_error;
2864 	return(true);
2865 }
2866 
2867 /***************************************************************//**
2868 Tries to insert an entry into a secondary index. If a record with exactly the
2869 same fields is found, the other record is necessarily marked deleted.
2870 It is then unmarked. Otherwise, the entry is just inserted to the index.
2871 @retval DB_SUCCESS on success
2872 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2873 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2874 @return error code */
2875 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2876 row_ins_sec_index_entry_low(
2877 /*========================*/
2878 	ulint		flags,	/*!< in: undo logging and locking flags */
2879 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2880 				depending on whether we wish optimistic or
2881 				pessimistic descent down the index tree */
2882 	dict_index_t*	index,	/*!< in: secondary index */
2883 	mem_heap_t*	offsets_heap,
2884 				/*!< in/out: memory heap that can be emptied */
2885 	mem_heap_t*	heap,	/*!< in/out: memory heap */
2886 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2887 	trx_id_t	trx_id,	/*!< in: PAGE_MAX_TRX_ID during
2888 				row_log_table_apply(), or 0 */
2889 	que_thr_t*	thr,	/*!< in: query thread */
2890 	bool		dup_chk_only)
2891 				/*!< in: if true, just do duplicate check
2892 				and return. don't execute actual insert. */
2893 {
2894 	DBUG_ENTER("row_ins_sec_index_entry_low");
2895 
2896 	btr_cur_t	cursor;
2897 	ulint		search_mode	= mode;
2898 	dberr_t		err		= DB_SUCCESS;
2899 	ulint		n_unique;
2900 	mtr_t		mtr;
2901 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2902 	ulint*          offsets         = offsets_;
2903 	rec_offs_init(offsets_);
2904 	rtr_info_t	rtr_info;
2905 
2906 	ut_ad(!dict_index_is_clust(index));
2907 	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2908 
2909 	cursor.thr = thr;
2910 	cursor.rtr_info = NULL;
2911 	ut_ad(thr_get_trx(thr)->id != 0
2912 	      || dict_table_is_intrinsic(index->table));
2913 
2914 	mtr_start(&mtr);
2915 	mtr.set_named_space(index->space);
2916 
2917 	if (dict_table_is_temporary(index->table)) {
2918 		/* Disable REDO logging as the lifetime of temp-tables is
2919 		limited to server or connection lifetime and so REDO
2920 		information is not needed on restart for recovery.
2921 		Disable locking as temp-tables are local to a connection. */
2922 
2923 		ut_ad(flags & BTR_NO_LOCKING_FLAG);
2924 		ut_ad(!dict_table_is_intrinsic(index->table)
2925 		      || (flags & BTR_NO_UNDO_LOG_FLAG));
2926 
2927 		mtr.set_log_mode(MTR_LOG_NO_REDO);
2928 	} else if (!dict_index_is_spatial(index)) {
2929 		/* Enable insert buffering if it's neither temp-table
2930 		nor spatial index. */
2931 		search_mode |= BTR_INSERT;
2932 	}
2933 
2934 	/* Ensure that we acquire index->lock when inserting into an
2935 	index with index->online_status == ONLINE_INDEX_COMPLETE, but
2936 	could still be subject to rollback_inplace_alter_table().
2937 	This prevents a concurrent change of index->online_status.
2938 	The memory object cannot be freed as long as we have an open
2939 	reference to the table, or index->table->n_ref_count > 0. */
2940 	const bool	check = !index->is_committed();
2941 	if (check) {
2942 		DEBUG_SYNC_C("row_ins_sec_index_enter");
2943 		if (mode == BTR_MODIFY_LEAF) {
2944 			search_mode |= BTR_ALREADY_S_LATCHED;
2945 			mtr_s_lock(dict_index_get_lock(index), &mtr);
2946 		} else {
2947 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
2948 		}
2949 
2950 		if (row_log_online_op_try(
2951 			    index, entry, thr_get_trx(thr)->id)) {
2952 			goto func_exit;
2953 		}
2954 	}
2955 
2956 	/* Note that we use PAGE_CUR_LE as the search mode, because then
2957 	the function will return in both low_match and up_match of the
2958 	cursor sensible values */
2959 
2960 	if (!thr_get_trx(thr)->check_unique_secondary) {
2961 		search_mode |= BTR_IGNORE_SEC_UNIQUE;
2962 	}
2963 
2964 	if (dict_index_is_spatial(index)) {
2965 		cursor.index = index;
2966 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2967 		rtr_info_update_btr(&cursor, &rtr_info);
2968 
2969 		btr_cur_search_to_nth_level(
2970 			index, 0, entry, PAGE_CUR_RTREE_INSERT,
2971 			search_mode,
2972 			&cursor, 0, __FILE__, __LINE__, &mtr);
2973 
2974 		if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
2975 			mtr_commit(&mtr);
2976 			rtr_clean_rtr_info(&rtr_info, true);
2977 			rtr_init_rtr_info(&rtr_info, false, &cursor,
2978 					  index, false);
2979 			rtr_info_update_btr(&cursor, &rtr_info);
2980 			mtr_start(&mtr);
2981 			mtr.set_named_space(index->space);
2982 			search_mode &= ~BTR_MODIFY_LEAF;
2983 			search_mode |= BTR_MODIFY_TREE;
2984 			btr_cur_search_to_nth_level(
2985 				index, 0, entry, PAGE_CUR_RTREE_INSERT,
2986 				search_mode,
2987 				&cursor, 0, __FILE__, __LINE__, &mtr);
2988 			mode = BTR_MODIFY_TREE;
2989 		}
2990 
2991 		DBUG_EXECUTE_IF(
2992 			"rtree_test_check_count", {
2993 			goto func_exit;});
2994 
2995 	} else {
2996 		if (dict_table_is_intrinsic(index->table)) {
2997 			btr_cur_search_to_nth_level_with_no_latch(
2998 				index, 0, entry, PAGE_CUR_LE, &cursor,
2999 				__FILE__, __LINE__, &mtr);
3000 			ut_ad(cursor.page_cur.block != NULL);
3001 			ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3002 		} else {
3003 			btr_cur_search_to_nth_level(
3004 				index, 0, entry, PAGE_CUR_LE,
3005 				search_mode,
3006 				&cursor, 0, __FILE__, __LINE__, &mtr);
3007 		}
3008 	}
3009 
3010 	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
3011 		ut_ad(!dict_index_is_spatial(index));
3012 		/* The insert was buffered during the search: we are done */
3013 		goto func_exit;
3014 	}
3015 
3016 #ifdef UNIV_DEBUG
3017 	{
3018 		page_t*	page = btr_cur_get_page(&cursor);
3019 		rec_t*	first_rec = page_rec_get_next(
3020 			page_get_infimum_rec(page));
3021 
3022 		ut_ad(page_rec_is_supremum(first_rec)
3023 		      || rec_n_fields_is_sane(index, first_rec, entry));
3024 	}
3025 #endif /* UNIV_DEBUG */
3026 
3027 	n_unique = dict_index_get_n_unique(index);
3028 
3029 	if (dict_index_is_unique(index)
3030 	    && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
3031 		mtr_commit(&mtr);
3032 
3033 		DEBUG_SYNC_C("row_ins_sec_index_unique");
3034 
3035 		if (row_ins_sec_mtr_start_and_check_if_aborted(
3036 			    &mtr, index, check, search_mode)) {
3037 			goto func_exit;
3038 		}
3039 
3040 		err = row_ins_scan_sec_index_for_duplicate(
3041 			flags, index, entry, thr, check, &mtr, offsets_heap);
3042 
3043 		mtr_commit(&mtr);
3044 
3045 		switch (err) {
3046 		case DB_SUCCESS:
3047 			break;
3048 		case DB_DUPLICATE_KEY:
3049 			if (!index->is_committed()) {
3050 				ut_ad(!thr_get_trx(thr)
3051 				      ->dict_operation_lock_mode);
3052 				mutex_enter(&dict_sys->mutex);
3053 				dict_set_corrupted_index_cache_only(index);
3054 				mutex_exit(&dict_sys->mutex);
3055 				/* Do not return any error to the
3056 				caller. The duplicate will be reported
3057 				by ALTER TABLE or CREATE UNIQUE INDEX.
3058 				Unfortunately we cannot report the
3059 				duplicate key value to the DDL thread,
3060 				because the altered_table object is
3061 				private to its call stack. */
3062 				err = DB_SUCCESS;
3063 			}
3064 			/* fall through */
3065 		default:
3066 			if (dict_index_is_spatial(index)) {
3067 				rtr_clean_rtr_info(&rtr_info, true);
3068 			}
3069 			DBUG_RETURN(err);
3070 		}
3071 
3072 		if (row_ins_sec_mtr_start_and_check_if_aborted(
3073 			    &mtr, index, check, search_mode)) {
3074 			goto func_exit;
3075 		}
3076 
3077 		DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
3078 
3079 		/* We did not find a duplicate and we have now
3080 		locked with s-locks the necessary records to
3081 		prevent any insertion of a duplicate by another
3082 		transaction. Let us now reposition the cursor and
3083 		continue the insertion. */
3084 		if (dict_table_is_intrinsic(index->table)) {
3085 			btr_cur_search_to_nth_level_with_no_latch(
3086 				index, 0, entry, PAGE_CUR_LE, &cursor,
3087 				__FILE__, __LINE__, &mtr);
3088 			ut_ad(cursor.page_cur.block != NULL);
3089 			ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3090 		} else {
3091 			btr_cur_search_to_nth_level(
3092 				index, 0, entry, PAGE_CUR_LE,
3093 				(search_mode
3094 				 & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)),
3095 				&cursor, 0, __FILE__, __LINE__, &mtr);
3096 		}
3097 	}
3098 
3099 	if (dup_chk_only) {
3100 		goto func_exit;
3101 	}
3102 
3103 	if (row_ins_must_modify_rec(&cursor)) {
3104 		/* If the existing record is being modified and the new record
3105 		is doesn't fit the provided slot then existing record is added
3106 		to free list and new record is inserted. This also means
3107 		cursor that we have cached for SELECT is now invalid. */
3108 		if(index->last_sel_cur) {
3109 			index->last_sel_cur->invalid = true;
3110 		}
3111 
3112 		/* There is already an index entry with a long enough common
3113 		prefix, we must convert the insert into a modify of an
3114 		existing record */
3115 		offsets = rec_get_offsets(
3116 			btr_cur_get_rec(&cursor), index, offsets,
3117 			ULINT_UNDEFINED, &offsets_heap);
3118 
3119 		err = row_ins_sec_index_entry_by_modify(
3120 			flags, mode, &cursor, &offsets,
3121 			offsets_heap, heap, entry, thr, &mtr);
3122 
3123 		if (err == DB_SUCCESS && dict_index_is_spatial(index)
3124 		    && rtr_info.mbr_adj) {
3125 			err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3126 		}
3127 	} else {
3128 		rec_t*		insert_rec;
3129 		big_rec_t*	big_rec;
3130 
3131 		if (mode == BTR_MODIFY_LEAF) {
3132 			err = btr_cur_optimistic_insert(
3133 				flags, &cursor, &offsets, &offsets_heap,
3134 				entry, &insert_rec,
3135 				&big_rec, 0, thr, &mtr);
3136 			if (err == DB_SUCCESS
3137 			    && dict_index_is_spatial(index)
3138 			    && rtr_info.mbr_adj) {
3139 				err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3140 			}
3141 		} else {
3142 			ut_ad(mode == BTR_MODIFY_TREE);
3143 			if (buf_LRU_buf_pool_running_out()) {
3144 
3145 				err = DB_LOCK_TABLE_FULL;
3146 				goto func_exit;
3147 			}
3148 
3149 			err = btr_cur_optimistic_insert(
3150 				flags, &cursor,
3151 				&offsets, &offsets_heap,
3152 				entry, &insert_rec,
3153 				&big_rec, 0, thr, &mtr);
3154 			if (err == DB_FAIL) {
3155 				err = btr_cur_pessimistic_insert(
3156 					flags, &cursor,
3157 					&offsets, &offsets_heap,
3158 					entry, &insert_rec,
3159 					&big_rec, 0, thr, &mtr);
3160 			}
3161 			if (err == DB_SUCCESS
3162 				   && dict_index_is_spatial(index)
3163 				   && rtr_info.mbr_adj) {
3164 				err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3165 			}
3166 		}
3167 
3168 		if (err == DB_SUCCESS && trx_id) {
3169 			page_update_max_trx_id(
3170 				btr_cur_get_block(&cursor),
3171 				btr_cur_get_page_zip(&cursor),
3172 				trx_id, &mtr);
3173 		}
3174 
3175 		ut_ad(!big_rec);
3176 	}
3177 
3178 func_exit:
3179 	if (dict_index_is_spatial(index)) {
3180 		rtr_clean_rtr_info(&rtr_info, true);
3181 	}
3182 
3183 	mtr_commit(&mtr);
3184 	DBUG_RETURN(err);
3185 }
3186 
3187 /***************************************************************//**
3188 Tries to insert the externally stored fields (off-page columns)
3189 of a clustered index entry.
3190 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
3191 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)3192 row_ins_index_entry_big_rec_func(
3193 /*=============================*/
3194 	const dtuple_t*		entry,	/*!< in/out: index entry to insert */
3195 	const big_rec_t*	big_rec,/*!< in: externally stored fields */
3196 	ulint*			offsets,/*!< in/out: rec offsets */
3197 	mem_heap_t**		heap,	/*!< in/out: memory heap */
3198 	dict_index_t*		index,	/*!< in: index */
3199 	const char*		file,	/*!< in: file name of caller */
3200 #ifndef DBUG_OFF
3201 	const void*		thd,    /*!< in: connection, or NULL */
3202 #endif /* DBUG_OFF */
3203 	ulint			line)	/*!< in: line number of caller */
3204 {
3205 	mtr_t		mtr;
3206 	btr_pcur_t	pcur;
3207 	rec_t*		rec;
3208 	dberr_t		error;
3209 
3210 	ut_ad(dict_index_is_clust(index));
3211 
3212 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
3213 	DEBUG_SYNC_C("before_insertion_of_blob");
3214 
3215 	mtr_start(&mtr);
3216 	mtr.set_named_space(index->space);
3217 	dict_disable_redo_if_temporary(index->table, &mtr);
3218 
3219 	btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
3220 		      &pcur, &mtr);
3221 	rec = btr_pcur_get_rec(&pcur);
3222 	offsets = rec_get_offsets(rec, index, offsets,
3223 				  ULINT_UNDEFINED, heap);
3224 
3225 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
3226 	error = btr_store_big_rec_extern_fields(
3227 		&pcur, 0, offsets, big_rec, &mtr, BTR_STORE_INSERT);
3228 	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
3229 
3230 	if (error == DB_SUCCESS
3231 	    && dict_index_is_online_ddl(index)) {
3232 		row_log_table_insert(btr_pcur_get_rec(&pcur), entry,
3233 				     index, offsets);
3234 	}
3235 
3236 	mtr_commit(&mtr);
3237 
3238 	btr_pcur_close(&pcur);
3239 
3240 	return(error);
3241 }
3242 
3243 /***************************************************************//**
3244 Inserts an entry into a clustered index. Tries first optimistic,
3245 then pessimistic descent down the tree. If the entry matches enough
3246 to a delete marked record, performs the insert by updating or delete
3247 unmarking the delete marked record.
3248 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3249 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext,bool dup_chk_only)3250 row_ins_clust_index_entry(
3251 /*======================*/
3252 	dict_index_t*	index,	/*!< in: clustered index */
3253 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3254 	que_thr_t*	thr,	/*!< in: query thread */
3255 	ulint		n_ext,	/*!< in: number of externally stored columns */
3256 	bool		dup_chk_only)
3257 				/*!< in: if true, just do duplicate check
3258 				and return. don't execute actual insert. */
3259 {
3260 	dberr_t	err;
3261 	ulint	n_uniq;
3262 
3263 	DBUG_ENTER("row_ins_clust_index_entry");
3264 
3265 	if (!index->table->foreign_set.empty()) {
3266 		err = row_ins_check_foreign_constraints(
3267 			index->table, index, entry, thr);
3268 		if (err != DB_SUCCESS) {
3269 
3270 			DBUG_RETURN(err);
3271 		}
3272 	}
3273 
3274 	n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3275 
3276 	/* Try first optimistic descent to the B-tree */
3277 	ulint	flags;
3278 
3279 	if (!dict_table_is_intrinsic(index->table)) {
3280 		log_free_check();
3281 		flags = dict_table_is_temporary(index->table)
3282 			? BTR_NO_LOCKING_FLAG
3283 			: 0;
3284 	} else {
3285 		flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3286 	}
3287 
3288 	if (dict_table_is_intrinsic(index->table)
3289 	    && dict_index_is_auto_gen_clust(index)) {
3290 
3291 		/* Check if the memory allocated for intrinsic cache*/
3292 		if(!index->last_ins_cur) {
3293 			dict_allocate_mem_intrinsic_cache(index);
3294 		}
3295 
3296 		err = row_ins_sorted_clust_index_entry(
3297 			BTR_MODIFY_LEAF, index, entry, n_ext, thr);
3298 	} else {
3299 		err = row_ins_clust_index_entry_low(
3300 			flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
3301 			n_ext, thr, dup_chk_only);
3302 	}
3303 
3304 
3305 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3306 			    "after_row_ins_clust_index_entry_leaf");
3307 
3308 	if (err != DB_FAIL) {
3309 		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3310 		DBUG_RETURN(err);
3311 	}
3312 
3313 	/* Try then pessimistic descent to the B-tree */
3314 	if (!dict_table_is_intrinsic(index->table)) {
3315 		log_free_check();
3316 	} else if(!index->last_sel_cur) {
3317 		dict_allocate_mem_intrinsic_cache(index);
3318 		index->last_sel_cur->invalid = true;
3319 	} else {
3320 		index->last_sel_cur->invalid = true;
3321 	}
3322 
3323 	if (dict_table_is_intrinsic(index->table)
3324 	    && dict_index_is_auto_gen_clust(index)) {
3325 		err = row_ins_sorted_clust_index_entry(
3326 			BTR_MODIFY_TREE, index, entry, n_ext, thr);
3327 	} else {
3328 		err = row_ins_clust_index_entry_low(
3329 			flags, BTR_MODIFY_TREE, index, n_uniq, entry,
3330 			n_ext, thr, dup_chk_only);
3331 	}
3332 
3333 	DBUG_RETURN(err);
3334 }
3335 
3336 /***************************************************************//**
3337 Inserts an entry into a secondary index. Tries first optimistic,
3338 then pessimistic descent down the tree. If the entry matches enough
3339 to a delete marked record, performs the insert by updating or delete
3340 unmarking the delete marked record.
3341 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3342 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3343 row_ins_sec_index_entry(
3344 /*====================*/
3345 	dict_index_t*	index,	/*!< in: secondary index */
3346 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3347 	que_thr_t*	thr,	/*!< in: query thread */
3348 	bool		dup_chk_only)
3349 				/*!< in: if true, just do duplicate check
3350 				and return. don't execute actual insert. */
3351 {
3352 	dberr_t		err;
3353 	mem_heap_t*	offsets_heap;
3354 	mem_heap_t*	heap;
3355 
3356 	DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3357 			DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3358 			return(DB_LOCK_WAIT);});
3359 
3360 	if (!index->table->foreign_set.empty()) {
3361 		err = row_ins_check_foreign_constraints(index->table, index,
3362 							entry, thr);
3363 		if (err != DB_SUCCESS) {
3364 
3365 			return(err);
3366 		}
3367 	}
3368 
3369 	ut_ad(thr_get_trx(thr)->id != 0
3370 	      || dict_table_is_intrinsic(index->table));
3371 
3372 	offsets_heap = mem_heap_create(1024);
3373 	heap = mem_heap_create(1024);
3374 
3375 	/* Try first optimistic descent to the B-tree */
3376 
3377 	ulint	flags;
3378 
3379 	if (!dict_table_is_intrinsic(index->table)) {
3380 		log_free_check();
3381 		flags = dict_table_is_temporary(index->table)
3382 			? BTR_NO_LOCKING_FLAG
3383 			: 0;
3384 	} else {
3385 		flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3386 	}
3387 
3388 	err = row_ins_sec_index_entry_low(
3389 		flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
3390 		0, thr, dup_chk_only);
3391 	if (err == DB_FAIL) {
3392 		mem_heap_empty(heap);
3393 
3394 		/* Try then pessimistic descent to the B-tree */
3395 
3396 		if (!dict_table_is_intrinsic(index->table)) {
3397 			log_free_check();
3398 		} else if(!index->last_sel_cur) {
3399 			dict_allocate_mem_intrinsic_cache(index);
3400 			index->last_sel_cur->invalid = true;
3401 		} else {
3402 			index->last_sel_cur->invalid = true;
3403 		}
3404 
3405 		err = row_ins_sec_index_entry_low(
3406 			flags, BTR_MODIFY_TREE, index,
3407 			offsets_heap, heap, entry, 0, thr,
3408 			dup_chk_only);
3409 	}
3410 
3411 	mem_heap_free(heap);
3412 	mem_heap_free(offsets_heap);
3413 	return(err);
3414 }
3415 
3416 /***************************************************************//**
3417 Inserts an index entry to index. Tries first optimistic, then pessimistic
3418 descent down the tree. If the entry matches enough to a delete marked record,
3419 performs the insert by updating or delete unmarking the delete marked
3420 record.
3421 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3422 static
3423 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3424 row_ins_index_entry(
3425 /*================*/
3426 	dict_index_t*	index,	/*!< in: index */
3427 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3428 	que_thr_t*	thr)	/*!< in: query thread */
3429 {
3430 	ut_ad(thr_get_trx(thr)->id != 0);
3431 
3432 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3433 			DBUG_SET("-d,row_ins_index_entry_timeout");
3434 			return(DB_LOCK_WAIT);});
3435 
3436 	if (dict_index_is_clust(index)) {
3437 		return(row_ins_clust_index_entry(index, entry, thr, 0, false));
3438 	} else {
3439 		return(row_ins_sec_index_entry(index, entry, thr, false));
3440 	}
3441 }
3442 
3443 
3444 /*****************************************************************//**
3445 This function generate MBR (Minimum Bounding Box) for spatial objects
3446 and set it to spatial index field. */
3447 static
3448 void
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field)3449 row_ins_spatial_index_entry_set_mbr_field(
3450 /*======================================*/
3451 	dfield_t*	field,		/*!< in/out: mbr field */
3452 	const dfield_t*	row_field)	/*!< in: row field */
3453 {
3454 	uchar*		dptr = NULL;
3455 	ulint		dlen = 0;
3456 	double		mbr[SPDIMS * 2];
3457 
3458 	/* This must be a GEOMETRY datatype */
3459 	ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3460 
3461 	dptr = static_cast<uchar*>(dfield_get_data(row_field));
3462 	dlen = dfield_get_len(row_field);
3463 
3464 	/* obtain the MBR */
3465 	rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
3466 			   static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
3467 			   SPDIMS, mbr);
3468 
3469 	/* Set mbr as index entry data */
3470 	dfield_write_mbr(field, mbr);
3471 }
3472 
3473 /** Sets the values of the dtuple fields in entry from the values of appropriate
3474 columns in row.
3475 @param[in]	index	index handler
3476 @param[out]	entry	index entry to make
3477 @param[in]	row	row
3478 
3479 @return DB_SUCCESS if the set is successful */
3480 dberr_t
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3481 row_ins_index_entry_set_vals(
3482 	const dict_index_t*	index,
3483 	dtuple_t*		entry,
3484 	const dtuple_t*		row)
3485 {
3486 	ulint	n_fields;
3487 	ulint	i;
3488 	ulint	num_v = dtuple_get_n_v_fields(entry);
3489 
3490 	n_fields = dtuple_get_n_fields(entry);
3491 
3492 	for (i = 0; i < n_fields + num_v; i++) {
3493 		dict_field_t*	ind_field = NULL;
3494 		dfield_t*	field;
3495 		const dfield_t*	row_field;
3496 		ulint		len;
3497 		dict_col_t*	col;
3498 
3499 		if (i >= n_fields) {
3500 			/* This is virtual field */
3501 			field = dtuple_get_nth_v_field(entry, i - n_fields);
3502 			col = &dict_table_get_nth_v_col(
3503 				index->table, i - n_fields)->m_col;
3504 		} else {
3505 			field = dtuple_get_nth_field(entry, i);
3506 			ind_field = dict_index_get_nth_field(index, i);
3507 			col = ind_field->col;
3508 		}
3509 
3510 		if (dict_col_is_virtual(col)) {
3511 			const dict_v_col_t*     v_col
3512 				= reinterpret_cast<const dict_v_col_t*>(col);
3513 			ut_ad(dtuple_get_n_fields(row)
3514 			      == dict_table_get_n_cols(index->table));
3515 			row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3516 		} else {
3517 			row_field = dtuple_get_nth_field(
3518 				row, ind_field->col->ind);
3519 		}
3520 
3521 		len = dfield_get_len(row_field);
3522 
3523 		/* Check column prefix indexes */
3524 		if (ind_field != NULL && ind_field->prefix_len > 0
3525 		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3526 
3527 			const	dict_col_t*	col
3528 				= dict_field_get_col(ind_field);
3529 
3530 			len = dtype_get_at_most_n_mbchars(
3531 				col->prtype, col->mbminmaxlen,
3532 				ind_field->prefix_len,
3533 				len,
3534 				static_cast<const char*>(
3535 					dfield_get_data(row_field)));
3536 
3537 			ut_ad(!dfield_is_ext(row_field));
3538 		}
3539 
3540 		/* Handle spatial index. For the first field, replace
3541 		the data with its MBR (Minimum Bounding Box). */
3542 		if ((i == 0) && dict_index_is_spatial(index)) {
3543 			if (!row_field->data
3544 			    || row_field->len < GEO_DATA_HEADER_SIZE) {
3545 				return(DB_CANT_CREATE_GEOMETRY_OBJECT);
3546 			}
3547 			row_ins_spatial_index_entry_set_mbr_field(
3548 				field, row_field);
3549 			continue;
3550 		}
3551 
3552 		dfield_set_data(field, dfield_get_data(row_field), len);
3553 		if (dfield_is_ext(row_field)) {
3554 			ut_ad(dict_index_is_clust(index));
3555 			dfield_set_ext(field);
3556 		}
3557 	}
3558 
3559 	return(DB_SUCCESS);
3560 }
3561 
3562 /***********************************************************//**
3563 Inserts a single index entry to the table.
3564 @return DB_SUCCESS if operation successfully completed, else error
3565 code or DB_LOCK_WAIT */
3566 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3567 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3568 row_ins_index_entry_step(
3569 /*=====================*/
3570 	ins_node_t*	node,	/*!< in: row insert node */
3571 	que_thr_t*	thr)	/*!< in: query thread */
3572 {
3573 	dberr_t	err;
3574 
3575 	DBUG_ENTER("row_ins_index_entry_step");
3576 
3577 	ut_ad(dtuple_check_typed(node->row));
3578 
3579 	err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3580 
3581 	if (err != DB_SUCCESS) {
3582 		DBUG_RETURN(err);
3583 	}
3584 
3585 	ut_ad(dtuple_check_typed(node->entry));
3586 
3587 	err = row_ins_index_entry(node->index, node->entry, thr);
3588 
3589 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3590 			    "after_row_ins_index_entry_step");
3591 
3592 	DBUG_RETURN(err);
3593 }
3594 
3595 /***********************************************************//**
3596 Allocates a row id for row and inits the node->index field. */
3597 UNIV_INLINE
3598 void
row_ins_alloc_row_id_step(ins_node_t * node)3599 row_ins_alloc_row_id_step(
3600 /*======================*/
3601 	ins_node_t*	node)	/*!< in: row insert node */
3602 {
3603 	row_id_t	row_id;
3604 
3605 	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3606 
3607 	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3608 
3609 		/* No row id is stored if the clustered index is unique */
3610 
3611 		return;
3612 	}
3613 
3614 	/* Fill in row id value to row */
3615 
3616 	row_id = dict_sys_get_new_row_id();
3617 
3618 	dict_sys_write_row_id(node->row_id_buf, row_id);
3619 }
3620 
3621 /***********************************************************//**
3622 Gets a row to insert from the values list. */
3623 UNIV_INLINE
3624 void
row_ins_get_row_from_values(ins_node_t * node)3625 row_ins_get_row_from_values(
3626 /*========================*/
3627 	ins_node_t*	node)	/*!< in: row insert node */
3628 {
3629 	que_node_t*	list_node;
3630 	dfield_t*	dfield;
3631 	dtuple_t*	row;
3632 	ulint		i;
3633 
3634 	/* The field values are copied in the buffers of the select node and
3635 	it is safe to use them until we fetch from select again: therefore
3636 	we can just copy the pointers */
3637 
3638 	row = node->row;
3639 
3640 	i = 0;
3641 	list_node = node->values_list;
3642 
3643 	while (list_node) {
3644 		eval_exp(list_node);
3645 
3646 		dfield = dtuple_get_nth_field(row, i);
3647 		dfield_copy_data(dfield, que_node_get_val(list_node));
3648 
3649 		i++;
3650 		list_node = que_node_get_next(list_node);
3651 	}
3652 }
3653 
3654 /***********************************************************//**
3655 Gets a row to insert from the select list. */
3656 UNIV_INLINE
3657 void
row_ins_get_row_from_select(ins_node_t * node)3658 row_ins_get_row_from_select(
3659 /*========================*/
3660 	ins_node_t*	node)	/*!< in: row insert node */
3661 {
3662 	que_node_t*	list_node;
3663 	dfield_t*	dfield;
3664 	dtuple_t*	row;
3665 	ulint		i;
3666 
3667 	/* The field values are copied in the buffers of the select node and
3668 	it is safe to use them until we fetch from select again: therefore
3669 	we can just copy the pointers */
3670 
3671 	row = node->row;
3672 
3673 	i = 0;
3674 	list_node = node->select->select_list;
3675 
3676 	while (list_node) {
3677 		dfield = dtuple_get_nth_field(row, i);
3678 		dfield_copy_data(dfield, que_node_get_val(list_node));
3679 
3680 		i++;
3681 		list_node = que_node_get_next(list_node);
3682 	}
3683 }
3684 
3685 /***********************************************************//**
3686 Inserts a row to a table.
3687 @return DB_SUCCESS if operation successfully completed, else error
3688 code or DB_LOCK_WAIT */
3689 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3690 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3691 row_ins(
3692 /*====*/
3693 	ins_node_t*	node,	/*!< in: row insert node */
3694 	que_thr_t*	thr)	/*!< in: query thread */
3695 {
3696 	dberr_t	err;
3697 
3698 	DBUG_ENTER("row_ins");
3699 
3700 	DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3701 
3702 	if (node->state == INS_NODE_ALLOC_ROW_ID) {
3703 
3704 		row_ins_alloc_row_id_step(node);
3705 
3706 		node->index = dict_table_get_first_index(node->table);
3707 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
3708 
3709 		if (node->ins_type == INS_SEARCHED) {
3710 
3711 			row_ins_get_row_from_select(node);
3712 
3713 		} else if (node->ins_type == INS_VALUES) {
3714 
3715 			row_ins_get_row_from_values(node);
3716 		}
3717 
3718 		node->state = INS_NODE_INSERT_ENTRIES;
3719 	}
3720 
3721 	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3722 
3723 	while (node->index != NULL) {
3724 		if (node->index->type != DICT_FTS) {
3725 			err = row_ins_index_entry_step(node, thr);
3726 			switch(err) {
3727 			case DB_SUCCESS:
3728 				break;
3729 			case DB_DUPLICATE_KEY:
3730 				thr_get_trx(thr)->error_state
3731 						= DB_DUPLICATE_KEY;
3732 				thr_get_trx(thr)->error_index
3733 						= node->index;
3734 			//fall through
3735 			default:
3736 				DBUG_RETURN(err);
3737 			}
3738 		}
3739 
3740 		node->index = dict_table_get_next_index(node->index);
3741 		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3742 
3743 		DBUG_EXECUTE_IF(
3744 			"row_ins_skip_sec",
3745 			node->index = NULL; node->entry = NULL; break;);
3746 
3747 		/* Skip corrupted secondary index and its entry */
3748 		while (node->index && dict_index_is_corrupted(node->index)) {
3749 
3750 			node->index = dict_table_get_next_index(node->index);
3751 			node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3752 		}
3753 
3754 	}
3755 
3756 	ut_ad(node->entry == NULL);
3757 	thr_get_trx(thr)->error_index = NULL;
3758 	node->state = INS_NODE_ALLOC_ROW_ID;
3759 
3760 	DBUG_RETURN(DB_SUCCESS);
3761 }
3762 
3763 /***********************************************************//**
3764 Inserts a row to a table. This is a high-level function used in SQL execution
3765 graphs.
3766 @return query thread to run next or NULL */
3767 que_thr_t*
row_ins_step(que_thr_t * thr)3768 row_ins_step(
3769 /*=========*/
3770 	que_thr_t*	thr)	/*!< in: query thread */
3771 {
3772 	ins_node_t*	node;
3773 	que_node_t*	parent;
3774 	sel_node_t*	sel_node;
3775 	trx_t*		trx;
3776 	dberr_t		err;
3777 
3778 	ut_ad(thr);
3779 
3780 	DEBUG_SYNC_C("innodb_row_ins_step_enter");
3781 
3782 	trx = thr_get_trx(thr);
3783 
3784 	trx_start_if_not_started_xa(trx, true);
3785 
3786 	node = static_cast<ins_node_t*>(thr->run_node);
3787 
3788 	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3789 	ut_ad(!dict_table_is_intrinsic(node->table));
3790 
3791 	parent = que_node_get_parent(node);
3792 	sel_node = node->select;
3793 
3794 	if (thr->prev_node == parent) {
3795 		node->state = INS_NODE_SET_IX_LOCK;
3796 	}
3797 
3798 	/* If this is the first time this node is executed (or when
3799 	execution resumes after wait for the table IX lock), set an
3800 	IX lock on the table and reset the possible select node. MySQL's
3801 	partitioned table code may also call an insert within the same
3802 	SQL statement AFTER it has used this table handle to do a search.
3803 	This happens, for example, when a row update moves it to another
3804 	partition. In that case, we have already set the IX lock on the
3805 	table during the search operation, and there is no need to set
3806 	it again here. But we must write trx->id to node->trx_id_buf. */
3807 
3808 	memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
3809 	trx_write_trx_id(node->trx_id_buf, trx->id);
3810 
3811 	if (node->state == INS_NODE_SET_IX_LOCK) {
3812 
3813 		node->state = INS_NODE_ALLOC_ROW_ID;
3814 
3815 		/* It may be that the current session has not yet started
3816 		its transaction, or it has been committed: */
3817 
3818 		if (trx->id == node->trx_id) {
3819 			/* No need to do IX-locking */
3820 
3821 			goto same_trx;
3822 		}
3823 
3824 		err = lock_table(0, node->table, LOCK_IX, thr);
3825 
3826 		DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3827 				err = DB_LOCK_WAIT;);
3828 
3829 		if (err != DB_SUCCESS) {
3830 
3831 			goto error_handling;
3832 		}
3833 
3834 		node->trx_id = trx->id;
3835 same_trx:
3836 		if (node->ins_type == INS_SEARCHED) {
3837 			/* Reset the cursor */
3838 			sel_node->state = SEL_NODE_OPEN;
3839 
3840 			/* Fetch a row to insert */
3841 
3842 			thr->run_node = sel_node;
3843 
3844 			return(thr);
3845 		}
3846 	}
3847 
3848 	if ((node->ins_type == INS_SEARCHED)
3849 	    && (sel_node->state != SEL_NODE_FETCH)) {
3850 
3851 		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3852 
3853 		/* No more rows to insert */
3854 		thr->run_node = parent;
3855 
3856 		return(thr);
3857 	}
3858 
3859 	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3860 
3861 	err = row_ins(node, thr);
3862 
3863 error_handling:
3864 	trx->error_state = err;
3865 
3866 	if (err != DB_SUCCESS) {
3867 		/* err == DB_LOCK_WAIT or SQL error detected */
3868 		return(NULL);
3869 	}
3870 
3871 	/* DO THE TRIGGER ACTIONS HERE */
3872 
3873 	if (node->ins_type == INS_SEARCHED) {
3874 		/* Fetch a row to insert */
3875 
3876 		thr->run_node = sel_node;
3877 	} else {
3878 		thr->run_node = que_node_get_parent(node);
3879 	}
3880 
3881 	return(thr);
3882 }
3883