1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30 
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 
36 #include "row0ins.h"
37 
38 #ifdef UNIV_NONINL
39 #include "row0ins.ic"
40 #endif
41 
42 #include "dict0dict.h"
43 #include "dict0boot.h"
44 #include "trx0rec.h"
45 #include "trx0undo.h"
46 #include "btr0btr.h"
47 #include "btr0cur.h"
48 #include "mach0data.h"
49 #include "que0que.h"
50 #include "row0upd.h"
51 #include "row0sel.h"
52 #include "row0row.h"
53 #include "row0log.h"
54 #include "rem0cmp.h"
55 #include "lock0lock.h"
56 #include "log0log.h"
57 #include "eval0eval.h"
58 #include "data0data.h"
59 #include "usr0sess.h"
60 #include "buf0lru.h"
61 #include "fts0fts.h"
62 #include "fts0types.h"
63 #include "m_string.h"
64 #include "gis0geo.h"
65 
66 /*************************************************************************
67 IMPORTANT NOTE: Any operation that generates redo MUST check that there
68 is enough space in the redo log before for that operation. This is
69 done by calling log_free_check(). The reason for checking the
70 availability of the redo log space before the start of the operation is
71 that we MUST not hold any synchonization objects when performing the
72 check.
73 If you make a change in this module make sure that no codepath is
74 introduced where a call to log_free_check() is bypassed. */
75 
76 /*********************************************************************//**
77 Creates an insert node struct.
78 @return own: insert node struct */
79 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)80 ins_node_create(
81 /*============*/
82 	ulint		ins_type,	/*!< in: INS_VALUES, ... */
83 	dict_table_t*	table,		/*!< in: table where to insert */
84 	mem_heap_t*	heap)		/*!< in: mem heap where created */
85 {
86 	ins_node_t*	node;
87 
88 	node = static_cast<ins_node_t*>(
89 		mem_heap_alloc(heap, sizeof(ins_node_t)));
90 
91 	node->common.type = QUE_NODE_INSERT;
92 
93 	node->ins_type = ins_type;
94 
95 	node->state = INS_NODE_SET_IX_LOCK;
96 	node->table = table;
97 	node->index = NULL;
98 	node->entry = NULL;
99 
100 	node->select = NULL;
101 
102 	node->trx_id = 0;
103 
104 	node->entry_sys_heap = mem_heap_create(128);
105 
106 	node->magic_n = INS_NODE_MAGIC_N;
107 
108 	return(node);
109 }
110 
111 /***********************************************************//**
112 Creates an entry template for each index of a table. */
113 static
114 void
ins_node_create_entry_list(ins_node_t * node)115 ins_node_create_entry_list(
116 /*=======================*/
117 	ins_node_t*	node)	/*!< in: row insert node */
118 {
119 	dict_index_t*	index;
120 	dtuple_t*	entry;
121 
122 	ut_ad(node->entry_sys_heap);
123 
124 	UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
125 
126 	/* We will include all indexes (include those corrupted
127 	secondary indexes) in the entry list. Filteration of
128 	these corrupted index will be done in row_ins() */
129 
130 	for (index = dict_table_get_first_index(node->table);
131 	     index != 0;
132 	     index = dict_table_get_next_index(index)) {
133 
134 		entry = row_build_index_entry_low(
135 			node->row, NULL, index, node->entry_sys_heap,
136 			ROW_BUILD_FOR_INSERT);
137 
138 		UT_LIST_ADD_LAST(node->entry_list, entry);
139 	}
140 }
141 
142 /*****************************************************************//**
143 Adds system field buffers to a row. */
144 static
145 void
row_ins_alloc_sys_fields(ins_node_t * node)146 row_ins_alloc_sys_fields(
147 /*=====================*/
148 	ins_node_t*	node)	/*!< in: insert node */
149 {
150 	dtuple_t*		row;
151 	dict_table_t*		table;
152 	mem_heap_t*		heap;
153 	const dict_col_t*	col;
154 	dfield_t*		dfield;
155 	byte*			ptr;
156 
157 	row = node->row;
158 	table = node->table;
159 	heap = node->entry_sys_heap;
160 
161 	ut_ad(row && table && heap);
162 	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
163 
164 	/* allocate buffer to hold the needed system created hidden columns. */
165 	uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
166 	if (!dict_table_is_intrinsic(table)) {
167 		len += DATA_ROLL_PTR_LEN;
168 	}
169 	ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
170 
171 	/* 1. Populate row-id */
172 	col = dict_table_get_sys_col(table, DATA_ROW_ID);
173 
174 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
175 
176 	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
177 
178 	node->row_id_buf = ptr;
179 
180 	ptr += DATA_ROW_ID_LEN;
181 
182 	/* 2. Populate trx id */
183 	col = dict_table_get_sys_col(table, DATA_TRX_ID);
184 
185 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186 
187 	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
188 
189 	node->trx_id_buf = ptr;
190 
191 	ptr += DATA_TRX_ID_LEN;
192 
193 	if (!dict_table_is_intrinsic(table)) {
194 		col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
195 
196 		dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
197 
198 		dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
199 	}
200 }
201 
202 /*********************************************************************//**
203 Sets a new row to insert for an INS_DIRECT node. This function is only used
204 if we have constructed the row separately, which is a rare case; this
205 function is quite slow. */
206 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)207 ins_node_set_new_row(
208 /*=================*/
209 	ins_node_t*	node,	/*!< in: insert node */
210 	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
211 {
212 	node->state = INS_NODE_SET_IX_LOCK;
213 	node->index = NULL;
214 	node->entry = NULL;
215 
216 	node->row = row;
217 
218 	mem_heap_empty(node->entry_sys_heap);
219 
220 	/* Create templates for index entries */
221 
222 	ins_node_create_entry_list(node);
223 
224 	/* Allocate from entry_sys_heap buffers for sys fields */
225 
226 	row_ins_alloc_sys_fields(node);
227 
228 	/* As we allocated a new trx id buf, the trx id should be written
229 	there again: */
230 
231 	node->trx_id = 0;
232 }
233 
234 /*******************************************************************//**
235 Does an insert operation by updating a delete-marked existing record
236 in the index. This situation can occur if the delete-marked record is
237 kept in the index for consistent reads.
238 @return DB_SUCCESS or error code */
239 static MY_ATTRIBUTE((warn_unused_result))
240 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)241 row_ins_sec_index_entry_by_modify(
242 /*==============================*/
243 	ulint		flags,	/*!< in: undo logging and locking flags */
244 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
245 				depending on whether mtr holds just a leaf
246 				latch or also a tree latch */
247 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
248 	ulint**		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
249 	mem_heap_t*	offsets_heap,
250 				/*!< in/out: memory heap that can be emptied */
251 	mem_heap_t*	heap,	/*!< in/out: memory heap */
252 	const dtuple_t*	entry,	/*!< in: index entry to insert */
253 	que_thr_t*	thr,	/*!< in: query thread */
254 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
255 				latching any further pages */
256 {
257 	big_rec_t*	dummy_big_rec;
258 	upd_t*		update;
259 	rec_t*		rec;
260 	dberr_t		err;
261 
262 	rec = btr_cur_get_rec(cursor);
263 
264 	ut_ad(!dict_index_is_clust(cursor->index));
265 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
266 	ut_ad(!entry->info_bits);
267 
268 	/* We know that in the alphabetical ordering, entry and rec are
269 	identified. But in their binary form there may be differences if
270 	there are char fields in them. Therefore we have to calculate the
271 	difference. */
272 
273 	update = row_upd_build_sec_rec_difference_binary(
274 		rec, cursor->index, *offsets, entry, heap);
275 
276 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
277 		/* We should never insert in place of a record that
278 		has not been delete-marked. The only exception is when
279 		online CREATE INDEX copied the changes that we already
280 		made to the clustered index, and completed the
281 		secondary index creation before we got here. In this
282 		case, the change would already be there. The CREATE
283 		INDEX should be waiting for a MySQL meta-data lock
284 		upgrade at least until this INSERT or UPDATE
285 		returns. After that point, set_committed(true)
286 		would be invoked in commit_inplace_alter_table(). */
287 		ut_a(update->n_fields == 0);
288 		ut_a(!cursor->index->is_committed());
289 		ut_ad(!dict_index_is_online_ddl(cursor->index));
290 		return(DB_SUCCESS);
291 	}
292 
293 	if (mode == BTR_MODIFY_LEAF) {
294 		/* Try an optimistic updating of the record, keeping changes
295 		within the page */
296 
297 		/* TODO: pass only *offsets */
298 		err = btr_cur_optimistic_update(
299 			flags | BTR_KEEP_SYS_FLAG, cursor,
300 			offsets, &offsets_heap, update, 0, thr,
301 			thr_get_trx(thr)->id, mtr);
302 		switch (err) {
303 		case DB_OVERFLOW:
304 		case DB_UNDERFLOW:
305 		case DB_ZIP_OVERFLOW:
306 			err = DB_FAIL;
307 		default:
308 			break;
309 		}
310 	} else {
311 		ut_a(mode == BTR_MODIFY_TREE);
312 		if (buf_LRU_buf_pool_running_out()) {
313 
314 			return(DB_LOCK_TABLE_FULL);
315 		}
316 
317 		err = btr_cur_pessimistic_update(
318 			flags | BTR_KEEP_SYS_FLAG, cursor,
319 			offsets, &offsets_heap,
320 			heap, &dummy_big_rec, update, 0,
321 			thr, thr_get_trx(thr)->id, mtr);
322 		ut_ad(!dummy_big_rec);
323 	}
324 
325 	return(err);
326 }
327 
328 /*******************************************************************//**
329 Does an insert operation by delete unmarking and updating a delete marked
330 existing record in the index. This situation can occur if the delete marked
331 record is kept in the index for consistent reads.
332 @return DB_SUCCESS, DB_FAIL, or error code */
333 static MY_ATTRIBUTE((warn_unused_result))
334 dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)335 row_ins_clust_index_entry_by_modify(
336 /*================================*/
337 	btr_pcur_t*	pcur,	/*!< in/out: a persistent cursor pointing
338 				to the clust_rec that is being modified. */
339 	ulint		flags,	/*!< in: undo logging and locking flags */
340 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
341 				depending on whether mtr holds just a leaf
342 				latch or also a tree latch */
343 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
344 	mem_heap_t**	offsets_heap,
345 				/*!< in/out: pointer to memory heap that can
346 				be emptied, or NULL */
347 	mem_heap_t*	heap,	/*!< in/out: memory heap */
348 	const dtuple_t*	entry,	/*!< in: index entry to insert */
349 	que_thr_t*	thr,	/*!< in: query thread */
350 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
351 				latching any further pages */
352 {
353 	const rec_t*	rec;
354 	upd_t*		update;
355 	dberr_t		err = DB_SUCCESS;
356 	btr_cur_t*	cursor	= btr_pcur_get_btr_cur(pcur);
357 	TABLE*		mysql_table = NULL;
358 	ut_ad(dict_index_is_clust(cursor->index));
359 
360 	rec = btr_cur_get_rec(cursor);
361 
362 	ut_ad(rec_get_deleted_flag(rec,
363 				   dict_table_is_comp(cursor->index->table)));
364 
365 	/* Build an update vector containing all the fields to be modified;
366 	NOTE that this vector may NOT contain system columns trx_id or
367 	roll_ptr */
368 	if (thr->prebuilt != NULL) {
369 		mysql_table = thr->prebuilt->m_mysql_table;
370 		ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
371 	}
372 
373 	update = row_upd_build_difference_binary(
374 		cursor->index, entry, rec, NULL, true,
375 		thr_get_trx(thr), heap, mysql_table, thr->prebuilt, &err);
376 	if (err != DB_SUCCESS) {
377 		return(err);
378 	}
379 
380 	if (mode != BTR_MODIFY_TREE) {
381 		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
382 
383 		/* Try optimistic updating of the record, keeping changes
384 		within the page */
385 
386 		err = btr_cur_optimistic_update(
387 			flags, cursor, offsets, offsets_heap, update, 0, thr,
388 			thr_get_trx(thr)->id, mtr);
389 		switch (err) {
390 		case DB_OVERFLOW:
391 		case DB_UNDERFLOW:
392 		case DB_ZIP_OVERFLOW:
393 			err = DB_FAIL;
394 		default:
395 			break;
396 		}
397 	} else {
398 		if (buf_LRU_buf_pool_running_out()) {
399 
400 			return(DB_LOCK_TABLE_FULL);
401 
402 		}
403 
404 		big_rec_t*	big_rec	= NULL;
405 
406 		err = btr_cur_pessimistic_update(
407 			flags | BTR_KEEP_POS_FLAG,
408 			cursor, offsets, offsets_heap, heap,
409 			&big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
410 
411 		if (big_rec) {
412 			ut_a(err == DB_SUCCESS);
413 
414 			DEBUG_SYNC_C("before_row_ins_upd_extern");
415 			err = btr_store_big_rec_extern_fields(
416 				pcur, update, *offsets, big_rec, mtr,
417 				BTR_STORE_INSERT_UPDATE);
418 			DEBUG_SYNC_C("after_row_ins_upd_extern");
419 			dtuple_big_rec_free(big_rec);
420 		}
421 	}
422 
423 	return(err);
424 }
425 
426 /*********************************************************************//**
427 Returns TRUE if in a cascaded update/delete an ancestor node of node
428 updates (not DELETE, but UPDATE) table.
429 @return TRUE if an ancestor updates table */
430 static
431 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)432 row_ins_cascade_ancestor_updates_table(
433 /*===================================*/
434 	que_node_t*	node,	/*!< in: node in a query graph */
435 	dict_table_t*	table)	/*!< in: table */
436 {
437 	que_node_t*	parent;
438 
439 	for (parent = que_node_get_parent(node);
440 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
441 	     parent = que_node_get_parent(parent)) {
442 
443 		upd_node_t*	upd_node;
444 
445 		upd_node = static_cast<upd_node_t*>(parent);
446 
447 		if (upd_node->table == table && upd_node->is_delete == FALSE) {
448 
449 			return(TRUE);
450 		}
451 	}
452 
453 	return(FALSE);
454 }
455 
456 /*********************************************************************//**
457 Returns the number of ancestor UPDATE or DELETE nodes of a
458 cascaded update/delete node.
459 @return number of ancestors */
460 static MY_ATTRIBUTE((warn_unused_result))
461 ulint
row_ins_cascade_n_ancestors(que_node_t * node)462 row_ins_cascade_n_ancestors(
463 /*========================*/
464 	que_node_t*	node)	/*!< in: node in a query graph */
465 {
466 	que_node_t*	parent;
467 	ulint		n_ancestors = 0;
468 
469 	for (parent = que_node_get_parent(node);
470 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
471 	     parent = que_node_get_parent(parent)) {
472 
473 		n_ancestors++;
474 	}
475 
476 	return(n_ancestors);
477 }
478 
479 /******************************************************************//**
480 Calculates the update vector node->cascade->update for a child table in
481 a cascaded update.
482 @return number of fields in the calculated update vector; the value
483 can also be 0 if no foreign key fields changed; the returned value is
484 ULINT_UNDEFINED if the column type in the child table is too short to
485 fit the new value in the parent table: that means the update fails */
486 static MY_ATTRIBUTE((warn_unused_result))
487 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)488 row_ins_cascade_calc_update_vec(
489 /*============================*/
490 	upd_node_t*	node,		/*!< in: update node of the parent
491 					table */
492 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
493 					type is != 0 */
494 	mem_heap_t*	heap,		/*!< in: memory heap to use as
495 					temporary storage */
496 	trx_t*		trx,		/*!< in: update transaction */
497 	ibool*		fts_col_affected)
498 					/*!< out: is FTS column affected */
499 {
500 	upd_node_t*     cascade         = node->cascade_node;
501 	dict_table_t*	table		= foreign->foreign_table;
502 	dict_index_t*	index		= foreign->foreign_index;
503 	upd_t*		update;
504 	dict_table_t*	parent_table;
505 	dict_index_t*	parent_index;
506 	upd_t*		parent_update;
507 	ulint		n_fields_updated;
508 	ulint		parent_field_no;
509 	ulint		i;
510 	ulint		j;
511 	ibool		doc_id_updated = FALSE;
512 	ulint		doc_id_pos = 0;
513 	doc_id_t	new_doc_id = FTS_NULL_DOC_ID;
514 
515 	ut_a(node);
516 	ut_a(foreign);
517 	ut_a(cascade);
518 	ut_a(table);
519 	ut_a(index);
520 
521 	/* Calculate the appropriate update vector which will set the fields
522 	in the child index record to the same value (possibly padded with
523 	spaces if the column is a fixed length CHAR or FIXBINARY column) as
524 	the referenced index record will get in the update. */
525 
526 	parent_table = node->table;
527 	ut_a(parent_table == foreign->referenced_table);
528 	parent_index = foreign->referenced_index;
529 	parent_update = node->update;
530 
531 	update = cascade->update;
532 
533 	update->info_bits = 0;
534 
535 	n_fields_updated = 0;
536 
537 	*fts_col_affected = FALSE;
538 
539 	if (table->fts) {
540 		doc_id_pos = dict_table_get_nth_col_pos(
541 			table, table->fts->doc_col);
542 	}
543 
544 	for (i = 0; i < foreign->n_fields; i++) {
545 
546 		parent_field_no = dict_table_get_nth_col_pos(
547 			parent_table,
548 			dict_index_get_nth_col_no(parent_index, i));
549 
550 		for (j = 0; j < parent_update->n_fields; j++) {
551 			const upd_field_t*	parent_ufield
552 				= &parent_update->fields[j];
553 
554 			if (parent_ufield->field_no == parent_field_no) {
555 
556 				ulint			min_size;
557 				const dict_col_t*	col;
558 				ulint			ufield_len;
559 				upd_field_t*		ufield;
560 
561 				col = dict_index_get_nth_col(index, i);
562 
563 				/* A field in the parent index record is
564 				updated. Let us make the update vector
565 				field for the child table. */
566 
567 				ufield = update->fields + n_fields_updated;
568 
569 				ufield->field_no
570 					= dict_table_get_nth_col_pos(
571 					table, dict_col_get_no(col));
572 
573 				ufield->orig_len = 0;
574 				ufield->exp = NULL;
575 
576 				ufield->new_val = parent_ufield->new_val;
577 				ufield_len = dfield_get_len(&ufield->new_val);
578 
579 				/* Clear the "external storage" flag */
580 				dfield_set_len(&ufield->new_val, ufield_len);
581 
582 				/* Do not allow a NOT NULL column to be
583 				updated as NULL */
584 
585 				if (dfield_is_null(&ufield->new_val)
586 				    && (col->prtype & DATA_NOT_NULL)) {
587 
588 					return(ULINT_UNDEFINED);
589 				}
590 
591 				/* If the new value would not fit in the
592 				column, do not allow the update */
593 
594 				if (!dfield_is_null(&ufield->new_val)
595 				    && dtype_get_at_most_n_mbchars(
596 					col->prtype, col->mbminmaxlen,
597 					col->len,
598 					ufield_len,
599 					static_cast<char*>(
600 						dfield_get_data(
601 							&ufield->new_val)))
602 				    < ufield_len) {
603 
604 					return(ULINT_UNDEFINED);
605 				}
606 
607 				/* If the parent column type has a different
608 				length than the child column type, we may
609 				need to pad with spaces the new value of the
610 				child column */
611 
612 				min_size = dict_col_get_min_size(col);
613 
614 				/* Because UNIV_SQL_NULL (the marker
615 				of SQL NULL values) exceeds all possible
616 				values of min_size, the test below will
617 				not hold for SQL NULL columns. */
618 
619 				if (min_size > ufield_len) {
620 
621 					byte*	pad;
622 					ulint	pad_len;
623 					byte*	padded_data;
624 					ulint	mbminlen;
625 
626 					padded_data = static_cast<byte*>(
627 						mem_heap_alloc(
628 							heap, min_size));
629 
630 					pad = padded_data + ufield_len;
631 					pad_len = min_size - ufield_len;
632 
633 					memcpy(padded_data,
634 					       dfield_get_data(&ufield
635 							       ->new_val),
636 					       ufield_len);
637 
638 					mbminlen = dict_col_get_mbminlen(col);
639 
640 					ut_ad(!(ufield_len % mbminlen));
641 					ut_ad(!(min_size % mbminlen));
642 
643 					if (mbminlen == 1
644 					    && dtype_get_charset_coll(
645 						    col->prtype)
646 					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
647 						/* Do not pad BINARY columns */
648 						return(ULINT_UNDEFINED);
649 					}
650 
651 					row_mysql_pad_col(mbminlen,
652 							  pad, pad_len);
653 					dfield_set_data(&ufield->new_val,
654 							padded_data, min_size);
655 				}
656 
657 				/* Check whether the current column has
658 				FTS index on it */
659 				if (table->fts
660 				    && dict_table_is_fts_column(
661 					table->fts->indexes,
662 					dict_col_get_no(col),
663 					dict_col_is_virtual(col))
664 					!= ULINT_UNDEFINED) {
665 					*fts_col_affected = TRUE;
666 				}
667 
668 				/* If Doc ID is updated, check whether the
669 				Doc ID is valid */
670 				if (table->fts
671 				    && ufield->field_no == doc_id_pos) {
672 					doc_id_t	n_doc_id;
673 
674 					n_doc_id =
675 						table->fts->cache->next_doc_id;
676 
677 					new_doc_id = fts_read_doc_id(
678 						static_cast<const byte*>(
679 							dfield_get_data(
680 							&ufield->new_val)));
681 
682 					if (new_doc_id <= 0) {
683 						ib::error() << "FTS Doc ID"
684 							" must be larger than"
685 							" 0";
686 						return(ULINT_UNDEFINED);
687 					}
688 
689 					if (new_doc_id < n_doc_id) {
690 						ib::error() << "FTS Doc ID"
691 							" must be larger than "
692 							<< n_doc_id - 1
693 							<< " for table "
694 							<< table->name;
695 
696 						return(ULINT_UNDEFINED);
697 					}
698 
699 					*fts_col_affected = TRUE;
700 					doc_id_updated = TRUE;
701 				}
702 
703 				n_fields_updated++;
704 			}
705 		}
706 	}
707 
708 	/* Generate a new Doc ID if FTS index columns get updated */
709 	if (table->fts && *fts_col_affected) {
710 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
711 			doc_id_t	doc_id;
712 			doc_id_t*	next_doc_id;
713 			upd_field_t*	ufield;
714 
715 			next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
716 				heap, sizeof(doc_id_t)));
717 
718 			ut_ad(!doc_id_updated);
719 			ufield = update->fields + n_fields_updated;
720 			fts_get_next_doc_id(table, next_doc_id);
721 			doc_id = fts_update_doc_id(table, ufield, next_doc_id);
722 			n_fields_updated++;
723 			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
724 		} else  {
725 			if (doc_id_updated) {
726 				ut_ad(new_doc_id);
727 				fts_trx_add_op(trx, table, new_doc_id,
728 					       FTS_INSERT, NULL);
729 			} else {
730 				ib::error() << "FTS Doc ID must be updated"
731 					" along with FTS indexed column for"
732 					" table " << table->name;
733 				return(ULINT_UNDEFINED);
734 			}
735 		}
736 	}
737 
738 	update->n_fields = n_fields_updated;
739 
740 	return(n_fields_updated);
741 }
742 
743 /*********************************************************************//**
744 Set detailed error message associated with foreign key errors for
745 the given transaction. */
746 static
747 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)748 row_ins_set_detailed(
749 /*=================*/
750 	trx_t*		trx,		/*!< in: transaction */
751 	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
752 {
753 	ut_ad(!srv_read_only_mode);
754 
755 	mutex_enter(&srv_misc_tmpfile_mutex);
756 	rewind(srv_misc_tmpfile);
757 
758 	if (os_file_set_eof(srv_misc_tmpfile)) {
759 		ut_print_name(srv_misc_tmpfile, trx,
760 			      foreign->foreign_table_name);
761 		dict_print_info_on_foreign_key_in_create_format(
762 			srv_misc_tmpfile, trx, foreign, FALSE);
763 		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
764 	} else {
765 		trx_set_detailed_error(trx, "temp file operation failed");
766 	}
767 
768 	mutex_exit(&srv_misc_tmpfile_mutex);
769 }
770 
771 /*********************************************************************//**
772 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
773 and displays information about the given transaction.
774 The caller must release dict_foreign_err_mutex. */
775 static
776 void
row_ins_foreign_trx_print(trx_t * trx)777 row_ins_foreign_trx_print(
778 /*======================*/
779 	trx_t*	trx)	/*!< in: transaction */
780 {
781 	ulint	n_rec_locks;
782 	ulint	n_trx_locks;
783 	ulint	heap_size;
784 
785 	if (srv_read_only_mode) {
786 		return;
787 	}
788 
789 	lock_mutex_enter();
790 	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
791 	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
792 	heap_size = mem_heap_get_size(trx->lock.lock_heap);
793 	lock_mutex_exit();
794 
795 	trx_sys_mutex_enter();
796 
797 	mutex_enter(&dict_foreign_err_mutex);
798 	rewind(dict_foreign_err_file);
799 	ut_print_timestamp(dict_foreign_err_file);
800 	fputs(" Transaction:\n", dict_foreign_err_file);
801 
802 	trx_print_low(dict_foreign_err_file, trx, 600,
803 		      n_rec_locks, n_trx_locks, heap_size);
804 
805 	trx_sys_mutex_exit();
806 
807 	ut_ad(mutex_own(&dict_foreign_err_mutex));
808 }
809 
810 /*********************************************************************//**
811 Reports a foreign key error associated with an update or a delete of a
812 parent table index entry. */
813 static
814 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)815 row_ins_foreign_report_err(
816 /*=======================*/
817 	const char*	errstr,		/*!< in: error string from the viewpoint
818 					of the parent table */
819 	que_thr_t*	thr,		/*!< in: query thread whose run_node
820 					is an update node */
821 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
822 	const rec_t*	rec,		/*!< in: a matching index record in the
823 					child table */
824 	const dtuple_t*	entry)		/*!< in: index entry in the parent
825 					table */
826 {
827 	if (srv_read_only_mode) {
828 		return;
829 	}
830 
831 	FILE*	ef	= dict_foreign_err_file;
832 	trx_t*	trx	= thr_get_trx(thr);
833 
834 	row_ins_set_detailed(trx, foreign);
835 
836 	row_ins_foreign_trx_print(trx);
837 
838 	fputs("Foreign key constraint fails for table ", ef);
839 	ut_print_name(ef, trx, foreign->foreign_table_name);
840 	fputs(":\n", ef);
841 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
842 							TRUE);
843 	putc('\n', ef);
844 	fputs(errstr, ef);
845 	fprintf(ef, " in parent table, in index %s",
846 		foreign->referenced_index->name());
847 	if (entry) {
848 		fputs(" tuple:\n", ef);
849 		dtuple_print(ef, entry);
850 	}
851 	fputs("\nBut in child table ", ef);
852 	ut_print_name(ef, trx, foreign->foreign_table_name);
853 	fprintf(ef, ", in index %s", foreign->foreign_index->name());
854 	if (rec) {
855 		fputs(", there is a record:\n", ef);
856 		rec_print(ef, rec, foreign->foreign_index);
857 	} else {
858 		fputs(", the record is not available\n", ef);
859 	}
860 	putc('\n', ef);
861 
862 	mutex_exit(&dict_foreign_err_mutex);
863 }
864 
865 /*********************************************************************//**
866 Reports a foreign key error to dict_foreign_err_file when we are trying
867 to add an index entry to a child table. Note that the adding may be the result
868 of an update, too. */
869 static
870 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)871 row_ins_foreign_report_add_err(
872 /*===========================*/
873 	trx_t*		trx,		/*!< in: transaction */
874 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
875 	const rec_t*	rec,		/*!< in: a record in the parent table:
876 					it does not match entry because we
877 					have an error! */
878 	const dtuple_t*	entry)		/*!< in: index entry to insert in the
879 					child table */
880 {
881 	if (srv_read_only_mode) {
882 		return;
883 	}
884 
885 	FILE*	ef	= dict_foreign_err_file;
886 
887 	row_ins_set_detailed(trx, foreign);
888 
889 	row_ins_foreign_trx_print(trx);
890 
891 	fputs("Foreign key constraint fails for table ", ef);
892 	ut_print_name(ef, trx, foreign->foreign_table_name);
893 	fputs(":\n", ef);
894 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
895 							TRUE);
896 	fprintf(ef, "\nTrying to add in child table, in index %s",
897 		foreign->foreign_index->name());
898 	if (entry) {
899 		fputs(" tuple:\n", ef);
900 		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
901 		It would be better to only display the user columns. */
902 		dtuple_print(ef, entry);
903 	}
904 	fputs("\nBut in parent table ", ef);
905 	ut_print_name(ef, trx, foreign->referenced_table_name);
906 	fprintf(ef, ", in index %s,\n"
907 		"the closest match we can find is record:\n",
908 		foreign->referenced_index->name());
909 	if (rec && page_rec_is_supremum(rec)) {
910 		/* If the cursor ended on a supremum record, it is better
911 		to report the previous record in the error message, so that
912 		the user gets a more descriptive error message. */
913 		rec = page_rec_get_prev_const(rec);
914 	}
915 
916 	if (rec) {
917 		rec_print(ef, rec, foreign->referenced_index);
918 	}
919 	putc('\n', ef);
920 
921 	mutex_exit(&dict_foreign_err_mutex);
922 }
923 
924 /*********************************************************************//**
925 Invalidate the query cache for the given table. */
926 static
927 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)928 row_ins_invalidate_query_cache(
929 /*===========================*/
930 	que_thr_t*	thr,		/*!< in: query thread whose run_node
931 					is an update node */
932 	const char*	name)		/*!< in: table name prefixed with
933 					database name and a '/' character */
934 {
935 	ulint	len = strlen(name) + 1;
936 	innobase_invalidate_query_cache(thr_get_trx(thr), name, len);
937 }
938 
939 /** Fill virtual column information in cascade node for the child table.
940 @param[out]	cascade		child update node
941 @param[in]	rec		clustered rec of child table
942 @param[in]	index		clustered index of child table
943 @param[in]	node		parent update node
944 @param[in]	foreign		foreign key information
945 @param[out]	err		error code. */
946 static
947 void
row_ins_foreign_fill_virtual(upd_node_t * cascade,const rec_t * rec,dict_index_t * index,upd_node_t * node,dict_foreign_t * foreign,dberr_t * err)948 row_ins_foreign_fill_virtual(
949 	upd_node_t*		cascade,
950 	const rec_t*		rec,
951 	dict_index_t*		index,
952 	upd_node_t*		node,
953 	dict_foreign_t*		foreign,
954 	dberr_t*		err)
955 {
956 	row_ext_t*	ext;
957 	THD*		thd = current_thd;
958 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
959 	mem_heap_t*	v_heap = NULL;
960 	upd_t*		update = cascade->update;
961 	rec_offs_init(offsets_);
962 	const ulint*	offsets =
963 		rec_get_offsets(rec, index, offsets_,
964 				ULINT_UNDEFINED, &update->heap);
965 	ulint		n_v_fld = index->table->n_v_def;
966 	ulint		n_diff;
967 	upd_field_t*	upd_field;
968 	dict_vcol_set*	v_cols = foreign->v_cols;
969 	row_prebuilt_t* prebuilt =
970 		static_cast<que_thr_t*>(node->common.parent)->prebuilt;
971 
972 	update->old_vrow = row_build(
973 		ROW_COPY_POINTERS, index, rec,
974 		offsets, index->table, NULL, NULL,
975 		&ext, update->heap);
976 
977 	n_diff = update->n_fields;
978 
979 	update->n_fields += n_v_fld;
980 
981 	if (index->table->vc_templ == NULL) {
982 		/** This can occur when there is a cascading
983 		delete or update after restart. */
984 		innobase_init_vc_templ(index->table);
985 	}
986 
987 	for (ulint i = 0; i < n_v_fld; i++) {
988 
989 		dict_v_col_t*     col = dict_table_get_nth_v_col(
990 				index->table, i);
991 
992 		dict_vcol_set::iterator it = v_cols->find(col);
993 
994 		if (it == v_cols->end()) {
995 			continue;
996 		}
997 
998 		dfield_t*	vfield = innobase_get_computed_value(
999 				update->old_vrow, col, index,
1000 				&v_heap, update->heap, NULL, thd, NULL,
1001 				NULL, NULL, NULL, prebuilt);
1002 
1003 		if (vfield == NULL) {
1004 			*err = DB_COMPUTE_VALUE_FAILED;
1005 			goto func_exit;
1006 		}
1007 
1008 		upd_field = upd_get_nth_field(update, n_diff);
1009 
1010 		upd_field->old_v_val = static_cast<dfield_t*>(
1011 				mem_heap_alloc(update->heap,
1012 					sizeof *upd_field->old_v_val));
1013 
1014 		dfield_copy(upd_field->old_v_val, vfield);
1015 
1016 		upd_field_set_v_field_no(upd_field, i, index);
1017 
1018 		if (node->is_delete
1019 		    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1020 		    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1021 			uint32_t col_match_count =
1022 			    dict_vcol_base_is_foreign_key(col, foreign);
1023 			if (col_match_count == col->num_base) {
1024 				/* If all base columns of virtual col are
1025 				in FK */
1026 				dfield_set_null(&upd_field->new_val);
1027 			} else if (col_match_count == 0) {
1028 				/* If no base column of virtual col is in FK */
1029 				dfield_copy(&(upd_field->new_val), vfield);
1030 			} else {
1031 				/* If at least one base column of virtual col
1032 				is in FK */
1033 				for (uint32_t j = 0; j < col->num_base; j++) {
1034 					dict_col_t *base_col = col->base_col[j];
1035 					uint32_t col_no = base_col->ind;
1036 					dfield_t *row_field =
1037 					  innobase_get_field_from_update_vector(
1038 					  foreign, node->update, col_no);
1039 					if (row_field != NULL) {
1040 						dfield_set_null(row_field);
1041 					}
1042 				}
1043 				dfield_t *new_vfield = innobase_get_computed_value(
1044 				    update->old_vrow, col, index, &v_heap,
1045 				    update->heap, NULL, thd, NULL,
1046 				    NULL, node->update, foreign, prebuilt);
1047 				dfield_copy(&(upd_field->new_val), new_vfield);
1048 			}
1049 
1050 		}
1051 
1052 		if (!node->is_delete
1053 		    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1054 
1055 			dfield_t* new_vfield = innobase_get_computed_value(
1056 					update->old_vrow, col, index,
1057 					&v_heap, update->heap, NULL, thd,
1058 					NULL, NULL, node->update, foreign,
1059 					prebuilt);
1060 
1061 			if (new_vfield == NULL) {
1062 				*err = DB_COMPUTE_VALUE_FAILED;
1063 				goto func_exit;
1064 			}
1065 
1066 			dfield_copy(&(upd_field->new_val), new_vfield);
1067 		}
1068 
1069 		n_diff++;
1070 	}
1071 
1072 	update->n_fields = n_diff;
1073 	*err = DB_SUCCESS;
1074 
1075 func_exit:
1076 	if (v_heap) {
1077 		mem_heap_free(v_heap);
1078 	}
1079 }
1080 
1081 /*********************************************************************//**
1082 Perform referential actions or checks when a parent row is deleted or updated
1083 and the constraint had an ON DELETE or ON UPDATE condition which was not
1084 RESTRICT.
1085 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
1086 static MY_ATTRIBUTE((warn_unused_result))
1087 dberr_t
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)1088 row_ins_foreign_check_on_constraint(
1089 /*================================*/
1090 	que_thr_t*	thr,		/*!< in: query thread whose run_node
1091 					is an update node */
1092 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
1093 					type is != 0 */
1094 	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
1095 					index record in the child table */
1096 	dtuple_t*	entry,		/*!< in: index entry in the parent
1097 					table */
1098 	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
1099 					page */
1100 {
1101 	upd_node_t*	node;
1102 	upd_node_t*	cascade;
1103 	dict_table_t*	table		= foreign->foreign_table;
1104 	dict_index_t*	index;
1105 	dict_index_t*	clust_index;
1106 	dtuple_t*	ref;
1107 	const rec_t*	rec;
1108 	const rec_t*	clust_rec;
1109 	const buf_block_t* clust_block;
1110 	upd_t*		update;
1111 	ulint		n_to_update;
1112 	dberr_t		err;
1113 	ulint		i;
1114 	trx_t*		trx;
1115 	mem_heap_t*	tmp_heap	= NULL;
1116 	doc_id_t	doc_id = FTS_NULL_DOC_ID;
1117 	ibool		fts_col_affacted = FALSE;
1118 
1119 	DBUG_ENTER("row_ins_foreign_check_on_constraint");
1120 	ut_a(thr);
1121 	ut_a(foreign);
1122 	ut_a(pcur);
1123 	ut_a(mtr);
1124 
1125 	trx = thr_get_trx(thr);
1126 
1127 	/* Since we are going to delete or update a row, we have to invalidate
1128 	the MySQL query cache for table. A deadlock of threads is not possible
1129 	here because the caller of this function does not hold any latches with
1130 	the mutex rank above the lock_sys_t::mutex. The query cache mutex
1131 	has a rank just above the lock_sys_t::mutex. */
1132 
1133 	row_ins_invalidate_query_cache(thr, table->name.m_name);
1134 
1135 	node = static_cast<upd_node_t*>(thr->run_node);
1136 
1137 	if (node->is_delete && 0 == (foreign->type
1138 				     & (DICT_FOREIGN_ON_DELETE_CASCADE
1139 					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {
1140 
1141 		row_ins_foreign_report_err("Trying to delete",
1142 					   thr, foreign,
1143 					   btr_pcur_get_rec(pcur), entry);
1144 
1145 		DBUG_RETURN(DB_ROW_IS_REFERENCED);
1146 	}
1147 
1148 	if (!node->is_delete && 0 == (foreign->type
1149 				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
1150 					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
1151 
1152 		/* This is an UPDATE */
1153 
1154 		row_ins_foreign_report_err("Trying to update",
1155 					   thr, foreign,
1156 					   btr_pcur_get_rec(pcur), entry);
1157 
1158 		DBUG_RETURN(DB_ROW_IS_REFERENCED);
1159 	}
1160 
1161 	if (node->cascade_node == NULL) {
1162 		node->cascade_heap = mem_heap_create(128);
1163 		node->cascade_node = row_create_update_node_for_mysql(
1164 			table, node->cascade_heap);
1165 		que_node_set_parent(node->cascade_node, node);
1166 
1167 	}
1168 	cascade = node->cascade_node;
1169 	cascade->table = table;
1170 	cascade->foreign = foreign;
1171 
1172 	if (node->is_delete
1173 	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1174 		cascade->is_delete = TRUE;
1175 	} else {
1176 		cascade->is_delete = FALSE;
1177 
1178 		if (foreign->n_fields > cascade->update_n_fields) {
1179 			/* We have to make the update vector longer */
1180 
1181 			cascade->update = upd_create(foreign->n_fields,
1182 						     node->cascade_heap);
1183 			cascade->update_n_fields = foreign->n_fields;
1184 		}
1185 	}
1186 
1187 	/* We do not allow cyclic cascaded updating (DELETE is allowed,
1188 	but not UPDATE) of the same table, as this can lead to an infinite
1189 	cycle. Check that we are not updating the same table which is
1190 	already being modified in this cascade chain. We have to check
1191 	this also because the modification of the indexes of a 'parent'
1192 	table may still be incomplete, and we must avoid seeing the indexes
1193 	of the parent table in an inconsistent state! */
1194 
1195 	if (!cascade->is_delete
1196 	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {
1197 
1198 		/* We do not know if this would break foreign key
1199 		constraints, but play safe and return an error */
1200 
1201 		err = DB_ROW_IS_REFERENCED;
1202 
1203 		row_ins_foreign_report_err(
1204 			"Trying an update, possibly causing a cyclic"
1205 			" cascaded update\n"
1206 			"in the child table,", thr, foreign,
1207 			btr_pcur_get_rec(pcur), entry);
1208 
1209 		goto nonstandard_exit_func;
1210 	}
1211 
1212 	if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
1213 		err = DB_FOREIGN_EXCEED_MAX_CASCADE;
1214 
1215 		row_ins_foreign_report_err(
1216 			"Trying a too deep cascaded delete or update\n",
1217 			thr, foreign, btr_pcur_get_rec(pcur), entry);
1218 
1219 		goto nonstandard_exit_func;
1220 	}
1221 
1222 	index = btr_pcur_get_btr_cur(pcur)->index;
1223 
1224 	ut_a(index == foreign->foreign_index);
1225 
1226 	rec = btr_pcur_get_rec(pcur);
1227 
1228 	tmp_heap = mem_heap_create(256);
1229 
1230 	if (dict_index_is_clust(index)) {
1231 		/* pcur is already positioned in the clustered index of
1232 		the child table */
1233 
1234 		clust_index = index;
1235 		clust_rec = rec;
1236 		clust_block = btr_pcur_get_block(pcur);
1237 	} else {
1238 		/* We have to look for the record in the clustered index
1239 		in the child table */
1240 
1241 		clust_index = dict_table_get_first_index(table);
1242 
1243 		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1244 					tmp_heap);
1245 		btr_pcur_open_with_no_init(clust_index, ref,
1246 					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
1247 					   cascade->pcur, 0, mtr);
1248 
1249 		clust_rec = btr_pcur_get_rec(cascade->pcur);
1250 		clust_block = btr_pcur_get_block(cascade->pcur);
1251 
1252 		if (!page_rec_is_user_rec(clust_rec)
1253 		    || btr_pcur_get_low_match(cascade->pcur)
1254 		    < dict_index_get_n_unique(clust_index)) {
1255 
1256 			ib::error() << "In cascade of a foreign key op index "
1257 				<< index->name
1258 				<< " of table " << index->table->name;
1259 
1260 			fputs("InnoDB: record ", stderr);
1261 			rec_print(stderr, rec, index);
1262 			fputs("\n"
1263 			      "InnoDB: clustered record ", stderr);
1264 			rec_print(stderr, clust_rec, clust_index);
1265 			fputs("\n"
1266 			      "InnoDB: Submit a detailed bug report to"
1267 			      " http://bugs.mysql.com\n", stderr);
1268 			ut_ad(0);
1269 			err = DB_SUCCESS;
1270 
1271 			goto nonstandard_exit_func;
1272 		}
1273 	}
1274 
1275 	/* Set an X-lock on the row to delete or update in the child table */
1276 
1277 	err = lock_table(0, table, LOCK_IX, thr);
1278 
1279 	if (err == DB_SUCCESS) {
1280 		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1281 		we already have a normal shared lock on the appropriate
1282 		gap if the search criterion was not unique */
1283 
1284 		err = lock_clust_rec_read_check_and_lock_alt(
1285 			0, clust_block, clust_rec, clust_index,
1286 			LOCK_X, LOCK_REC_NOT_GAP, thr);
1287 	}
1288 
1289 	if (err != DB_SUCCESS) {
1290 
1291 		goto nonstandard_exit_func;
1292 	}
1293 
1294 	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1295 		/* This can happen if there is a circular reference of
1296 		rows such that cascading delete comes to delete a row
1297 		already in the process of being delete marked */
1298 		err = DB_SUCCESS;
1299 
1300 		goto nonstandard_exit_func;
1301 	}
1302 
1303 
1304 	if (table->fts) {
1305 		doc_id = fts_get_doc_id_from_rec(table, clust_rec,
1306 						 clust_index, tmp_heap);
1307 	}
1308 
1309 	/* A cascade delete from the parent table triggers delete on the child
1310 	table. Before a clustered index record is deleted in the child table,
1311 	a copy of row is built to remove secondary index records. This copy of
1312 	the row requires virtual columns to be materialized. Hence, if child
1313 	table has any virtual columns, we have to initialize virtual column
1314 	template */
1315 	if (cascade->is_delete && dict_table_get_n_v_cols(table) > 0
1316 	    && table->vc_templ == NULL) {
1317 		innobase_init_vc_templ(table);
1318 	}
1319 	if (node->is_delete
1320 	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1321 	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1322 		/* Build the appropriate update vector which sets
1323 		foreign->n_fields first fields in rec to SQL NULL */
1324 
1325 		update = cascade->update;
1326 
1327 		update->info_bits = 0;
1328 		update->n_fields = foreign->n_fields;
1329 		UNIV_MEM_INVALID(update->fields,
1330 				 update->n_fields * sizeof *update->fields);
1331 
1332 		for (i = 0; i < foreign->n_fields; i++) {
1333 			upd_field_t*	ufield = &update->fields[i];
1334 			ulint		col_no = dict_index_get_nth_col_no(
1335 						index, i);
1336 
1337 			ufield->field_no = dict_table_get_nth_col_pos(
1338 				table, col_no);
1339 			dict_col_t*	col = dict_table_get_nth_col(
1340 				table, col_no);
1341 			dict_col_copy_type(col, dfield_get_type(&ufield->new_val));
1342 
1343 			ufield->orig_len = 0;
1344 			ufield->exp = NULL;
1345 			dfield_set_null(&ufield->new_val);
1346 
1347 			if (table->fts && dict_table_is_fts_column(
1348 				table->fts->indexes,
1349 				dict_index_get_nth_col_no(index, i),
1350 				dict_col_is_virtual(
1351 					dict_index_get_nth_col(index, i)))
1352 			    != ULINT_UNDEFINED) {
1353 				fts_col_affacted = TRUE;
1354 			}
1355 		}
1356 
1357 		if (fts_col_affacted) {
1358 			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1359 		}
1360 
1361 		if (foreign->v_cols != NULL
1362 		    && foreign->v_cols->size() > 0) {
1363 			row_ins_foreign_fill_virtual(
1364 				cascade, clust_rec, clust_index,
1365 				node, foreign, &err);
1366 
1367 			if (err != DB_SUCCESS) {
1368 				goto nonstandard_exit_func;
1369 			}
1370 		}
1371 
1372 	} else if (table->fts && cascade->is_delete) {
1373 		/* DICT_FOREIGN_ON_DELETE_CASCADE case */
1374 		for (i = 0; i < foreign->n_fields; i++) {
1375 			if (table->fts && dict_table_is_fts_column(
1376 				table->fts->indexes,
1377 				dict_index_get_nth_col_no(index, i),
1378 				dict_col_is_virtual(
1379 					dict_index_get_nth_col(index, i)))
1380 			    != ULINT_UNDEFINED) {
1381 				fts_col_affacted = TRUE;
1382 			}
1383 		}
1384 
1385 		if (fts_col_affacted) {
1386 			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1387 		}
1388 	}
1389 
1390 	if (!node->is_delete
1391 	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1392 
1393 		/* Build the appropriate update vector which sets changing
1394 		foreign->n_fields first fields in rec to new values */
1395 
1396 		n_to_update = row_ins_cascade_calc_update_vec(
1397 			node, foreign, tmp_heap,
1398 			trx, &fts_col_affacted);
1399 
1400 
1401 		if (foreign->v_cols != NULL
1402 		    && foreign->v_cols->size() > 0) {
1403 			row_ins_foreign_fill_virtual(
1404 				cascade, clust_rec, clust_index,
1405 				node, foreign, &err);
1406 
1407 			if (err != DB_SUCCESS) {
1408 				goto nonstandard_exit_func;
1409 			}
1410 		}
1411 
1412 		if (n_to_update == ULINT_UNDEFINED) {
1413 			err = DB_ROW_IS_REFERENCED;
1414 
1415 			row_ins_foreign_report_err(
1416 				"Trying a cascaded update where the"
1417 				" updated value in the child\n"
1418 				"table would not fit in the length"
1419 				" of the column, or the value would\n"
1420 				"be NULL and the column is"
1421 				" declared as not NULL in the child table,",
1422 				thr, foreign, btr_pcur_get_rec(pcur), entry);
1423 
1424 			goto nonstandard_exit_func;
1425 		}
1426 
1427 		if (cascade->update->n_fields == 0) {
1428 
1429 			/* The update does not change any columns referred
1430 			to in this foreign key constraint: no need to do
1431 			anything */
1432 
1433 			err = DB_SUCCESS;
1434 
1435 			goto nonstandard_exit_func;
1436 		}
1437 
1438 		/* Mark the old Doc ID as deleted */
1439 		if (fts_col_affacted) {
1440 			ut_ad(table->fts);
1441 			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1442 		}
1443 	}
1444 
1445 	/* Store pcur position and initialize or store the cascade node
1446 	pcur stored position */
1447 
1448 	btr_pcur_store_position(pcur, mtr);
1449 
1450 	if (index == clust_index) {
1451 		btr_pcur_copy_stored_position(cascade->pcur, pcur);
1452 	} else {
1453 		btr_pcur_store_position(cascade->pcur, mtr);
1454 	}
1455 
1456 	mtr_commit(mtr);
1457 
1458 	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1459 
1460 	cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1461 
1462 	err = row_update_cascade_for_mysql(thr, cascade,
1463                                            foreign->foreign_table);
1464 
1465 	/* Release the data dictionary latch for a while, so that we do not
1466 	starve other threads from doing CREATE TABLE etc. if we have a huge
1467 	cascaded operation running. The counter n_foreign_key_checks_running
1468 	will prevent other users from dropping or ALTERing the table when we
1469 	release the latch. */
1470 
1471 	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1472 
1473 	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1474 
1475 	row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1476 
1477 	mtr_start(mtr);
1478 
1479 	/* Restore pcur position */
1480 
1481 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1482 
1483 	if (tmp_heap) {
1484 		mem_heap_free(tmp_heap);
1485 	}
1486 
1487 	DBUG_RETURN(err);
1488 
1489 nonstandard_exit_func:
1490 
1491 	if (tmp_heap) {
1492 		mem_heap_free(tmp_heap);
1493 	}
1494 
1495 	btr_pcur_store_position(pcur, mtr);
1496 
1497 	mtr_commit(mtr);
1498 	mtr_start(mtr);
1499 
1500 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1501 
1502 	DBUG_RETURN(err);
1503 }
1504 
1505 /*********************************************************************//**
1506 Sets a shared lock on a record. Used in locking possible duplicate key
1507 records and also in checking foreign key constraints.
1508 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1509 static
1510 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1511 row_ins_set_shared_rec_lock(
1512 /*========================*/
1513 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1514 					LOCK_REC_NOT_GAP type lock */
1515 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1516 	const rec_t*		rec,	/*!< in: record */
1517 	dict_index_t*		index,	/*!< in: index */
1518 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1519 	que_thr_t*		thr)	/*!< in: query thread */
1520 {
1521 	dberr_t	err;
1522 
1523 	ut_ad(rec_offs_validate(rec, index, offsets));
1524 
1525 	if (dict_index_is_clust(index)) {
1526 		err = lock_clust_rec_read_check_and_lock(
1527 			0, block, rec, index, offsets, LOCK_S, type, thr);
1528 	} else {
1529 		err = lock_sec_rec_read_check_and_lock(
1530 			0, block, rec, index, offsets, LOCK_S, type, thr);
1531 	}
1532 
1533 	return(err);
1534 }
1535 
1536 /*********************************************************************//**
1537 Sets a exclusive lock on a record. Used in locking possible duplicate key
1538 records
1539 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1540 static
1541 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1542 row_ins_set_exclusive_rec_lock(
1543 /*===========================*/
1544 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1545 					LOCK_REC_NOT_GAP type lock */
1546 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1547 	const rec_t*		rec,	/*!< in: record */
1548 	dict_index_t*		index,	/*!< in: index */
1549 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1550 	que_thr_t*		thr)	/*!< in: query thread */
1551 {
1552 	dberr_t	err;
1553 
1554 	ut_ad(rec_offs_validate(rec, index, offsets));
1555 
1556 	if (dict_index_is_clust(index)) {
1557 		err = lock_clust_rec_read_check_and_lock(
1558 			0, block, rec, index, offsets, LOCK_X, type, thr);
1559 	} else {
1560 		err = lock_sec_rec_read_check_and_lock(
1561 			0, block, rec, index, offsets, LOCK_X, type, thr);
1562 	}
1563 
1564 	return(err);
1565 }
1566 
1567 /* Decrement a counter in the destructor. */
1568 class ib_dec_in_dtor {
1569 public:
ib_dec_in_dtor(ulint & c)1570 	ib_dec_in_dtor(ulint& c): counter(c) {}
~ib_dec_in_dtor()1571 	~ib_dec_in_dtor() {
1572 		os_atomic_decrement_ulint(&counter, 1);
1573 	}
1574 private:
1575 	ulint&		counter;
1576 };
1577 
1578 /***************************************************************//**
1579 Checks if foreign key constraint fails for an index entry. Sets shared locks
1580 which lock either the success or the failure of the constraint. NOTE that
1581 the caller must have a shared latch on dict_operation_lock.
1582 @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1583 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1584 row_ins_check_foreign_constraint(
1585 /*=============================*/
1586 	ibool		check_ref,/*!< in: TRUE if we want to check that
1587 				the referenced table is ok, FALSE if we
1588 				want to check the foreign key table */
1589 	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
1590 				tables mentioned in it must be in the
1591 				dictionary cache if they exist at all */
1592 	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
1593 				table, else the referenced table */
1594 	dtuple_t*	entry,	/*!< in: index entry for index */
1595 	que_thr_t*	thr)	/*!< in: query thread */
1596 {
1597 	dberr_t		err;
1598 	upd_node_t*	upd_node;
1599 	dict_table_t*	check_table;
1600 	dict_index_t*	check_index;
1601 	ulint		n_fields_cmp;
1602 	btr_pcur_t	pcur;
1603 	int		cmp;
1604 	mtr_t		mtr;
1605 	trx_t*		trx		= thr_get_trx(thr);
1606 	mem_heap_t*	heap		= NULL;
1607 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1608 	ulint*		offsets		= offsets_;
1609 
1610 	bool		skip_gap_lock;
1611 
1612 	skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);
1613 
1614 	DBUG_ENTER("row_ins_check_foreign_constraint");
1615 
1616 	rec_offs_init(offsets_);
1617 
1618 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_S));
1619 
1620 	err = DB_SUCCESS;
1621 
1622 	if (trx->check_foreigns == FALSE) {
1623 		/* The user has suppressed foreign key checks currently for
1624 		this session */
1625 		goto exit_func;
1626 	}
1627 
1628 	/* If any of the foreign key fields in entry is SQL NULL, we
1629 	suppress the foreign key check: this is compatible with Oracle,
1630 	for example */
1631 	for (ulint i = 0; i < foreign->n_fields; i++) {
1632 		if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1633 			goto exit_func;
1634 		}
1635 	}
1636 
1637 	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1638 		upd_node = static_cast<upd_node_t*>(thr->run_node);
1639 
1640 		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1641 			/* If a cascaded update is done as defined by a
1642 			foreign key constraint, do not check that
1643 			constraint for the child row. In ON UPDATE CASCADE
1644 			the update of the parent row is only half done when
1645 			we come here: if we would check the constraint here
1646 			for the child row it would fail.
1647 
1648 			A QUESTION remains: if in the child table there are
1649 			several constraints which refer to the same parent
1650 			table, we should merge all updates to the child as
1651 			one update? And the updates can be contradictory!
1652 			Currently we just perform the update associated
1653 			with each foreign key constraint, one after
1654 			another, and the user has problems predicting in
1655 			which order they are performed. */
1656 
1657 			goto exit_func;
1658 		}
1659 	}
1660 
1661 	if (check_ref) {
1662 		check_table = foreign->referenced_table;
1663 		check_index = foreign->referenced_index;
1664 	} else {
1665 		check_table = foreign->foreign_table;
1666 		check_index = foreign->foreign_index;
1667 	}
1668 
1669 	if (check_table == NULL
1670 	    || check_table->file_unreadable
1671 	    || check_index == NULL
1672 	    || fil_space_is_being_truncated(check_table->space)) {
1673 
1674 		if (!srv_read_only_mode && check_ref) {
1675 			FILE*	ef = dict_foreign_err_file;
1676 
1677 			row_ins_set_detailed(trx, foreign);
1678 
1679 			row_ins_foreign_trx_print(trx);
1680 
1681 			fputs("Foreign key constraint fails for table ", ef);
1682 			ut_print_name(ef, trx,
1683 				      foreign->foreign_table_name);
1684 			fputs(":\n", ef);
1685 			dict_print_info_on_foreign_key_in_create_format(
1686 				ef, trx, foreign, TRUE);
1687 			fprintf(ef, "\nTrying to add to index %s tuple:\n",
1688 				foreign->foreign_index->name());
1689 			dtuple_print(ef, entry);
1690 			fputs("\nBut the parent table ", ef);
1691 			ut_print_name(ef, trx,
1692 				      foreign->referenced_table_name);
1693 			fputs("\nor its .ibd file does"
1694 			      " not currently exist!, or"
1695 			      " is undergoing truncate!\n", ef);
1696 			mutex_exit(&dict_foreign_err_mutex);
1697 
1698 			err = DB_NO_REFERENCED_ROW;
1699 		}
1700 
1701 		goto exit_func;
1702 	}
1703 
1704 	if (check_table != table) {
1705 		/* We already have a LOCK_IX on table, but not necessarily
1706 		on check_table */
1707 
1708 		err = lock_table(0, check_table, LOCK_IS, thr);
1709 
1710 		if (err != DB_SUCCESS) {
1711 
1712 			goto do_possible_lock_wait;
1713 		}
1714 	}
1715 
1716 	mtr_start(&mtr);
1717 
1718 	/* Store old value on n_fields_cmp */
1719 
1720 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1721 
1722 	dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1723 
1724 	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1725 		      BTR_SEARCH_LEAF, &pcur, &mtr);
1726 
1727 	/* Scan index records and check if there is a matching record */
1728 
1729 	do {
1730 		const rec_t*		rec = btr_pcur_get_rec(&pcur);
1731 		const buf_block_t*	block = btr_pcur_get_block(&pcur);
1732 
1733 		SRV_CORRUPT_TABLE_CHECK(block,
1734 		{
1735 			err = DB_CORRUPTION;
1736 			goto exit_loop;
1737 		});
1738 
1739 		if (page_rec_is_infimum(rec)) {
1740 
1741 			continue;
1742 		}
1743 
1744 		offsets = rec_get_offsets(rec, check_index,
1745 					  offsets, ULINT_UNDEFINED, &heap);
1746 
1747 		if (page_rec_is_supremum(rec)) {
1748 
1749 			if (skip_gap_lock) {
1750 
1751 				continue;
1752 			}
1753 
1754 			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1755 							  rec, check_index,
1756 							  offsets, thr);
1757 			switch (err) {
1758 			case DB_SUCCESS_LOCKED_REC:
1759 			case DB_SUCCESS:
1760 				continue;
1761 			default:
1762 				goto end_scan;
1763 			}
1764 		}
1765 
1766 		cmp = cmp_dtuple_rec(entry, rec, offsets);
1767 
1768 		if (cmp == 0) {
1769 
1770 			ulint	lock_type;
1771 
1772 			lock_type = skip_gap_lock
1773 				? LOCK_REC_NOT_GAP
1774 				: LOCK_ORDINARY;
1775 
1776 			if (rec_get_deleted_flag(rec,
1777 						 rec_offs_comp(offsets))) {
1778 				err = row_ins_set_shared_rec_lock(
1779 					lock_type, block,
1780 					rec, check_index, offsets, thr);
1781 				switch (err) {
1782 				case DB_SUCCESS_LOCKED_REC:
1783 				case DB_SUCCESS:
1784 					break;
1785 				default:
1786 					goto end_scan;
1787 				}
1788 			} else {
1789 				/* Found a matching record. Lock only
1790 				a record because we can allow inserts
1791 				into gaps */
1792 
1793 				err = row_ins_set_shared_rec_lock(
1794 					LOCK_REC_NOT_GAP, block,
1795 					rec, check_index, offsets, thr);
1796 
1797 				switch (err) {
1798 				case DB_SUCCESS_LOCKED_REC:
1799 				case DB_SUCCESS:
1800 					break;
1801 				default:
1802 					goto end_scan;
1803 				}
1804 
1805 				if (check_ref) {
1806 					err = DB_SUCCESS;
1807 
1808 					goto end_scan;
1809 				} else if (foreign->type != 0) {
1810 					/* There is an ON UPDATE or ON DELETE
1811 					condition: check them in a separate
1812 					function */
1813 
1814 					err = row_ins_foreign_check_on_constraint(
1815 						thr, foreign, &pcur, entry,
1816 						&mtr);
1817 					if (err != DB_SUCCESS) {
1818 						/* Since reporting a plain
1819 						"duplicate key" error
1820 						message to the user in
1821 						cases where a long CASCADE
1822 						operation would lead to a
1823 						duplicate key in some
1824 						other table is very
1825 						confusing, map duplicate
1826 						key errors resulting from
1827 						FK constraints to a
1828 						separate error code. */
1829 
1830 						if (err == DB_DUPLICATE_KEY) {
1831 							err = DB_FOREIGN_DUPLICATE_KEY;
1832 						}
1833 
1834 						goto end_scan;
1835 					}
1836 
1837 					/* row_ins_foreign_check_on_constraint
1838 					may have repositioned pcur on a
1839 					different block */
1840 					block = btr_pcur_get_block(&pcur);
1841 				} else {
1842 					row_ins_foreign_report_err(
1843 						"Trying to delete or update",
1844 						thr, foreign, rec, entry);
1845 
1846 					err = DB_ROW_IS_REFERENCED;
1847 					goto end_scan;
1848 				}
1849 			}
1850 		} else {
1851 			ut_a(cmp < 0);
1852 
1853 			err = DB_SUCCESS;
1854 
1855 			if (!skip_gap_lock) {
1856 				err = row_ins_set_shared_rec_lock(
1857 					LOCK_GAP, block,
1858 					rec, check_index, offsets, thr);
1859 			}
1860 
1861 			switch (err) {
1862 			case DB_SUCCESS_LOCKED_REC:
1863 			case DB_SUCCESS:
1864 				if (check_ref) {
1865 					err = DB_NO_REFERENCED_ROW;
1866 					row_ins_foreign_report_add_err(
1867 						trx, foreign, rec, entry);
1868 				} else {
1869 					err = DB_SUCCESS;
1870 				}
1871 			default:
1872 				break;
1873 			}
1874 
1875 			goto end_scan;
1876 		}
1877 	} while (btr_pcur_move_to_next(&pcur, &mtr));
1878 
1879 exit_loop:
1880 	if (check_ref) {
1881 		row_ins_foreign_report_add_err(
1882 			trx, foreign, btr_pcur_get_rec(&pcur), entry);
1883 		err = DB_NO_REFERENCED_ROW;
1884 	} else {
1885 		err = DB_SUCCESS;
1886 	}
1887 
1888 end_scan:
1889 	btr_pcur_close(&pcur);
1890 
1891 	mtr_commit(&mtr);
1892 
1893 	/* Restore old value */
1894 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1895 
1896 do_possible_lock_wait:
1897 	if (err == DB_LOCK_WAIT) {
1898 		/* An object that will correctly decrement the FK check counter
1899 		when it goes out of this scope. */
1900 		ib_dec_in_dtor	dec(check_table->n_foreign_key_checks_running);
1901 
1902 		trx->error_state = err;
1903 
1904 		que_thr_stop_for_mysql(thr);
1905 
1906 		thr->lock_state = QUE_THR_LOCK_ROW;
1907 
1908 		/* To avoid check_table being dropped, increment counter */
1909 		os_atomic_increment_ulint(
1910 			&check_table->n_foreign_key_checks_running, 1);
1911 
1912 		trx_kill_blocking(trx);
1913 
1914 		lock_wait_suspend_thread(thr);
1915 
1916 		thr->lock_state = QUE_THR_LOCK_NOLOCK;
1917 
1918 		if(trx->error_state != DB_SUCCESS) {
1919 		    err = trx->error_state;
1920 		    goto exit_func;
1921 		}
1922 
1923 		DBUG_PRINT("to_be_dropped",
1924 			   ("table: %s", check_table->name.m_name));
1925 		if (check_table->to_be_dropped) {
1926 			/* The table is being dropped. We shall timeout
1927 			this operation */
1928 			err = DB_LOCK_WAIT_TIMEOUT;
1929 
1930 			goto exit_func;
1931 		}
1932 	}
1933 
1934 
1935 exit_func:
1936 	if (heap != NULL) {
1937 		mem_heap_free(heap);
1938 	}
1939 
1940 	DEBUG_SYNC_C("finished_scanning_index");
1941 	DBUG_RETURN(err);
1942 }
1943 
1944 /***************************************************************//**
1945 Checks if foreign key constraints fail for an index entry. If index
1946 is not mentioned in any constraint, this function does nothing,
1947 Otherwise does searches to the indexes of referenced tables and
1948 sets shared locks which lock either the success or the failure of
1949 a constraint.
1950 @return DB_SUCCESS or error code */
1951 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1952 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1953 row_ins_check_foreign_constraints(
1954 /*==============================*/
1955 	dict_table_t*	table,	/*!< in: table */
1956 	dict_index_t*	index,	/*!< in: index */
1957 	dtuple_t*	entry,	/*!< in: index entry for index */
1958 	que_thr_t*	thr)	/*!< in: query thread */
1959 {
1960 	dict_foreign_t*	foreign;
1961 	dberr_t		err;
1962 	trx_t*		trx;
1963 	ibool		got_s_lock	= FALSE;
1964 
1965 	trx = thr_get_trx(thr);
1966 
1967 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1968 			    "foreign_constraint_check_for_ins");
1969 
1970 	for (dict_foreign_set::iterator it = table->foreign_set.begin();
1971 	     it != table->foreign_set.end();
1972 	     ++it) {
1973 
1974 		foreign = *it;
1975 
1976 		if (foreign->foreign_index == index) {
1977 			dict_table_t*	ref_table = NULL;
1978 			dict_table_t*   foreign_table = foreign->foreign_table;
1979 			dict_table_t*	referenced_table
1980 						= foreign->referenced_table;
1981 
1982 			if (referenced_table == NULL) {
1983 
1984 				ref_table = dict_table_open_on_name(
1985 					foreign->referenced_table_name_lookup,
1986 					FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1987 			}
1988 
1989 			if (0 == trx->dict_operation_lock_mode) {
1990 				got_s_lock = TRUE;
1991 
1992 				row_mysql_freeze_data_dictionary(trx);
1993 			}
1994 
1995                         if (referenced_table) {
1996 				os_atomic_increment_ulint(
1997 					&foreign_table->n_foreign_key_checks_running, 1);
1998                         }
1999 
2000 			/* NOTE that if the thread ends up waiting for a lock
2001 			we will release dict_operation_lock temporarily!
2002 			But the counter on the table protects the referenced
2003 			table from being dropped while the check is running. */
2004 
2005 			err = row_ins_check_foreign_constraint(
2006 				TRUE, foreign, table, entry, thr);
2007 
2008                         if (referenced_table) {
2009 				os_atomic_decrement_ulint(
2010 					&foreign_table->n_foreign_key_checks_running, 1);
2011                         }
2012 
2013 			if (got_s_lock) {
2014 				row_mysql_unfreeze_data_dictionary(trx);
2015 			}
2016 
2017 			if (ref_table != NULL) {
2018 				dict_table_close(ref_table, FALSE, FALSE);
2019 			}
2020 
2021 			if (err != DB_SUCCESS) {
2022 
2023 				return(err);
2024 			}
2025 		}
2026 	}
2027 
2028 	return(DB_SUCCESS);
2029 }
2030 
2031 /***************************************************************//**
2032 Checks if a unique key violation to rec would occur at the index entry
2033 insert.
2034 @return TRUE if error */
2035 static
2036 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)2037 row_ins_dupl_error_with_rec(
2038 /*========================*/
2039 	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
2040 				that the caller already has a record lock on
2041 				the record! */
2042 	const dtuple_t*	entry,	/*!< in: entry to insert */
2043 	dict_index_t*	index,	/*!< in: index */
2044 	const ulint*	offsets)/*!< in: rec_get_offsets(rec, index) */
2045 {
2046 	ulint	matched_fields;
2047 	ulint	n_unique;
2048 	ulint	i;
2049 
2050 	ut_ad(rec_offs_validate(rec, index, offsets));
2051 
2052 	n_unique = dict_index_get_n_unique(index);
2053 
2054 	matched_fields = 0;
2055 
2056 	cmp_dtuple_rec_with_match(entry, rec, offsets, &matched_fields);
2057 
2058 	if (matched_fields < n_unique) {
2059 
2060 		return(FALSE);
2061 	}
2062 
2063 	/* In a unique secondary index we allow equal key values if they
2064 	contain SQL NULLs */
2065 
2066 	if (!dict_index_is_clust(index) && !index->nulls_equal) {
2067 
2068 		for (i = 0; i < n_unique; i++) {
2069 			if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
2070 
2071 				return(FALSE);
2072 			}
2073 		}
2074 	}
2075 
2076 	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2077 }
2078 
2079 /***************************************************************//**
2080 Scans a unique non-clustered index at a given index entry to determine
2081 whether a uniqueness violation has occurred for the key value of the entry.
2082 Set shared locks on possible duplicate records.
2083 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
2084 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2085 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)2086 row_ins_scan_sec_index_for_duplicate(
2087 /*=================================*/
2088 	ulint		flags,	/*!< in: undo logging and locking flags */
2089 	dict_index_t*	index,	/*!< in: non-clustered unique index */
2090 	dtuple_t*	entry,	/*!< in: index entry */
2091 	que_thr_t*	thr,	/*!< in: query thread */
2092 	bool		s_latch,/*!< in: whether index->lock is being held */
2093 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
2094 	mem_heap_t*	offsets_heap)
2095 				/*!< in/out: memory heap that can be emptied */
2096 {
2097 	ulint		n_unique;
2098 	int		cmp;
2099 	ulint		n_fields_cmp;
2100 	btr_pcur_t	pcur;
2101 	dberr_t		err		= DB_SUCCESS;
2102 	ulint		allow_duplicates;
2103 	ulint*		offsets		= NULL;
2104 	DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
2105 
2106 
2107 	ut_ad(s_latch == rw_lock_own_flagged(
2108 			&index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
2109 
2110 	n_unique = dict_index_get_n_unique(index);
2111 
2112 	/* If the secondary index is unique, but one of the fields in the
2113 	n_unique first fields is NULL, a unique key violation cannot occur,
2114 	since we define NULL != NULL in this case */
2115 
2116 	if (!index->nulls_equal) {
2117 		for (ulint i = 0; i < n_unique; i++) {
2118 			if (UNIV_SQL_NULL == dfield_get_len(
2119 					dtuple_get_nth_field(entry, i))) {
2120 
2121 				DBUG_RETURN(DB_SUCCESS);
2122 			}
2123 		}
2124 	}
2125 
2126 	/* Store old value on n_fields_cmp */
2127 
2128 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
2129 
2130 	dtuple_set_n_fields_cmp(entry, n_unique);
2131 
2132 	btr_pcur_open(index, entry, PAGE_CUR_GE,
2133 		      s_latch
2134 		      ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
2135 		      : BTR_SEARCH_LEAF,
2136 		      &pcur, mtr);
2137 
2138 	allow_duplicates = thr_get_trx(thr)->duplicates;
2139 
2140 	/* Scan index records and check if there is a duplicate */
2141 
2142 	do {
2143 		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
2144 		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
2145 		const ulint		lock_type = LOCK_ORDINARY;
2146 
2147 		if (page_rec_is_infimum(rec)) {
2148 
2149 			continue;
2150 		}
2151 
2152 		offsets = rec_get_offsets(rec, index, offsets,
2153 					  ULINT_UNDEFINED, &offsets_heap);
2154 
2155 		if (flags & BTR_NO_LOCKING_FLAG) {
2156 			/* Set no locks when applying log
2157 			in online table rebuild. */
2158 		} else if (allow_duplicates) {
2159 
2160 			/* If the SQL-query will update or replace
2161 			duplicate key we will take X-lock for
2162 			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2163 			INSERT ON DUPLICATE KEY UPDATE). */
2164 
2165 			err = row_ins_set_exclusive_rec_lock(
2166 				lock_type, block, rec, index, offsets, thr);
2167 		} else {
2168 
2169 			err = row_ins_set_shared_rec_lock(
2170 				lock_type, block, rec, index, offsets, thr);
2171 		}
2172 
2173 		switch (err) {
2174 		case DB_SUCCESS_LOCKED_REC:
2175 			err = DB_SUCCESS;
2176 		case DB_SUCCESS:
2177 			break;
2178 		default:
2179 			goto end_scan;
2180 		}
2181 
2182 		if (page_rec_is_supremum(rec)) {
2183 
2184 			continue;
2185 		}
2186 
2187 		cmp = cmp_dtuple_rec(entry, rec, offsets);
2188 
2189 		if (cmp == 0 && !index->allow_duplicates) {
2190 			if (row_ins_dupl_error_with_rec(rec, entry,
2191 							index, offsets)) {
2192 				err = DB_DUPLICATE_KEY;
2193 
2194 				thr_get_trx(thr)->error_index = index;
2195 
2196 				/* If the duplicate is on hidden FTS_DOC_ID,
2197 				state so in the error log */
2198 				if (index == index->table->fts_doc_id_index
2199 				    && DICT_TF2_FLAG_IS_SET(
2200 					index->table,
2201 					DICT_TF2_FTS_HAS_DOC_ID)) {
2202 
2203 					ib::error() << "Duplicate FTS_DOC_ID"
2204 						" value on table "
2205 						<< index->table->name;
2206 				}
2207 
2208 				goto end_scan;
2209 			}
2210 		} else {
2211 			ut_a(cmp < 0 || index->allow_duplicates);
2212 			goto end_scan;
2213 		}
2214 	} while (btr_pcur_move_to_next(&pcur, mtr));
2215 
2216 end_scan:
2217 	/* Restore old value */
2218 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2219 
2220 	DBUG_RETURN(err);
2221 }
2222 
2223 /** Checks for a duplicate when the table is being rebuilt online.
2224 @retval DB_SUCCESS when no duplicate is detected
2225 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2226 a newer version of entry (the entry should not be inserted)
2227 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2230 row_ins_duplicate_online(
2231 /*=====================*/
2232 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2233 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2234 	const rec_t*	rec,	/*!< in: clustered index record */
2235 	ulint*		offsets)/*!< in/out: rec_get_offsets(rec) */
2236 {
2237 	ulint	fields	= 0;
2238 
2239 	/* During rebuild, there should not be any delete-marked rows
2240 	in the new table. */
2241 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2242 	ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2243 
2244 	/* Compare the PRIMARY KEY fields and the
2245 	DB_TRX_ID, DB_ROLL_PTR. */
2246 	cmp_dtuple_rec_with_match_low(
2247 		entry, rec, offsets, n_uniq + 2, &fields);
2248 
2249 	if (fields < n_uniq) {
2250 		/* Not a duplicate. */
2251 		return(DB_SUCCESS);
2252 	}
2253 
2254 	if (fields == n_uniq + 2) {
2255 		/* rec is an exact match of entry. */
2256 		return(DB_SUCCESS_LOCKED_REC);
2257 	}
2258 
2259 	return(DB_DUPLICATE_KEY);
2260 }
2261 
2262 /** Checks for a duplicate when the table is being rebuilt online.
2263 @retval DB_SUCCESS when no duplicate is detected
2264 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2265 a newer version of entry (the entry should not be inserted)
2266 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2267 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2268 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2269 row_ins_duplicate_error_in_clust_online(
2270 /*====================================*/
2271 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2272 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2273 	const btr_cur_t*cursor,	/*!< in: cursor on insert position */
2274 	ulint**		offsets,/*!< in/out: rec_get_offsets(rec) */
2275 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2276 {
2277 	dberr_t		err	= DB_SUCCESS;
2278 	const rec_t*	rec	= btr_cur_get_rec(cursor);
2279 
2280 	if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2281 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2282 					   ULINT_UNDEFINED, heap);
2283 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2284 		if (err != DB_SUCCESS) {
2285 			return(err);
2286 		}
2287 	}
2288 
2289 	rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2290 
2291 	if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2292 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2293 					   ULINT_UNDEFINED, heap);
2294 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2295 	}
2296 
2297 	return(err);
2298 }
2299 
2300 /***************************************************************//**
2301 Checks if a unique key violation error would occur at an index entry
2302 insert. Sets shared locks on possible duplicate records. Works only
2303 for a clustered index!
2304 @retval DB_SUCCESS if no error
2305 @retval DB_DUPLICATE_KEY if error,
2306 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2307 record
2308 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2309 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2310 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2311 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2312 row_ins_duplicate_error_in_clust(
2313 /*=============================*/
2314 	ulint		flags,	/*!< in: undo logging and locking flags */
2315 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
2316 	const dtuple_t*	entry,	/*!< in: entry to insert */
2317 	que_thr_t*	thr,	/*!< in: query thread */
2318 	mtr_t*		mtr)	/*!< in: mtr */
2319 {
2320 	dberr_t	err;
2321 	rec_t*	rec;
2322 	ulint	n_unique;
2323 	trx_t*	trx		= thr_get_trx(thr);
2324 	mem_heap_t*heap		= NULL;
2325 	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
2326 	ulint*	offsets		= offsets_;
2327 	rec_offs_init(offsets_);
2328 
2329 	UT_NOT_USED(mtr);
2330 
2331 	ut_ad(dict_index_is_clust(cursor->index));
2332 
2333 	/* NOTE: For unique non-clustered indexes there may be any number
2334 	of delete marked records with the same value for the non-clustered
2335 	index key (remember multiversioning), and which differ only in
2336 	the row refererence part of the index record, containing the
2337 	clustered index key fields. For such a secondary index record,
2338 	to avoid race condition, we must FIRST do the insertion and after
2339 	that check that the uniqueness condition is not breached! */
2340 
2341 	/* NOTE: A problem is that in the B-tree node pointers on an
2342 	upper level may match more to the entry than the actual existing
2343 	user records on the leaf level. So, even if low_match would suggest
2344 	that a duplicate key violation may occur, this may not be the case. */
2345 
2346 	n_unique = dict_index_get_n_unique(cursor->index);
2347 
2348 	if (cursor->low_match >= n_unique) {
2349 
2350 		rec = btr_cur_get_rec(cursor);
2351 
2352 		if (!page_rec_is_infimum(rec)) {
2353 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2354 						  ULINT_UNDEFINED, &heap);
2355 
2356 			/* We set a lock on the possible duplicate: this
2357 			is needed in logical logging of MySQL to make
2358 			sure that in roll-forward we get the same duplicate
2359 			errors as in original execution */
2360 
2361 			if (flags & BTR_NO_LOCKING_FLAG) {
2362 				/* Do nothing if no-locking is set */
2363 				err = DB_SUCCESS;
2364 			} else if (trx->duplicates) {
2365 
2366 				/* If the SQL-query will update or replace
2367 				duplicate key we will take X-lock for
2368 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2369 				INSERT ON DUPLICATE KEY UPDATE). */
2370 
2371 				err = row_ins_set_exclusive_rec_lock(
2372 					LOCK_REC_NOT_GAP,
2373 					btr_cur_get_block(cursor),
2374 					rec, cursor->index, offsets, thr);
2375 			} else {
2376 
2377 				err = row_ins_set_shared_rec_lock(
2378 					LOCK_REC_NOT_GAP,
2379 					btr_cur_get_block(cursor), rec,
2380 					cursor->index, offsets, thr);
2381 			}
2382 
2383 			switch (err) {
2384 			case DB_SUCCESS_LOCKED_REC:
2385 			case DB_SUCCESS:
2386 				break;
2387 			default:
2388 				goto func_exit;
2389 			}
2390 
2391 			if (row_ins_dupl_error_with_rec(
2392 				    rec, entry, cursor->index, offsets)) {
2393 duplicate:
2394 				trx->error_index = cursor->index;
2395 				err = DB_DUPLICATE_KEY;
2396 				goto func_exit;
2397 			}
2398 		}
2399 	}
2400 
2401 	if (cursor->up_match >= n_unique) {
2402 
2403 		rec = page_rec_get_next(btr_cur_get_rec(cursor));
2404 
2405 		if (!page_rec_is_supremum(rec)) {
2406 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2407 						  ULINT_UNDEFINED, &heap);
2408 
2409 			if (trx->duplicates) {
2410 
2411 				/* If the SQL-query will update or replace
2412 				duplicate key we will take X-lock for
2413 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2414 				INSERT ON DUPLICATE KEY UPDATE). */
2415 
2416 				err = row_ins_set_exclusive_rec_lock(
2417 					LOCK_REC_NOT_GAP,
2418 					btr_cur_get_block(cursor),
2419 					rec, cursor->index, offsets, thr);
2420 			} else {
2421 
2422 				err = row_ins_set_shared_rec_lock(
2423 					LOCK_REC_NOT_GAP,
2424 					btr_cur_get_block(cursor),
2425 					rec, cursor->index, offsets, thr);
2426 			}
2427 
2428 			switch (err) {
2429 			case DB_SUCCESS_LOCKED_REC:
2430 			case DB_SUCCESS:
2431 				break;
2432 			default:
2433 				goto func_exit;
2434 			}
2435 
2436 			if (row_ins_dupl_error_with_rec(
2437 				    rec, entry, cursor->index, offsets)) {
2438 				goto duplicate;
2439 			}
2440 		}
2441 
2442 		/* This should never happen */
2443 		ut_error;
2444 	}
2445 
2446 	err = DB_SUCCESS;
2447 func_exit:
2448 	if (UNIV_LIKELY_NULL(heap)) {
2449 		mem_heap_free(heap);
2450 	}
2451 	return(err);
2452 }
2453 
2454 /***************************************************************//**
2455 Checks if an index entry has long enough common prefix with an
2456 existing record so that the intended insert of the entry must be
2457 changed to a modify of the existing record. In the case of a clustered
2458 index, the prefix must be n_unique fields long. In the case of a
2459 secondary index, all fields must be equal.  InnoDB never updates
2460 secondary index records in place, other than clearing or setting the
2461 delete-mark flag. We could be able to update the non-unique fields
2462 of a unique secondary index record by checking the cursor->up_match,
2463 but we do not do so, because it could have some locking implications.
2464 @return TRUE if the existing record should be updated; FALSE if not */
2465 UNIV_INLINE
2466 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2467 row_ins_must_modify_rec(
2468 /*====================*/
2469 	const btr_cur_t*	cursor)	/*!< in: B-tree cursor */
2470 {
2471 	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2472 	Because node pointers on upper levels of the B-tree may match more
2473 	to entry than to actual user records on the leaf level, we
2474 	have to check if the candidate record is actually a user record.
2475 	A clustered index node pointer contains index->n_unique first fields,
2476 	and a secondary index node pointer contains all index fields. */
2477 
2478 	return(cursor->low_match
2479 	       >= dict_index_get_n_unique_in_tree(cursor->index)
2480 	       && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2481 }
2482 
2483 /***************************************************************//**
2484 Tries to insert an entry into a clustered index, ignoring foreign key
2485 constraints. If a record with the same unique key is found, the other
2486 record is necessarily marked deleted by a committed transaction, or a
2487 unique key violation error occurs. The delete marked record is then
2488 updated to an existing record, and we must write an undo log record on
2489 the delete marked record.
2490 @retval DB_SUCCESS on success
2491 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2492 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2493 @return error code */
2494 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr,bool dup_chk_only)2495 row_ins_clust_index_entry_low(
2496 /*==========================*/
2497 	ulint		flags,	/*!< in: undo logging and locking flags */
2498 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2499 				depending on whether we wish optimistic or
2500 				pessimistic descent down the index tree */
2501 	dict_index_t*	index,	/*!< in: clustered index */
2502 	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
2503 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2504 	ulint		n_ext,	/*!< in: number of externally stored columns */
2505 	que_thr_t*	thr,	/*!< in: query thread */
2506 	bool		dup_chk_only)
2507 				/*!< in: if true, just do duplicate check
2508 				and return. don't execute actual insert. */
2509 {
2510 	btr_pcur_t	pcur;
2511 	btr_cur_t*	cursor;
2512 	dberr_t		err		= DB_SUCCESS;
2513 	big_rec_t*	big_rec		= NULL;
2514 	mtr_t		mtr;
2515 	mem_heap_t*	offsets_heap	= NULL;
2516 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2517 	ulint*          offsets         = offsets_;
2518 	rec_offs_init(offsets_);
2519 
2520 	DBUG_ENTER("row_ins_clust_index_entry_low");
2521 
2522 	ut_ad(dict_index_is_clust(index));
2523 	ut_ad(!dict_index_is_unique(index)
2524 	      || n_uniq == dict_index_get_n_unique(index));
2525 	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2526 	ut_ad(!thr_get_trx(thr)->in_rollback);
2527 
2528 	mtr_start(&mtr);
2529 	mtr.set_named_space(index->space);
2530 
2531 	if (dict_table_is_temporary(index->table)) {
2532 		/* Disable REDO logging as the lifetime of temp-tables is
2533 		limited to server or connection lifetime and so REDO
2534 		information is not needed on restart for recovery.
2535 		Disable locking as temp-tables are local to a connection. */
2536 
2537 		ut_ad(flags & BTR_NO_LOCKING_FLAG);
2538 		ut_ad(!dict_table_is_intrinsic(index->table)
2539 		      || (flags & BTR_NO_UNDO_LOG_FLAG));
2540 
2541 		mtr.set_log_mode(MTR_LOG_NO_REDO);
2542 	}
2543 
2544 	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2545 		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
2546 		mtr_s_lock(dict_index_get_lock(index), &mtr);
2547 	}
2548 
2549 	/* Note that we use PAGE_CUR_LE as the search mode, because then
2550 	the function will return in both low_match and up_match of the
2551 	cursor sensible values */
2552 	err = btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);
2553 
2554 	if (err != DB_SUCCESS) {
2555 		index->table->set_file_unreadable();
2556 		mtr.commit();
2557 		goto func_exit;
2558 	}
2559 
2560 	cursor = btr_pcur_get_btr_cur(&pcur);
2561 	cursor->thr = thr;
2562 
2563 	ut_ad(!dict_table_is_intrinsic(index->table)
2564 	      || cursor->page_cur.block->made_dirty_with_no_latch);
2565 
2566 #ifdef UNIV_DEBUG
2567 	{
2568 		page_t*	page = btr_cur_get_page(cursor);
2569 		rec_t*	first_rec = page_rec_get_next(
2570 			page_get_infimum_rec(page));
2571 
2572 		ut_ad(page_rec_is_supremum(first_rec)
2573 		      || rec_n_fields_is_sane(index, first_rec, entry));
2574 	}
2575 #endif /* UNIV_DEBUG */
2576 
2577 	/* Allowing duplicates in clustered index is currently enabled
2578 	only for intrinsic table and caller understand the limited
2579 	operation that can be done in this case. */
2580 	ut_ad(!index->allow_duplicates
2581 	      || (index->allow_duplicates
2582 		  && dict_table_is_intrinsic(index->table)));
2583 
2584 	if (!index->allow_duplicates
2585 	    && n_uniq
2586 	    && (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {
2587 
2588 		if (flags
2589 		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2590 			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2591 			/* Set no locks when applying log
2592 			in online table rebuild. Only check for duplicates. */
2593 			err = row_ins_duplicate_error_in_clust_online(
2594 				n_uniq, entry, cursor,
2595 				&offsets, &offsets_heap);
2596 
2597 			switch (err) {
2598 			case DB_SUCCESS:
2599 				break;
2600 			default:
2601 				ut_ad(0);
2602 				/* fall through */
2603 			case DB_SUCCESS_LOCKED_REC:
2604 			case DB_DUPLICATE_KEY:
2605 				thr_get_trx(thr)->error_index = cursor->index;
2606 			}
2607 		} else {
2608 			/* Note that the following may return also
2609 			DB_LOCK_WAIT */
2610 
2611 			err = row_ins_duplicate_error_in_clust(
2612 				flags, cursor, entry, thr, &mtr);
2613 		}
2614 
2615 		if (err != DB_SUCCESS) {
2616 err_exit:
2617 			mtr_commit(&mtr);
2618 			goto func_exit;
2619 		}
2620 	}
2621 
2622 	if (dup_chk_only) {
2623 		mtr_commit(&mtr);
2624 		goto func_exit;
2625 	}
2626 
2627 	/* Note: Allowing duplicates would qualify for modification of
2628 	an existing record as the new entry is exactly same as old entry.
2629 	Avoid this check if allow duplicates is enabled. */
2630 	if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
2631 		/* There is already an index entry with a long enough common
2632 		prefix, we must convert the insert into a modify of an
2633 		existing record */
2634 		mem_heap_t*	entry_heap	= mem_heap_create(1024);
2635 
2636 		/* If the existing record is being modified and the new record
2637 		doesn't fit the provided slot then existing record is added
2638 		to free list and new record is inserted. This also means
2639 		cursor that we have cached for SELECT is now invalid. */
2640 		if(index->last_sel_cur) {
2641 			index->last_sel_cur->invalid = true;
2642 		}
2643 
2644 		err = row_ins_clust_index_entry_by_modify(
2645 			&pcur, flags, mode, &offsets, &offsets_heap,
2646 			entry_heap, entry, thr, &mtr);
2647 
2648 		if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2649 			row_log_table_insert(btr_cur_get_rec(cursor), entry,
2650 					     index, offsets);
2651 		}
2652 
2653 		mtr_commit(&mtr);
2654 		mem_heap_free(entry_heap);
2655 	} else {
2656 		rec_t*	insert_rec;
2657 
2658 		if (mode != BTR_MODIFY_TREE) {
2659 			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2660 			      == BTR_MODIFY_LEAF);
2661 			err = btr_cur_optimistic_insert(
2662 				flags, cursor, &offsets, &offsets_heap,
2663 				entry, &insert_rec, &big_rec,
2664 				n_ext, thr, &mtr);
2665 		} else {
2666 			if (buf_LRU_buf_pool_running_out()) {
2667 
2668 				err = DB_LOCK_TABLE_FULL;
2669 				goto err_exit;
2670 			}
2671 
2672 			DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");
2673 
2674 			err = btr_cur_optimistic_insert(
2675 				flags, cursor,
2676 				&offsets, &offsets_heap,
2677 				entry, &insert_rec, &big_rec,
2678 				n_ext, thr, &mtr);
2679 
2680 			if (err == DB_FAIL) {
2681 				err = btr_cur_pessimistic_insert(
2682 					flags, cursor,
2683 					&offsets, &offsets_heap,
2684 					entry, &insert_rec, &big_rec,
2685 					n_ext, thr, &mtr);
2686 			}
2687 		}
2688 
2689 		if (big_rec != NULL) {
2690 			mtr_commit(&mtr);
2691 
2692 			/* Online table rebuild could read (and
2693 			ignore) the incomplete record at this point.
2694 			If online rebuild is in progress, the
2695 			row_ins_index_entry_big_rec() will write log. */
2696 
2697 			DBUG_EXECUTE_IF(
2698 				"row_ins_extern_checkpoint",
2699 				log_make_checkpoint_at(
2700 					LSN_MAX, TRUE););
2701 			err = row_ins_index_entry_big_rec(
2702 				entry, big_rec, offsets, &offsets_heap, index,
2703 				thr_get_trx(thr)->mysql_thd,
2704 				__FILE__, __LINE__);
2705 			dtuple_convert_back_big_rec(index, entry, big_rec);
2706 		} else {
2707 			if (err == DB_SUCCESS
2708 			    && dict_index_is_online_ddl(index)) {
2709 				row_log_table_insert(
2710 					insert_rec, entry, index, offsets);
2711 			}
2712 
2713 			mtr_commit(&mtr);
2714 		}
2715 	}
2716 
2717 func_exit:
2718 	if (offsets_heap != NULL) {
2719 		mem_heap_free(offsets_heap);
2720 	}
2721 
2722 	btr_pcur_close(&pcur);
2723 
2724 	DBUG_RETURN(err);
2725 }
2726 
2727 /** This is a specialized function meant for direct insertion to
2728 auto-generated clustered index based on cached position from
2729 last successful insert. To be used when data is sorted.
2730 
2731 @param[in]	mode	BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
2732 			depending on whether we wish optimistic or
2733 			pessimistic descent down the index tree
2734 @param[in,out]	index	clustered index
2735 @param[in,out]	entry	index entry to insert
2736 @param[in]	thr	query thread
2737 
2738 @return error code */
2739 static
2740 dberr_t
row_ins_sorted_clust_index_entry(ulint mode,dict_index_t * index,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2741 row_ins_sorted_clust_index_entry(
2742 	ulint		mode,
2743 	dict_index_t*	index,
2744 	dtuple_t*	entry,
2745 	ulint		n_ext,
2746 	que_thr_t*	thr)
2747 {
2748 	dberr_t		err;
2749 	mtr_t*		mtr;
2750 	const bool	commit_mtr	= mode == BTR_MODIFY_TREE;
2751 
2752 	mem_heap_t*	offsets_heap	= NULL;
2753 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2754 	ulint*          offsets         = offsets_;
2755 	rec_offs_init(offsets_);
2756 
2757 	DBUG_ENTER("row_ins_sorted_clust_index_entry");
2758 
2759 	ut_ad(index->last_ins_cur != NULL);
2760 	ut_ad(dict_index_is_clust(index));
2761 	ut_ad(dict_table_is_intrinsic(index->table));
2762 	ut_ad(dict_index_is_auto_gen_clust(index));
2763 
2764 	btr_cur_t	cursor;
2765 	cursor.thr = thr;
2766 	mtr = &index->last_ins_cur->mtr;
2767 
2768 	/* Search for position if tree needs to be split or if last position
2769 	is not cached. */
2770 	if (mode == BTR_MODIFY_TREE
2771 	    || index->last_ins_cur->rec == NULL
2772 	    || index->last_ins_cur->disable_caching) {
2773 
2774 		/* Commit the previous mtr. */
2775 		index->last_ins_cur->release();
2776 
2777 		mtr_start(mtr);
2778 		mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
2779 
2780 		btr_cur_search_to_nth_level_with_no_latch(
2781 			index, 0, entry, PAGE_CUR_LE, &cursor,
2782 			__FILE__, __LINE__, mtr);
2783 		ut_ad(cursor.page_cur.block != NULL);
2784 		ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2785 	} else {
2786 		cursor.index = index;
2787 
2788 		cursor.page_cur.index = index;
2789 
2790 		cursor.page_cur.rec = index->last_ins_cur->rec;
2791 
2792 		cursor.page_cur.block = index->last_ins_cur->block;
2793 	}
2794 
2795 	const ulint	flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
2796 
2797 	for (;;) {
2798 		rec_t*		insert_rec;
2799 		big_rec_t*	big_rec		= NULL;
2800 
2801 		if (mode != BTR_MODIFY_TREE) {
2802 			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2803 				== BTR_MODIFY_LEAF);
2804 
2805 			err = btr_cur_optimistic_insert(
2806 				flags, &cursor, &offsets, &offsets_heap, entry,
2807 				&insert_rec, &big_rec, n_ext, thr, mtr);
2808 			if (err != DB_SUCCESS) {
2809 				break;
2810 			}
2811 		} else {
2812 			/* TODO: Check if this is needed for intrinsic table. */
2813 			if (buf_LRU_buf_pool_running_out()) {
2814 				err = DB_LOCK_TABLE_FULL;
2815 				break;
2816 			}
2817 
2818 			err = btr_cur_optimistic_insert(
2819 				flags, &cursor, &offsets, &offsets_heap, entry,
2820 				&insert_rec, &big_rec, n_ext, thr, mtr);
2821 
2822 			if (err == DB_FAIL) {
2823 				err = btr_cur_pessimistic_insert(
2824 					flags, &cursor, &offsets, &offsets_heap,
2825 					entry, &insert_rec, &big_rec, n_ext,
2826 					thr, mtr);
2827 			}
2828 		}
2829 
2830 		if (big_rec != NULL) {
2831 			/* If index involves big-record optimization is
2832 			turned-off. */
2833 			index->last_ins_cur->release();
2834 			index->last_ins_cur->disable_caching = true;
2835 
2836 			err = row_ins_index_entry_big_rec(
2837 				entry, big_rec, offsets, &offsets_heap, index,
2838 				thr_get_trx(thr)->mysql_thd, __FILE__, __LINE__);
2839 
2840 			dtuple_convert_back_big_rec(index, entry, big_rec);
2841 
2842 		} else if (err == DB_SUCCESS ) {
2843 			if (!commit_mtr
2844 			    && !index->last_ins_cur->disable_caching) {
2845 				index->last_ins_cur->rec = insert_rec;
2846 
2847 				index->last_ins_cur->block
2848 					= cursor.page_cur.block;
2849 			} else {
2850 				index->last_ins_cur->release();
2851 			}
2852 		}
2853 
2854 		break;
2855 	}
2856 
2857 	if (err != DB_SUCCESS) {
2858 		index->last_ins_cur->release();
2859 	}
2860 
2861 	if (offsets_heap != NULL) {
2862 		mem_heap_free(offsets_heap);
2863 	}
2864 
2865 	DBUG_RETURN(err);
2866 }
2867 
2868 /** Start a mini-transaction and check if the index will be dropped.
2869 @param[in,out]	mtr		mini-transaction
2870 @param[in,out]	index		secondary index
2871 @param[in]	check		whether to check
2872 @param[in]	search_mode	flags
2873 @return true if the index is to be dropped */
2874 static MY_ATTRIBUTE((warn_unused_result))
2875 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2876 row_ins_sec_mtr_start_and_check_if_aborted(
2877 	mtr_t*		mtr,
2878 	dict_index_t*	index,
2879 	bool		check,
2880 	ulint		search_mode)
2881 {
2882 	ut_ad(!dict_index_is_clust(index));
2883 	ut_ad(mtr->is_named_space(index->space));
2884 
2885 	const mtr_log_t	log_mode = mtr->get_log_mode();
2886 
2887 	mtr_start(mtr);
2888 	mtr->set_named_space(index->space);
2889 	mtr->set_log_mode(log_mode);
2890 
2891 	if (!check) {
2892 		return(false);
2893 	}
2894 
2895 	if (search_mode & BTR_ALREADY_S_LATCHED) {
2896 		mtr_s_lock(dict_index_get_lock(index), mtr);
2897 	} else {
2898 		mtr_sx_lock(dict_index_get_lock(index), mtr);
2899 	}
2900 
2901 	switch (index->online_status) {
2902 	case ONLINE_INDEX_ABORTED:
2903 	case ONLINE_INDEX_ABORTED_DROPPED:
2904 		ut_ad(!index->is_committed());
2905 		return(true);
2906 	case ONLINE_INDEX_COMPLETE:
2907 		return(false);
2908 	case ONLINE_INDEX_CREATION:
2909 		break;
2910 	}
2911 
2912 	ut_error;
2913 	return(true);
2914 }
2915 
2916 /***************************************************************//**
2917 Tries to insert an entry into a secondary index. If a record with exactly the
2918 same fields is found, the other record is necessarily marked deleted.
2919 It is then unmarked. Otherwise, the entry is just inserted to the index.
2920 @retval DB_SUCCESS on success
2921 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2922 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2923 @return error code */
2924 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2925 row_ins_sec_index_entry_low(
2926 /*========================*/
2927 	ulint		flags,	/*!< in: undo logging and locking flags */
2928 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2929 				depending on whether we wish optimistic or
2930 				pessimistic descent down the index tree */
2931 	dict_index_t*	index,	/*!< in: secondary index */
2932 	mem_heap_t*	offsets_heap,
2933 				/*!< in/out: memory heap that can be emptied */
2934 	mem_heap_t*	heap,	/*!< in/out: memory heap */
2935 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2936 	trx_id_t	trx_id,	/*!< in: PAGE_MAX_TRX_ID during
2937 				row_log_table_apply(), or 0 */
2938 	que_thr_t*	thr,	/*!< in: query thread */
2939 	bool		dup_chk_only)
2940 				/*!< in: if true, just do duplicate check
2941 				and return. don't execute actual insert. */
2942 {
2943 	DBUG_ENTER("row_ins_sec_index_entry_low");
2944 
2945 	btr_cur_t	cursor;
2946 	ulint		search_mode	= mode;
2947 	dberr_t		err		= DB_SUCCESS;
2948 	ulint		n_unique;
2949 	mtr_t		mtr;
2950 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2951 	ulint*          offsets         = offsets_;
2952 	rec_offs_init(offsets_);
2953 	rtr_info_t	rtr_info;
2954 
2955 	ut_ad(!dict_index_is_clust(index));
2956 	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2957 
2958 	cursor.thr = thr;
2959 	cursor.rtr_info = NULL;
2960 	ut_ad(thr_get_trx(thr)->id != 0
2961 	      || dict_table_is_intrinsic(index->table));
2962 
2963 	mtr_start(&mtr);
2964 	mtr.set_named_space(index->space);
2965 
2966 	if (dict_table_is_temporary(index->table)) {
2967 		/* Disable REDO logging as the lifetime of temp-tables is
2968 		limited to server or connection lifetime and so REDO
2969 		information is not needed on restart for recovery.
2970 		Disable locking as temp-tables are local to a connection. */
2971 
2972 		ut_ad(flags & BTR_NO_LOCKING_FLAG);
2973 		ut_ad(!dict_table_is_intrinsic(index->table)
2974 		      || (flags & BTR_NO_UNDO_LOG_FLAG));
2975 
2976 		mtr.set_log_mode(MTR_LOG_NO_REDO);
2977 	} else if (!dict_index_is_spatial(index)) {
2978 		/* Enable insert buffering if it's neither temp-table
2979 		nor spatial index. */
2980 		search_mode |= BTR_INSERT;
2981 	}
2982 
2983 	/* Ensure that we acquire index->lock when inserting into an
2984 	index with index->online_status == ONLINE_INDEX_COMPLETE, but
2985 	could still be subject to rollback_inplace_alter_table().
2986 	This prevents a concurrent change of index->online_status.
2987 	The memory object cannot be freed as long as we have an open
2988 	reference to the table, or index->table->n_ref_count > 0. */
2989 	const bool	check = !index->is_committed();
2990 	if (check) {
2991 		DEBUG_SYNC_C("row_ins_sec_index_enter");
2992 		if (mode == BTR_MODIFY_LEAF) {
2993 			search_mode |= BTR_ALREADY_S_LATCHED;
2994 			mtr_s_lock(dict_index_get_lock(index), &mtr);
2995 		} else {
2996 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
2997 		}
2998 
2999 		if (row_log_online_op_try(
3000 			    index, entry, thr_get_trx(thr)->id)) {
3001 			goto func_exit;
3002 		}
3003 	}
3004 
3005 	/* Note that we use PAGE_CUR_LE as the search mode, because then
3006 	the function will return in both low_match and up_match of the
3007 	cursor sensible values */
3008 
3009 	if (!thr_get_trx(thr)->check_unique_secondary) {
3010 		search_mode |= BTR_IGNORE_SEC_UNIQUE;
3011 	}
3012 
3013 	if (dict_index_is_spatial(index)) {
3014 		cursor.index = index;
3015 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
3016 		rtr_info_update_btr(&cursor, &rtr_info);
3017 
3018 		err = btr_cur_search_to_nth_level(
3019 			index, 0, entry, PAGE_CUR_RTREE_INSERT,
3020 			search_mode,
3021 			&cursor, 0, __FILE__, __LINE__, &mtr);
3022 
3023 		if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
3024 			mtr_commit(&mtr);
3025 			rtr_clean_rtr_info(&rtr_info, true);
3026 			rtr_init_rtr_info(&rtr_info, false, &cursor,
3027 					  index, false);
3028 			rtr_info_update_btr(&cursor, &rtr_info);
3029 			mtr_start(&mtr);
3030 			mtr.set_named_space(index->space);
3031 			search_mode &= ~BTR_MODIFY_LEAF;
3032 			search_mode |= BTR_MODIFY_TREE;
3033 			err = btr_cur_search_to_nth_level(
3034 				index, 0, entry, PAGE_CUR_RTREE_INSERT,
3035 				search_mode,
3036 				&cursor, 0, __FILE__, __LINE__, &mtr);
3037 			mode = BTR_MODIFY_TREE;
3038 		}
3039 
3040 		DBUG_EXECUTE_IF(
3041 			"rtree_test_check_count", {
3042 			goto func_exit;});
3043 
3044 	} else {
3045 		if (dict_table_is_intrinsic(index->table)) {
3046 			btr_cur_search_to_nth_level_with_no_latch(
3047 				index, 0, entry, PAGE_CUR_LE, &cursor,
3048 				__FILE__, __LINE__, &mtr);
3049 			ut_ad(cursor.page_cur.block != NULL);
3050 			ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3051 		} else {
3052 			err = btr_cur_search_to_nth_level(
3053 				index, 0, entry, PAGE_CUR_LE,
3054 				search_mode,
3055 				&cursor, 0, __FILE__, __LINE__, &mtr);
3056 		}
3057 	}
3058 
3059 	if (err != DB_SUCCESS) {
3060 		if (err == DB_DECRYPTION_FAILED) {
3061 			ib::warn() << "Table is encrypted but encryption service or"
3062 				      " used key_id is not available. "
3063 				      " Can't continue reading table.";
3064 			index->table->set_file_unreadable();
3065 		}
3066 		goto func_exit;
3067 	}
3068 
3069 	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
3070 		ut_ad(!dict_index_is_spatial(index));
3071 		/* The insert was buffered during the search: we are done */
3072 		goto func_exit;
3073 	}
3074 
3075 #ifdef UNIV_DEBUG
3076 	{
3077 		page_t*	page = btr_cur_get_page(&cursor);
3078 		rec_t*	first_rec = page_rec_get_next(
3079 			page_get_infimum_rec(page));
3080 
3081 		ut_ad(page_rec_is_supremum(first_rec)
3082 		      || rec_n_fields_is_sane(index, first_rec, entry));
3083 	}
3084 #endif /* UNIV_DEBUG */
3085 
3086 	n_unique = dict_index_get_n_unique(index);
3087 
3088 	if (dict_index_is_unique(index)
3089 	    && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
3090 		mtr_commit(&mtr);
3091 
3092 		DEBUG_SYNC_C("row_ins_sec_index_unique");
3093 
3094 		if (row_ins_sec_mtr_start_and_check_if_aborted(
3095 			    &mtr, index, check, search_mode)) {
3096 			goto func_exit;
3097 		}
3098 
3099 		err = row_ins_scan_sec_index_for_duplicate(
3100 			flags, index, entry, thr, check, &mtr, offsets_heap);
3101 
3102 		mtr_commit(&mtr);
3103 
3104 		switch (err) {
3105 		case DB_SUCCESS:
3106 			break;
3107 		case DB_DUPLICATE_KEY:
3108 			if (!index->is_committed()) {
3109 				ut_ad(!thr_get_trx(thr)
3110 				      ->dict_operation_lock_mode);
3111 				mutex_enter(&dict_sys->mutex);
3112 				dict_set_corrupted_index_cache_only(index);
3113 				mutex_exit(&dict_sys->mutex);
3114 				/* Do not return any error to the
3115 				caller. The duplicate will be reported
3116 				by ALTER TABLE or CREATE UNIQUE INDEX.
3117 				Unfortunately we cannot report the
3118 				duplicate key value to the DDL thread,
3119 				because the altered_table object is
3120 				private to its call stack. */
3121 				err = DB_SUCCESS;
3122 			}
3123 			/* fall through */
3124 		default:
3125 			if (dict_index_is_spatial(index)) {
3126 				rtr_clean_rtr_info(&rtr_info, true);
3127 			}
3128 			DBUG_RETURN(err);
3129 		}
3130 
3131 		if (row_ins_sec_mtr_start_and_check_if_aborted(
3132 			    &mtr, index, check, search_mode)) {
3133 			goto func_exit;
3134 		}
3135 
3136 		DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
3137 
3138 		/* We did not find a duplicate and we have now
3139 		locked with s-locks the necessary records to
3140 		prevent any insertion of a duplicate by another
3141 		transaction. Let us now reposition the cursor and
3142 		continue the insertion. */
3143 		if (dict_table_is_intrinsic(index->table)) {
3144 			btr_cur_search_to_nth_level_with_no_latch(
3145 				index, 0, entry, PAGE_CUR_LE, &cursor,
3146 				__FILE__, __LINE__, &mtr);
3147 			ut_ad(cursor.page_cur.block != NULL);
3148 			ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3149 		} else {
3150 			btr_cur_search_to_nth_level(
3151 				index, 0, entry, PAGE_CUR_LE,
3152 				(search_mode
3153 				 & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)),
3154 				&cursor, 0, __FILE__, __LINE__, &mtr);
3155 		}
3156 	}
3157 
3158 	if (dup_chk_only) {
3159 		goto func_exit;
3160 	}
3161 
3162 	if (row_ins_must_modify_rec(&cursor)) {
3163 		/* If the existing record is being modified and the new record
3164 		is doesn't fit the provided slot then existing record is added
3165 		to free list and new record is inserted. This also means
3166 		cursor that we have cached for SELECT is now invalid. */
3167 		if(index->last_sel_cur) {
3168 			index->last_sel_cur->invalid = true;
3169 		}
3170 
3171 		/* There is already an index entry with a long enough common
3172 		prefix, we must convert the insert into a modify of an
3173 		existing record */
3174 		offsets = rec_get_offsets(
3175 			btr_cur_get_rec(&cursor), index, offsets,
3176 			ULINT_UNDEFINED, &offsets_heap);
3177 
3178 		err = row_ins_sec_index_entry_by_modify(
3179 			flags, mode, &cursor, &offsets,
3180 			offsets_heap, heap, entry, thr, &mtr);
3181 
3182 		if (err == DB_SUCCESS && dict_index_is_spatial(index)
3183 		    && rtr_info.mbr_adj) {
3184 			err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3185 		}
3186 	} else {
3187 		rec_t*		insert_rec;
3188 		big_rec_t*	big_rec;
3189 
3190 		if (mode == BTR_MODIFY_LEAF) {
3191 			err = btr_cur_optimistic_insert(
3192 				flags, &cursor, &offsets, &offsets_heap,
3193 				entry, &insert_rec,
3194 				&big_rec, 0, thr, &mtr);
3195 			if (err == DB_SUCCESS
3196 			    && dict_index_is_spatial(index)
3197 			    && rtr_info.mbr_adj) {
3198 				err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3199 			}
3200 		} else {
3201 			ut_ad(mode == BTR_MODIFY_TREE);
3202 			if (buf_LRU_buf_pool_running_out()) {
3203 
3204 				err = DB_LOCK_TABLE_FULL;
3205 				goto func_exit;
3206 			}
3207 
3208 			err = btr_cur_optimistic_insert(
3209 				flags, &cursor,
3210 				&offsets, &offsets_heap,
3211 				entry, &insert_rec,
3212 				&big_rec, 0, thr, &mtr);
3213 			if (err == DB_FAIL) {
3214 				err = btr_cur_pessimistic_insert(
3215 					flags, &cursor,
3216 					&offsets, &offsets_heap,
3217 					entry, &insert_rec,
3218 					&big_rec, 0, thr, &mtr);
3219 			}
3220 			if (err == DB_SUCCESS
3221 				   && dict_index_is_spatial(index)
3222 				   && rtr_info.mbr_adj) {
3223 				err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3224 			}
3225 		}
3226 
3227 		if (err == DB_SUCCESS && trx_id) {
3228 			page_update_max_trx_id(
3229 				btr_cur_get_block(&cursor),
3230 				btr_cur_get_page_zip(&cursor),
3231 				trx_id, &mtr);
3232 		}
3233 
3234 		ut_ad(!big_rec);
3235 	}
3236 
3237 func_exit:
3238 	if (dict_index_is_spatial(index)) {
3239 		rtr_clean_rtr_info(&rtr_info, true);
3240 	}
3241 
3242 	mtr_commit(&mtr);
3243 	DBUG_RETURN(err);
3244 }
3245 
3246 /***************************************************************//**
3247 Tries to insert the externally stored fields (off-page columns)
3248 of a clustered index entry.
3249 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
3250 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)3251 row_ins_index_entry_big_rec_func(
3252 /*=============================*/
3253 	const dtuple_t*		entry,	/*!< in/out: index entry to insert */
3254 	const big_rec_t*	big_rec,/*!< in: externally stored fields */
3255 	ulint*			offsets,/*!< in/out: rec offsets */
3256 	mem_heap_t**		heap,	/*!< in/out: memory heap */
3257 	dict_index_t*		index,	/*!< in: index */
3258 	const char*		file,	/*!< in: file name of caller */
3259 #ifndef NDEBUG
3260 	const void*		thd,    /*!< in: connection, or NULL */
3261 #endif /* NDEBUG */
3262 	ulint			line)	/*!< in: line number of caller */
3263 {
3264 	mtr_t		mtr;
3265 	btr_pcur_t	pcur;
3266 	rec_t*		rec;
3267 	dberr_t		error;
3268 
3269 	ut_ad(dict_index_is_clust(index));
3270 
3271 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
3272 	DEBUG_SYNC_C("before_insertion_of_blob");
3273 
3274 	mtr_start(&mtr);
3275 	mtr.set_named_space(index->space);
3276 	dict_disable_redo_if_temporary(index->table, &mtr);
3277 
3278 	btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
3279 		      &pcur, &mtr);
3280 	rec = btr_pcur_get_rec(&pcur);
3281 	offsets = rec_get_offsets(rec, index, offsets,
3282 				  ULINT_UNDEFINED, heap);
3283 
3284 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
3285 	error = btr_store_big_rec_extern_fields(
3286 		&pcur, 0, offsets, big_rec, &mtr, BTR_STORE_INSERT);
3287 	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
3288 
3289 	if (error == DB_SUCCESS
3290 	    && dict_index_is_online_ddl(index)) {
3291 		row_log_table_insert(btr_pcur_get_rec(&pcur), entry,
3292 				     index, offsets);
3293 	}
3294 
3295 	mtr_commit(&mtr);
3296 
3297 	btr_pcur_close(&pcur);
3298 
3299 	return(error);
3300 }
3301 
3302 /***************************************************************//**
3303 Inserts an entry into a clustered index. Tries first optimistic,
3304 then pessimistic descent down the tree. If the entry matches enough
3305 to a delete marked record, performs the insert by updating or delete
3306 unmarking the delete marked record.
3307 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3308 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext,bool dup_chk_only)3309 row_ins_clust_index_entry(
3310 /*======================*/
3311 	dict_index_t*	index,	/*!< in: clustered index */
3312 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3313 	que_thr_t*	thr,	/*!< in: query thread */
3314 	ulint		n_ext,	/*!< in: number of externally stored columns */
3315 	bool		dup_chk_only)
3316 				/*!< in: if true, just do duplicate check
3317 				and return. don't execute actual insert. */
3318 {
3319 	dberr_t	err;
3320 	ulint	n_uniq;
3321 
3322 	DBUG_ENTER("row_ins_clust_index_entry");
3323 
3324 	if (!index->table->foreign_set.empty()) {
3325 		err = row_ins_check_foreign_constraints(
3326 			index->table, index, entry, thr);
3327 		if (err != DB_SUCCESS) {
3328 
3329 			DBUG_RETURN(err);
3330 		}
3331 	}
3332 
3333 	n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3334 
3335 	/* Try first optimistic descent to the B-tree */
3336 	ulint	flags;
3337 
3338 	if (!dict_table_is_intrinsic(index->table)) {
3339 		log_free_check();
3340 		flags = dict_table_is_temporary(index->table)
3341 			? BTR_NO_LOCKING_FLAG
3342 			: 0;
3343 	} else {
3344 		flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3345 	}
3346 
3347 	if (dict_table_is_intrinsic(index->table)
3348 	    && dict_index_is_auto_gen_clust(index)) {
3349 
3350 		/* Check if the memory allocated for intrinsic cache*/
3351 		if(!index->last_ins_cur) {
3352 			dict_allocate_mem_intrinsic_cache(index);
3353 		}
3354 
3355 		err = row_ins_sorted_clust_index_entry(
3356 			BTR_MODIFY_LEAF, index, entry, n_ext, thr);
3357 	} else {
3358 		err = row_ins_clust_index_entry_low(
3359 			flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
3360 			n_ext, thr, dup_chk_only);
3361 	}
3362 
3363 
3364 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3365 			    "after_row_ins_clust_index_entry_leaf");
3366 
3367 	if (err != DB_FAIL) {
3368 		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3369 		DBUG_RETURN(err);
3370 	}
3371 
3372 	/* Try then pessimistic descent to the B-tree */
3373 	if (!dict_table_is_intrinsic(index->table)) {
3374 		log_free_check();
3375 	} else if(!index->last_sel_cur) {
3376 		dict_allocate_mem_intrinsic_cache(index);
3377 		index->last_sel_cur->invalid = true;
3378 	} else {
3379 		index->last_sel_cur->invalid = true;
3380 	}
3381 
3382 	if (dict_table_is_intrinsic(index->table)
3383 	    && dict_index_is_auto_gen_clust(index)) {
3384 		err = row_ins_sorted_clust_index_entry(
3385 			BTR_MODIFY_TREE, index, entry, n_ext, thr);
3386 	} else {
3387 		err = row_ins_clust_index_entry_low(
3388 			flags, BTR_MODIFY_TREE, index, n_uniq, entry,
3389 			n_ext, thr, dup_chk_only);
3390 	}
3391 
3392 	DBUG_RETURN(err);
3393 }
3394 
3395 /***************************************************************//**
3396 Inserts an entry into a secondary index. Tries first optimistic,
3397 then pessimistic descent down the tree. If the entry matches enough
3398 to a delete marked record, performs the insert by updating or delete
3399 unmarking the delete marked record.
3400 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3401 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3402 row_ins_sec_index_entry(
3403 /*====================*/
3404 	dict_index_t*	index,	/*!< in: secondary index */
3405 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3406 	que_thr_t*	thr,	/*!< in: query thread */
3407 	bool		dup_chk_only)
3408 				/*!< in: if true, just do duplicate check
3409 				and return. don't execute actual insert. */
3410 {
3411 	dberr_t		err;
3412 	mem_heap_t*	offsets_heap;
3413 	mem_heap_t*	heap;
3414 
3415 	DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3416 			DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3417 			return(DB_LOCK_WAIT);});
3418 
3419 	if (!index->table->foreign_set.empty()) {
3420 		err = row_ins_check_foreign_constraints(index->table, index,
3421 							entry, thr);
3422 		if (err != DB_SUCCESS) {
3423 
3424 			return(err);
3425 		}
3426 	}
3427 
3428 	ut_ad(thr_get_trx(thr)->id != 0
3429 	      || dict_table_is_intrinsic(index->table));
3430 
3431 	offsets_heap = mem_heap_create(1024);
3432 	heap = mem_heap_create(1024);
3433 
3434 	/* Try first optimistic descent to the B-tree */
3435 
3436 	ulint	flags;
3437 
3438 	if (!dict_table_is_intrinsic(index->table)) {
3439 		log_free_check();
3440 		flags = dict_table_is_temporary(index->table)
3441 			? BTR_NO_LOCKING_FLAG
3442 			: 0;
3443 	} else {
3444 		flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3445 	}
3446 
3447 	err = row_ins_sec_index_entry_low(
3448 		flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
3449 		0, thr, dup_chk_only);
3450 	if (err == DB_FAIL) {
3451 		mem_heap_empty(heap);
3452 
3453 		/* Try then pessimistic descent to the B-tree */
3454 
3455 		if (!dict_table_is_intrinsic(index->table)) {
3456 			log_free_check();
3457 		} else if(!index->last_sel_cur) {
3458 			dict_allocate_mem_intrinsic_cache(index);
3459 			index->last_sel_cur->invalid = true;
3460 		} else {
3461 			index->last_sel_cur->invalid = true;
3462 		}
3463 
3464 		err = row_ins_sec_index_entry_low(
3465 			flags, BTR_MODIFY_TREE, index,
3466 			offsets_heap, heap, entry, 0, thr,
3467 			dup_chk_only);
3468 	}
3469 
3470 	mem_heap_free(heap);
3471 	mem_heap_free(offsets_heap);
3472 	return(err);
3473 }
3474 
3475 /***************************************************************//**
3476 Inserts an index entry to index. Tries first optimistic, then pessimistic
3477 descent down the tree. If the entry matches enough to a delete marked record,
3478 performs the insert by updating or delete unmarking the delete marked
3479 record.
3480 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3481 static
3482 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3483 row_ins_index_entry(
3484 /*================*/
3485 	dict_index_t*	index,	/*!< in: index */
3486 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3487 	que_thr_t*	thr)	/*!< in: query thread */
3488 {
3489 	ut_ad(thr_get_trx(thr)->id != 0);
3490 
3491 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3492 			DBUG_SET("-d,row_ins_index_entry_timeout");
3493 			return(DB_LOCK_WAIT);});
3494 
3495 	if (dict_index_is_clust(index)) {
3496 		return(row_ins_clust_index_entry(index, entry, thr, 0, false));
3497 	} else {
3498 		return(row_ins_sec_index_entry(index, entry, thr, false));
3499 	}
3500 }
3501 
3502 
3503 /*****************************************************************//**
3504 This function generate MBR (Minimum Bounding Box) for spatial objects
3505 and set it to spatial index field. */
3506 static
3507 void
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field)3508 row_ins_spatial_index_entry_set_mbr_field(
3509 /*======================================*/
3510 	dfield_t*	field,		/*!< in/out: mbr field */
3511 	const dfield_t*	row_field)	/*!< in: row field */
3512 {
3513 	uchar*		dptr = NULL;
3514 	ulint		dlen = 0;
3515 	double		mbr[SPDIMS * 2];
3516 
3517 	/* This must be a GEOMETRY datatype */
3518 	ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3519 
3520 	dptr = static_cast<uchar*>(dfield_get_data(row_field));
3521 	dlen = dfield_get_len(row_field);
3522 
3523 	/* obtain the MBR */
3524 	rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
3525 			   static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
3526 			   SPDIMS, mbr);
3527 
3528 	/* Set mbr as index entry data */
3529 	dfield_write_mbr(field, mbr);
3530 }
3531 
3532 /** Sets the values of the dtuple fields in entry from the values of appropriate
3533 columns in row.
3534 @param[in]	index	index handler
3535 @param[out]	entry	index entry to make
3536 @param[in]	row	row
3537 
3538 @return DB_SUCCESS if the set is successful */
3539 dberr_t
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3540 row_ins_index_entry_set_vals(
3541 	const dict_index_t*	index,
3542 	dtuple_t*		entry,
3543 	const dtuple_t*		row)
3544 {
3545 	ulint	n_fields;
3546 	ulint	i;
3547 	ulint	num_v = dtuple_get_n_v_fields(entry);
3548 
3549 	n_fields = dtuple_get_n_fields(entry);
3550 
3551 	for (i = 0; i < n_fields + num_v; i++) {
3552 		dict_field_t*	ind_field = NULL;
3553 		dfield_t*	field;
3554 		const dfield_t*	row_field;
3555 		ulint		len;
3556 		dict_col_t*	col;
3557 
3558 		if (i >= n_fields) {
3559 			/* This is virtual field */
3560 			field = dtuple_get_nth_v_field(entry, i - n_fields);
3561 			col = &dict_table_get_nth_v_col(
3562 				index->table, i - n_fields)->m_col;
3563 		} else {
3564 			field = dtuple_get_nth_field(entry, i);
3565 			ind_field = dict_index_get_nth_field(index, i);
3566 			col = ind_field->col;
3567 		}
3568 
3569 		if (dict_col_is_virtual(col)) {
3570 			const dict_v_col_t*     v_col
3571 				= reinterpret_cast<const dict_v_col_t*>(col);
3572 			ut_ad(dtuple_get_n_fields(row)
3573 			      == dict_table_get_n_cols(index->table));
3574 			row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3575 		} else {
3576 			row_field = dtuple_get_nth_field(
3577 				row, ind_field->col->ind);
3578 		}
3579 
3580 		len = dfield_get_len(row_field);
3581 
3582 		/* Check column prefix indexes */
3583 		if (ind_field != NULL && ind_field->prefix_len > 0
3584 		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3585 
3586 			const	dict_col_t*	col
3587 				= dict_field_get_col(ind_field);
3588 
3589 			len = dtype_get_at_most_n_mbchars(
3590 				col->prtype, col->mbminmaxlen,
3591 				ind_field->prefix_len,
3592 				len,
3593 				static_cast<const char*>(
3594 					dfield_get_data(row_field)));
3595 
3596 			ut_ad(!dfield_is_ext(row_field));
3597 		}
3598 
3599 		/* Handle spatial index. For the first field, replace
3600 		the data with its MBR (Minimum Bounding Box). */
3601 		if ((i == 0) && dict_index_is_spatial(index)) {
3602 			if (!row_field->data
3603 			    || row_field->len < GEO_DATA_HEADER_SIZE) {
3604 				return(DB_CANT_CREATE_GEOMETRY_OBJECT);
3605 			}
3606 			row_ins_spatial_index_entry_set_mbr_field(
3607 				field, row_field);
3608 			continue;
3609 		}
3610 
3611 		dfield_set_data(field, dfield_get_data(row_field), len);
3612 		if (dfield_is_ext(row_field)) {
3613 			ut_ad(dict_index_is_clust(index));
3614 			dfield_set_ext(field);
3615 		}
3616 	}
3617 
3618 	return(DB_SUCCESS);
3619 }
3620 
3621 /***********************************************************//**
3622 Inserts a single index entry to the table.
3623 @return DB_SUCCESS if operation successfully completed, else error
3624 code or DB_LOCK_WAIT */
3625 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3626 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3627 row_ins_index_entry_step(
3628 /*=====================*/
3629 	ins_node_t*	node,	/*!< in: row insert node */
3630 	que_thr_t*	thr)	/*!< in: query thread */
3631 {
3632 	dberr_t	err;
3633 
3634 	DBUG_ENTER("row_ins_index_entry_step");
3635 
3636 	ut_ad(dtuple_check_typed(node->row));
3637 
3638 	err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3639 
3640 	if (err != DB_SUCCESS) {
3641 		DBUG_RETURN(err);
3642 	}
3643 
3644 	ut_ad(dtuple_check_typed(node->entry));
3645 
3646 	err = row_ins_index_entry(node->index, node->entry, thr);
3647 
3648 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3649 			    "after_row_ins_index_entry_step");
3650 
3651 	DBUG_RETURN(err);
3652 }
3653 
3654 /***********************************************************//**
3655 Allocates a row id for row and inits the node->index field. */
3656 UNIV_INLINE
3657 void
row_ins_alloc_row_id_step(ins_node_t * node)3658 row_ins_alloc_row_id_step(
3659 /*======================*/
3660 	ins_node_t*	node)	/*!< in: row insert node */
3661 {
3662 	row_id_t	row_id;
3663 
3664 	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3665 
3666 	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3667 
3668 		/* No row id is stored if the clustered index is unique */
3669 
3670 		return;
3671 	}
3672 
3673 	/* Fill in row id value to row */
3674 
3675 	row_id = dict_sys_get_new_row_id();
3676 
3677 	dict_sys_write_row_id(node->row_id_buf, row_id);
3678 }
3679 
3680 /***********************************************************//**
3681 Gets a row to insert from the values list. */
3682 UNIV_INLINE
3683 void
row_ins_get_row_from_values(ins_node_t * node)3684 row_ins_get_row_from_values(
3685 /*========================*/
3686 	ins_node_t*	node)	/*!< in: row insert node */
3687 {
3688 	que_node_t*	list_node;
3689 	dfield_t*	dfield;
3690 	dtuple_t*	row;
3691 	ulint		i;
3692 
3693 	/* The field values are copied in the buffers of the select node and
3694 	it is safe to use them until we fetch from select again: therefore
3695 	we can just copy the pointers */
3696 
3697 	row = node->row;
3698 
3699 	i = 0;
3700 	list_node = node->values_list;
3701 
3702 	while (list_node) {
3703 		eval_exp(list_node);
3704 
3705 		dfield = dtuple_get_nth_field(row, i);
3706 		dfield_copy_data(dfield, que_node_get_val(list_node));
3707 
3708 		i++;
3709 		list_node = que_node_get_next(list_node);
3710 	}
3711 }
3712 
3713 /***********************************************************//**
3714 Gets a row to insert from the select list. */
3715 UNIV_INLINE
3716 void
row_ins_get_row_from_select(ins_node_t * node)3717 row_ins_get_row_from_select(
3718 /*========================*/
3719 	ins_node_t*	node)	/*!< in: row insert node */
3720 {
3721 	que_node_t*	list_node;
3722 	dfield_t*	dfield;
3723 	dtuple_t*	row;
3724 	ulint		i;
3725 
3726 	/* The field values are copied in the buffers of the select node and
3727 	it is safe to use them until we fetch from select again: therefore
3728 	we can just copy the pointers */
3729 
3730 	row = node->row;
3731 
3732 	i = 0;
3733 	list_node = node->select->select_list;
3734 
3735 	while (list_node) {
3736 		dfield = dtuple_get_nth_field(row, i);
3737 		dfield_copy_data(dfield, que_node_get_val(list_node));
3738 
3739 		i++;
3740 		list_node = que_node_get_next(list_node);
3741 	}
3742 }
3743 
3744 /***********************************************************//**
3745 Inserts a row to a table.
3746 @return DB_SUCCESS if operation successfully completed, else error
3747 code or DB_LOCK_WAIT */
3748 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3749 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3750 row_ins(
3751 /*====*/
3752 	ins_node_t*	node,	/*!< in: row insert node */
3753 	que_thr_t*	thr)	/*!< in: query thread */
3754 {
3755 	dberr_t	err;
3756 
3757 	DBUG_ENTER("row_ins");
3758 
3759 	DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3760 
3761 	if (node->state == INS_NODE_ALLOC_ROW_ID) {
3762 
3763 		row_ins_alloc_row_id_step(node);
3764 
3765 		node->index = dict_table_get_first_index(node->table);
3766 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
3767 
3768 		if (node->ins_type == INS_SEARCHED) {
3769 
3770 			row_ins_get_row_from_select(node);
3771 
3772 		} else if (node->ins_type == INS_VALUES) {
3773 
3774 			row_ins_get_row_from_values(node);
3775 		}
3776 
3777 		node->state = INS_NODE_INSERT_ENTRIES;
3778 	}
3779 
3780 	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3781 
3782 	while (node->index != NULL) {
3783 		if (node->index->type != DICT_FTS) {
3784 			err = row_ins_index_entry_step(node, thr);
3785 			switch(err) {
3786 			case DB_SUCCESS:
3787 				break;
3788 			case DB_DUPLICATE_KEY:
3789 				thr_get_trx(thr)->error_state
3790 						= DB_DUPLICATE_KEY;
3791 				thr_get_trx(thr)->error_index
3792 						= node->index;
3793 			//fall through
3794 			default:
3795 				DBUG_RETURN(err);
3796 			}
3797 		}
3798 
3799 		node->index = dict_table_get_next_index(node->index);
3800 		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3801 
3802 		DBUG_EXECUTE_IF(
3803 			"row_ins_skip_sec",
3804 			node->index = NULL; node->entry = NULL; break;);
3805 
3806 		/* Skip corrupted secondary index and its entry */
3807 		while (node->index && dict_index_is_corrupted(node->index)) {
3808 
3809 			node->index = dict_table_get_next_index(node->index);
3810 			node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3811 		}
3812 
3813 	}
3814 
3815 	ut_ad(node->entry == NULL);
3816 	thr_get_trx(thr)->error_index = NULL;
3817 	node->state = INS_NODE_ALLOC_ROW_ID;
3818 
3819 	DBUG_RETURN(DB_SUCCESS);
3820 }
3821 
3822 /***********************************************************//**
3823 Inserts a row to a table. This is a high-level function used in SQL execution
3824 graphs.
3825 @return query thread to run next or NULL */
3826 que_thr_t*
row_ins_step(que_thr_t * thr)3827 row_ins_step(
3828 /*=========*/
3829 	que_thr_t*	thr)	/*!< in: query thread */
3830 {
3831 	ins_node_t*	node;
3832 	que_node_t*	parent;
3833 	sel_node_t*	sel_node;
3834 	trx_t*		trx;
3835 	dberr_t		err;
3836 
3837 	ut_ad(thr);
3838 
3839 	DEBUG_SYNC_C("innodb_row_ins_step_enter");
3840 
3841 	trx = thr_get_trx(thr);
3842 
3843 	trx_start_if_not_started_xa(trx, true);
3844 
3845 	node = static_cast<ins_node_t*>(thr->run_node);
3846 
3847 	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3848 	ut_ad(!dict_table_is_intrinsic(node->table));
3849 
3850 	parent = que_node_get_parent(node);
3851 	sel_node = node->select;
3852 
3853 	if (thr->prev_node == parent) {
3854 		node->state = INS_NODE_SET_IX_LOCK;
3855 	}
3856 
3857 	/* If this is the first time this node is executed (or when
3858 	execution resumes after wait for the table IX lock), set an
3859 	IX lock on the table and reset the possible select node. MySQL's
3860 	partitioned table code may also call an insert within the same
3861 	SQL statement AFTER it has used this table handle to do a search.
3862 	This happens, for example, when a row update moves it to another
3863 	partition. In that case, we have already set the IX lock on the
3864 	table during the search operation, and there is no need to set
3865 	it again here. But we must write trx->id to node->trx_id_buf. */
3866 
3867 	memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
3868 	trx_write_trx_id(node->trx_id_buf, trx->id);
3869 
3870 	if (node->state == INS_NODE_SET_IX_LOCK) {
3871 
3872 		node->state = INS_NODE_ALLOC_ROW_ID;
3873 
3874 		/* It may be that the current session has not yet started
3875 		its transaction, or it has been committed: */
3876 
3877 		if (trx->id == node->trx_id) {
3878 			/* No need to do IX-locking */
3879 
3880 			goto same_trx;
3881 		}
3882 
3883 		err = lock_table(0, node->table, LOCK_IX, thr);
3884 
3885 		DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3886 				err = DB_LOCK_WAIT;);
3887 
3888 		if (err != DB_SUCCESS) {
3889 
3890 			goto error_handling;
3891 		}
3892 
3893 		node->trx_id = trx->id;
3894 same_trx:
3895 		if (node->ins_type == INS_SEARCHED) {
3896 			/* Reset the cursor */
3897 			sel_node->state = SEL_NODE_OPEN;
3898 
3899 			/* Fetch a row to insert */
3900 
3901 			thr->run_node = sel_node;
3902 
3903 			return(thr);
3904 		}
3905 	}
3906 
3907 	if ((node->ins_type == INS_SEARCHED)
3908 	    && (sel_node->state != SEL_NODE_FETCH)) {
3909 
3910 		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3911 
3912 		/* No more rows to insert */
3913 		thr->run_node = parent;
3914 
3915 		return(thr);
3916 	}
3917 
3918 	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3919 
3920 	err = row_ins(node, thr);
3921 
3922 error_handling:
3923 	trx->error_state = err;
3924 
3925 	if (err != DB_SUCCESS) {
3926 		/* err == DB_LOCK_WAIT or SQL error detected */
3927 		return(NULL);
3928 	}
3929 
3930 	/* DO THE TRIGGER ACTIONS HERE */
3931 
3932 	if (node->ins_type == INS_SEARCHED) {
3933 		/* Fetch a row to insert */
3934 
3935 		thr->run_node = sel_node;
3936 	} else {
3937 		thr->run_node = que_node_get_parent(node);
3938 	}
3939 
3940 	return(thr);
3941 }
3942