1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30 
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "row0ins.h"
35 
36 #ifdef UNIV_NONINL
37 #include "row0ins.ic"
38 #endif
39 
40 #include "ha_prototypes.h"
41 #include "dict0dict.h"
42 #include "dict0boot.h"
43 #include "trx0rec.h"
44 #include "trx0undo.h"
45 #include "btr0btr.h"
46 #include "btr0cur.h"
47 #include "mach0data.h"
48 #include "que0que.h"
49 #include "row0upd.h"
50 #include "row0sel.h"
51 #include "row0row.h"
52 #include "row0log.h"
53 #include "rem0cmp.h"
54 #include "lock0lock.h"
55 #include "log0log.h"
56 #include "eval0eval.h"
57 #include "data0data.h"
58 #include "usr0sess.h"
59 #include "buf0lru.h"
60 #include "fts0fts.h"
61 #include "fts0types.h"
62 #include "m_string.h"
63 
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST not hold any synchronization objects when performing the
check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
73 
74 /*********************************************************************//**
75 Creates an insert node struct.
76 @return	own: insert node struct */
77 UNIV_INTERN
78 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)79 ins_node_create(
80 /*============*/
81 	ulint		ins_type,	/*!< in: INS_VALUES, ... */
82 	dict_table_t*	table,		/*!< in: table where to insert */
83 	mem_heap_t*	heap)		/*!< in: mem heap where created */
84 {
85 	ins_node_t*	node;
86 
87 	node = static_cast<ins_node_t*>(
88 		mem_heap_alloc(heap, sizeof(ins_node_t)));
89 
90 	node->common.type = QUE_NODE_INSERT;
91 
92 	node->ins_type = ins_type;
93 
94 	node->state = INS_NODE_SET_IX_LOCK;
95 	node->table = table;
96 	node->index = NULL;
97 	node->entry = NULL;
98 
99 	node->select = NULL;
100 
101 	node->trx_id = 0;
102 
103 	node->entry_sys_heap = mem_heap_create(128);
104 
105 	node->magic_n = INS_NODE_MAGIC_N;
106 
107 	return(node);
108 }
109 
110 /***********************************************************//**
111 Creates an entry template for each index of a table. */
112 static
113 void
ins_node_create_entry_list(ins_node_t * node)114 ins_node_create_entry_list(
115 /*=======================*/
116 	ins_node_t*	node)	/*!< in: row insert node */
117 {
118 	dict_index_t*	index;
119 	dtuple_t*	entry;
120 
121 	ut_ad(node->entry_sys_heap);
122 
123 	UT_LIST_INIT(node->entry_list);
124 
125 	/* We will include all indexes (include those corrupted
126 	secondary indexes) in the entry list. Filteration of
127 	these corrupted index will be done in row_ins() */
128 
129 	for (index = dict_table_get_first_index(node->table);
130 	     index != 0;
131 	     index = dict_table_get_next_index(index)) {
132 
133 		entry = row_build_index_entry(
134 			node->row, NULL, index, node->entry_sys_heap);
135 
136 		UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
137 	}
138 }
139 
140 /*****************************************************************//**
141 Adds system field buffers to a row. */
142 static
143 void
row_ins_alloc_sys_fields(ins_node_t * node)144 row_ins_alloc_sys_fields(
145 /*=====================*/
146 	ins_node_t*	node)	/*!< in: insert node */
147 {
148 	dtuple_t*		row;
149 	dict_table_t*		table;
150 	mem_heap_t*		heap;
151 	const dict_col_t*	col;
152 	dfield_t*		dfield;
153 	byte*			ptr;
154 
155 	row = node->row;
156 	table = node->table;
157 	heap = node->entry_sys_heap;
158 
159 	ut_ad(row && table && heap);
160 	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
161 
162 	/* allocate buffer to hold the needed system created hidden columns. */
163 	uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
164 	ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
165 
166 	/* 1. Populate row-id */
167 	col = dict_table_get_sys_col(table, DATA_ROW_ID);
168 
169 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
170 
171 	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
172 
173 	node->row_id_buf = ptr;
174 
175 	ptr += DATA_ROW_ID_LEN;
176 
177 	/* 2. Populate trx id */
178 	col = dict_table_get_sys_col(table, DATA_TRX_ID);
179 
180 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
181 
182 	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
183 
184 	node->trx_id_buf = ptr;
185 
186 	ptr += DATA_TRX_ID_LEN;
187 
188 	/* 3. Populate roll ptr */
189 
190 	col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
191 
192 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
193 
194 	dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
195 }
196 
197 /*********************************************************************//**
198 Sets a new row to insert for an INS_DIRECT node. This function is only used
199 if we have constructed the row separately, which is a rare case; this
200 function is quite slow. */
201 UNIV_INTERN
202 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)203 ins_node_set_new_row(
204 /*=================*/
205 	ins_node_t*	node,	/*!< in: insert node */
206 	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
207 {
208 	node->state = INS_NODE_SET_IX_LOCK;
209 	node->index = NULL;
210 	node->entry = NULL;
211 
212 	node->row = row;
213 
214 	mem_heap_empty(node->entry_sys_heap);
215 
216 	/* Create templates for index entries */
217 
218 	ins_node_create_entry_list(node);
219 
220 	/* Allocate from entry_sys_heap buffers for sys fields */
221 
222 	row_ins_alloc_sys_fields(node);
223 
224 	/* As we allocated a new trx id buf, the trx id should be written
225 	there again: */
226 
227 	node->trx_id = 0;
228 }
229 
230 /*******************************************************************//**
231 Does an insert operation by updating a delete-marked existing record
232 in the index. This situation can occur if the delete-marked record is
233 kept in the index for consistent reads.
234 @return	DB_SUCCESS or error code */
235 static MY_ATTRIBUTE((nonnull, warn_unused_result))
236 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)237 row_ins_sec_index_entry_by_modify(
238 /*==============================*/
239 	ulint		flags,	/*!< in: undo logging and locking flags */
240 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
241 				depending on whether mtr holds just a leaf
242 				latch or also a tree latch */
243 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
244 	ulint**		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
245 	mem_heap_t*	offsets_heap,
246 				/*!< in/out: memory heap that can be emptied */
247 	mem_heap_t*	heap,	/*!< in/out: memory heap */
248 	const dtuple_t*	entry,	/*!< in: index entry to insert */
249 	que_thr_t*	thr,	/*!< in: query thread */
250 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
251 				latching any further pages */
252 {
253 	big_rec_t*	dummy_big_rec;
254 	upd_t*		update;
255 	rec_t*		rec;
256 	dberr_t		err;
257 
258 	rec = btr_cur_get_rec(cursor);
259 
260 	ut_ad(!dict_index_is_clust(cursor->index));
261 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
262 	ut_ad(!entry->info_bits);
263 
264 	/* We know that in the alphabetical ordering, entry and rec are
265 	identified. But in their binary form there may be differences if
266 	there are char fields in them. Therefore we have to calculate the
267 	difference. */
268 
269 	update = row_upd_build_sec_rec_difference_binary(
270 		rec, cursor->index, *offsets, entry, heap);
271 
272 	/* If operating in fake_change mode then flow will not mark the record
273 	deleted but will still assume it and take delete-mark path. Condition
274 	below has a different path if record is not marked deleted but we need
275 	to still by-pass it given that original flow has taken this path for
276 	fake_change mode execution assuming record is delete-marked. */
277 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))
278 	    && UNIV_UNLIKELY(!thr_get_trx(thr)->fake_changes)) {
279 		/* We should never insert in place of a record that
280 		has not been delete-marked. The only exception is when
281 		online CREATE INDEX copied the changes that we already
282 		made to the clustered index, and completed the
283 		secondary index creation before we got here. In this
284 		case, the change would already be there. The CREATE
285 		INDEX should be waiting for a MySQL meta-data lock
286 		upgrade at least until this INSERT or UPDATE
287 		returns. After that point, the TEMP_INDEX_PREFIX
288 		would be dropped from the index name in
289 		commit_inplace_alter_table(). */
290 		ut_a(update->n_fields == 0);
291 		ut_a(*cursor->index->name == TEMP_INDEX_PREFIX);
292 		ut_ad(!dict_index_is_online_ddl(cursor->index));
293 		return(DB_SUCCESS);
294 	}
295 
296 	if (mode == BTR_MODIFY_LEAF) {
297 		/* Try an optimistic updating of the record, keeping changes
298 		within the page */
299 
300 		/* TODO: pass only *offsets */
301 		err = btr_cur_optimistic_update(
302 			flags | BTR_KEEP_SYS_FLAG, cursor,
303 			offsets, &offsets_heap, update, 0, thr,
304 			thr_get_trx(thr)->id, mtr);
305 		switch (err) {
306 		case DB_OVERFLOW:
307 		case DB_UNDERFLOW:
308 		case DB_ZIP_OVERFLOW:
309 			err = DB_FAIL;
310 		default:
311 			break;
312 		}
313 	} else {
314 		ut_a(mode == BTR_MODIFY_TREE);
315 		if (buf_LRU_buf_pool_running_out()) {
316 
317 			return(DB_LOCK_TABLE_FULL);
318 		}
319 
320 		err = btr_cur_pessimistic_update(
321 			flags | BTR_KEEP_SYS_FLAG, cursor,
322 			offsets, &offsets_heap,
323 			heap, &dummy_big_rec, update, 0,
324 			thr, thr_get_trx(thr)->id, mtr);
325 		ut_ad(!dummy_big_rec);
326 	}
327 
328 	return(err);
329 }
330 
331 /*******************************************************************//**
332 Does an insert operation by delete unmarking and updating a delete marked
333 existing record in the index. This situation can occur if the delete marked
334 record is kept in the index for consistent reads.
335 @return	DB_SUCCESS, DB_FAIL, or error code */
336 static MY_ATTRIBUTE((nonnull, warn_unused_result))
337 dberr_t
row_ins_clust_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)338 row_ins_clust_index_entry_by_modify(
339 /*================================*/
340 	ulint		flags,	/*!< in: undo logging and locking flags */
341 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
342 				depending on whether mtr holds just a leaf
343 				latch or also a tree latch */
344 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
345 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
346 	mem_heap_t**	offsets_heap,
347 				/*!< in/out: pointer to memory heap that can
348 				be emptied, or NULL */
349 	mem_heap_t*	heap,	/*!< in/out: memory heap */
350 	big_rec_t**	big_rec,/*!< out: possible big rec vector of fields
351 				which have to be stored externally by the
352 				caller */
353 	const dtuple_t*	entry,	/*!< in: index entry to insert */
354 	que_thr_t*	thr,	/*!< in: query thread */
355 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
356 				latching any further pages */
357 {
358 	const rec_t*	rec;
359 	const upd_t*	update;
360 	dberr_t		err;
361 
362 	ut_ad(dict_index_is_clust(cursor->index));
363 
364 	*big_rec = NULL;
365 
366 	rec = btr_cur_get_rec(cursor);
367 
368 	ut_ad(rec_get_deleted_flag(rec,
369 				   dict_table_is_comp(cursor->index->table)));
370 
371 	/* Build an update vector containing all the fields to be modified;
372 	NOTE that this vector may NOT contain system columns trx_id or
373 	roll_ptr */
374 
375 	update = row_upd_build_difference_binary(
376 		cursor->index, entry, rec, NULL, true,
377 		thr_get_trx(thr), heap);
378 	if (mode != BTR_MODIFY_TREE) {
379 		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
380 
381 		/* Try optimistic updating of the record, keeping changes
382 		within the page */
383 
384 		err = btr_cur_optimistic_update(
385 			flags, cursor, offsets, offsets_heap, update, 0, thr,
386 			thr_get_trx(thr)->id, mtr);
387 		switch (err) {
388 		case DB_OVERFLOW:
389 		case DB_UNDERFLOW:
390 		case DB_ZIP_OVERFLOW:
391 			err = DB_FAIL;
392 		default:
393 			break;
394 		}
395 	} else {
396 		if (buf_LRU_buf_pool_running_out()) {
397 
398 			return(DB_LOCK_TABLE_FULL);
399 
400 		}
401 		err = btr_cur_pessimistic_update(
402 			flags | BTR_KEEP_POS_FLAG,
403 			cursor, offsets, offsets_heap, heap,
404 			big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
405 	}
406 
407 	return(err);
408 }
409 
410 /*********************************************************************//**
411 Returns TRUE if in a cascaded update/delete an ancestor node of node
412 updates (not DELETE, but UPDATE) table.
413 @return	TRUE if an ancestor updates table */
414 static
415 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)416 row_ins_cascade_ancestor_updates_table(
417 /*===================================*/
418 	que_node_t*	node,	/*!< in: node in a query graph */
419 	dict_table_t*	table)	/*!< in: table */
420 {
421 	que_node_t*	parent;
422 
423 	for (parent = que_node_get_parent(node);
424 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
425 	     parent = que_node_get_parent(parent)) {
426 
427 		upd_node_t*	upd_node;
428 
429 		upd_node = static_cast<upd_node_t*>(parent);
430 
431 		if (upd_node->table == table && upd_node->is_delete == FALSE) {
432 
433 			return(TRUE);
434 		}
435 	}
436 
437 	return(FALSE);
438 }
439 
440 /*********************************************************************//**
441 Returns the number of ancestor UPDATE or DELETE nodes of a
442 cascaded update/delete node.
443 @return	number of ancestors */
444 static MY_ATTRIBUTE((nonnull, warn_unused_result))
445 ulint
row_ins_cascade_n_ancestors(que_node_t * node)446 row_ins_cascade_n_ancestors(
447 /*========================*/
448 	que_node_t*	node)	/*!< in: node in a query graph */
449 {
450 	que_node_t*	parent;
451 	ulint		n_ancestors = 0;
452 
453 	for (parent = que_node_get_parent(node);
454 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
455 	     parent = que_node_get_parent(parent)) {
456 
457 		n_ancestors++;
458 	}
459 
460 	return(n_ancestors);
461 }
462 
463 /******************************************************************//**
464 Calculates the update vector node->cascade->update for a child table in
465 a cascaded update.
466 @return number of fields in the calculated update vector; the value
467 can also be 0 if no foreign key fields changed; the returned value is
468 ULINT_UNDEFINED if the column type in the child table is too short to
469 fit the new value in the parent table: that means the update fails */
470 static MY_ATTRIBUTE((nonnull, warn_unused_result))
471 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)472 row_ins_cascade_calc_update_vec(
473 /*============================*/
474 	upd_node_t*	node,		/*!< in: update node of the parent
475 					table */
476 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
477 					type is != 0 */
478 	mem_heap_t*	heap,		/*!< in: memory heap to use as
479 					temporary storage */
480 	trx_t*		trx,		/*!< in: update transaction */
481 	ibool*		fts_col_affected)/*!< out: is FTS column affected */
482 {
483 	upd_node_t*	cascade		= node->cascade_node;
484 	dict_table_t*	table		= foreign->foreign_table;
485 	dict_index_t*	index		= foreign->foreign_index;
486 	upd_t*		update;
487 	dict_table_t*	parent_table;
488 	dict_index_t*	parent_index;
489 	upd_t*		parent_update;
490 	ulint		n_fields_updated;
491 	ulint		parent_field_no;
492 	ulint		i;
493 	ulint		j;
494 	ibool		doc_id_updated = FALSE;
495 	ulint		doc_id_pos = 0;
496 	doc_id_t	new_doc_id = FTS_NULL_DOC_ID;
497 
498 	ut_a(node);
499 	ut_a(foreign);
500 	ut_a(cascade);
501 	ut_a(table);
502 	ut_a(index);
503 
504 	/* Calculate the appropriate update vector which will set the fields
505 	in the child index record to the same value (possibly padded with
506 	spaces if the column is a fixed length CHAR or FIXBINARY column) as
507 	the referenced index record will get in the update. */
508 
509 	parent_table = node->table;
510 	ut_a(parent_table == foreign->referenced_table);
511 	parent_index = foreign->referenced_index;
512 	parent_update = node->update;
513 
514 	update = cascade->update;
515 
516 	update->info_bits = 0;
517 	update->n_fields = foreign->n_fields;
518 
519 	n_fields_updated = 0;
520 
521 	*fts_col_affected = FALSE;
522 
523 	if (table->fts) {
524 		doc_id_pos = dict_table_get_nth_col_pos(
525 			table, table->fts->doc_col);
526 	}
527 
528 	for (i = 0; i < foreign->n_fields; i++) {
529 
530 		parent_field_no = dict_table_get_nth_col_pos(
531 			parent_table,
532 			dict_index_get_nth_col_no(parent_index, i));
533 
534 		for (j = 0; j < parent_update->n_fields; j++) {
535 			const upd_field_t*	parent_ufield
536 				= &parent_update->fields[j];
537 
538 			if (parent_ufield->field_no == parent_field_no) {
539 
540 				ulint			min_size;
541 				const dict_col_t*	col;
542 				ulint			ufield_len;
543 				upd_field_t*		ufield;
544 
545 				col = dict_index_get_nth_col(index, i);
546 
547 				/* A field in the parent index record is
548 				updated. Let us make the update vector
549 				field for the child table. */
550 
551 				ufield = update->fields + n_fields_updated;
552 
553 				ufield->field_no
554 					= dict_table_get_nth_col_pos(
555 					table, dict_col_get_no(col));
556 
557 				ufield->orig_len = 0;
558 				ufield->exp = NULL;
559 
560 				ufield->new_val = parent_ufield->new_val;
561 				ufield_len = dfield_get_len(&ufield->new_val);
562 
563 				/* Clear the "external storage" flag */
564 				dfield_set_len(&ufield->new_val, ufield_len);
565 
566 				/* Do not allow a NOT NULL column to be
567 				updated as NULL */
568 
569 				if (dfield_is_null(&ufield->new_val)
570 				    && (col->prtype & DATA_NOT_NULL)) {
571 
572 					return(ULINT_UNDEFINED);
573 				}
574 
575 				/* If the new value would not fit in the
576 				column, do not allow the update */
577 
578 				if (!dfield_is_null(&ufield->new_val)
579 				    && dtype_get_at_most_n_mbchars(
580 					col->prtype, col->mbminmaxlen,
581 					col->len,
582 					ufield_len,
583 					static_cast<char*>(
584 						dfield_get_data(
585 							&ufield->new_val)))
586 				    < ufield_len) {
587 
588 					return(ULINT_UNDEFINED);
589 				}
590 
591 				/* If the parent column type has a different
592 				length than the child column type, we may
593 				need to pad with spaces the new value of the
594 				child column */
595 
596 				min_size = dict_col_get_min_size(col);
597 
598 				/* Because UNIV_SQL_NULL (the marker
599 				of SQL NULL values) exceeds all possible
600 				values of min_size, the test below will
601 				not hold for SQL NULL columns. */
602 
603 				if (min_size > ufield_len) {
604 
605 					byte*	pad;
606 					ulint	pad_len;
607 					byte*	padded_data;
608 					ulint	mbminlen;
609 
610 					padded_data = static_cast<byte*>(
611 						mem_heap_alloc(
612 							heap, min_size));
613 
614 					pad = padded_data + ufield_len;
615 					pad_len = min_size - ufield_len;
616 
617 					memcpy(padded_data,
618 					       dfield_get_data(&ufield
619 							       ->new_val),
620 					       ufield_len);
621 
622 					mbminlen = dict_col_get_mbminlen(col);
623 
624 					ut_ad(!(ufield_len % mbminlen));
625 					ut_ad(!(min_size % mbminlen));
626 
627 					if (mbminlen == 1
628 					    && dtype_get_charset_coll(
629 						    col->prtype)
630 					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
631 						/* Do not pad BINARY columns */
632 						return(ULINT_UNDEFINED);
633 					}
634 
635 					row_mysql_pad_col(mbminlen,
636 							  pad, pad_len);
637 					dfield_set_data(&ufield->new_val,
638 							padded_data, min_size);
639 				}
640 
641 				/* Check whether the current column has
642 				FTS index on it */
643 				if (table->fts
644 				    && dict_table_is_fts_column(
645 					table->fts->indexes,
646 					dict_col_get_no(col))
647 					!= ULINT_UNDEFINED) {
648 					*fts_col_affected = TRUE;
649 				}
650 
651 				/* If Doc ID is updated, check whether the
652 				Doc ID is valid */
653 				if (table->fts
654 				    && ufield->field_no == doc_id_pos) {
655 					doc_id_t	n_doc_id;
656 
657 					n_doc_id =
658 						table->fts->cache->next_doc_id;
659 
660 					new_doc_id = fts_read_doc_id(
661 						static_cast<const byte*>(
662 							dfield_get_data(
663 							&ufield->new_val)));
664 
665 					if (new_doc_id <= 0) {
666 						fprintf(stderr,
667 							"InnoDB: FTS Doc ID "
668 							"must be larger than "
669 							"0 \n");
670 						return(ULINT_UNDEFINED);
671 					}
672 
673 					if (new_doc_id < n_doc_id) {
674 						fprintf(stderr,
675 						       "InnoDB: FTS Doc ID "
676 						       "must be larger than "
677 						       IB_ID_FMT" for table",
678 						       n_doc_id -1);
679 
680 						ut_print_name(stderr, trx,
681 							      TRUE,
682 							      table->name);
683 
684 						putc('\n', stderr);
685 						return(ULINT_UNDEFINED);
686 					}
687 
688 					*fts_col_affected = TRUE;
689 					doc_id_updated = TRUE;
690 				}
691 
692 				n_fields_updated++;
693 			}
694 		}
695 	}
696 
697 	/* Generate a new Doc ID if FTS index columns get updated */
698 	if (table->fts && *fts_col_affected) {
699 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
700 			doc_id_t	doc_id;
701                         upd_field_t*	ufield;
702 
703 			ut_ad(!doc_id_updated);
704 			ufield = update->fields + n_fields_updated;
705 			fts_get_next_doc_id(table, &trx->fts_next_doc_id);
706 			doc_id = fts_update_doc_id(table, ufield,
707 						   &trx->fts_next_doc_id);
708 			n_fields_updated++;
709 			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
710 		} else  {
711 			if (doc_id_updated) {
712 				ut_ad(new_doc_id);
713 				fts_trx_add_op(trx, table, new_doc_id,
714 					       FTS_INSERT, NULL);
715 			} else {
716 				fprintf(stderr, "InnoDB: FTS Doc ID must be "
717 					"updated along with FTS indexed "
718 					"column for table ");
719 				ut_print_name(stderr, trx, TRUE, table->name);
720 				putc('\n', stderr);
721 				return(ULINT_UNDEFINED);
722 			}
723 		}
724 	}
725 
726 	update->n_fields = n_fields_updated;
727 
728 	return(n_fields_updated);
729 }
730 
731 /*********************************************************************//**
732 Set detailed error message associated with foreign key errors for
733 the given transaction. */
734 static
735 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)736 row_ins_set_detailed(
737 /*=================*/
738 	trx_t*		trx,		/*!< in: transaction */
739 	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
740 {
741 	ut_ad(!srv_read_only_mode);
742 
743 	mutex_enter(&srv_misc_tmpfile_mutex);
744 	rewind(srv_misc_tmpfile);
745 
746 	if (os_file_set_eof(srv_misc_tmpfile)) {
747 		ut_print_name(srv_misc_tmpfile, trx, TRUE,
748 			      foreign->foreign_table_name);
749 		dict_print_info_on_foreign_key_in_create_format(
750 			srv_misc_tmpfile, trx, foreign, FALSE);
751 		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
752 	} else {
753 		trx_set_detailed_error(trx, "temp file operation failed");
754 	}
755 
756 	mutex_exit(&srv_misc_tmpfile_mutex);
757 }
758 
759 /*********************************************************************//**
760 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
761 and displays information about the given transaction.
762 The caller must release dict_foreign_err_mutex. */
763 static
764 void
row_ins_foreign_trx_print(trx_t * trx)765 row_ins_foreign_trx_print(
766 /*======================*/
767 	trx_t*	trx)	/*!< in: transaction */
768 {
769 	ulint	n_rec_locks;
770 	ulint	n_trx_locks;
771 	ulint	heap_size;
772 
773 	if (srv_read_only_mode) {
774 		return;
775 	}
776 
777 	lock_mutex_enter();
778 	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
779 	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
780 	heap_size = mem_heap_get_size(trx->lock.lock_heap);
781 	lock_mutex_exit();
782 
783 	mutex_enter(&trx_sys->mutex);
784 
785 	mutex_enter(&dict_foreign_err_mutex);
786 	rewind(dict_foreign_err_file);
787 	ut_print_timestamp(dict_foreign_err_file);
788 	fputs(" Transaction:\n", dict_foreign_err_file);
789 
790 	trx_print_low(dict_foreign_err_file, trx, 600,
791 		      n_rec_locks, n_trx_locks, heap_size);
792 
793 	mutex_exit(&trx_sys->mutex);
794 
795 	ut_ad(mutex_own(&dict_foreign_err_mutex));
796 }
797 
798 /*********************************************************************//**
799 Reports a foreign key error associated with an update or a delete of a
800 parent table index entry. */
801 static
802 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)803 row_ins_foreign_report_err(
804 /*=======================*/
805 	const char*	errstr,		/*!< in: error string from the viewpoint
806 					of the parent table */
807 	que_thr_t*	thr,		/*!< in: query thread whose run_node
808 					is an update node */
809 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
810 	const rec_t*	rec,		/*!< in: a matching index record in the
811 					child table */
812 	const dtuple_t*	entry)		/*!< in: index entry in the parent
813 					table */
814 {
815 	if (srv_read_only_mode) {
816 		return;
817 	}
818 
819 	FILE*	ef	= dict_foreign_err_file;
820 	trx_t*	trx	= thr_get_trx(thr);
821 
822 	row_ins_set_detailed(trx, foreign);
823 
824 	row_ins_foreign_trx_print(trx);
825 
826 	fputs("Foreign key constraint fails for table ", ef);
827 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
828 	fputs(":\n", ef);
829 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
830 							TRUE);
831 	putc('\n', ef);
832 	fputs(errstr, ef);
833 	fputs(" in parent table, in index ", ef);
834 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
835 	if (entry) {
836 		fputs(" tuple:\n", ef);
837 		dtuple_print(ef, entry);
838 	}
839 	fputs("\nBut in child table ", ef);
840 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
841 	fputs(", in index ", ef);
842 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
843 	if (rec) {
844 		fputs(", there is a record:\n", ef);
845 		rec_print(ef, rec, foreign->foreign_index);
846 	} else {
847 		fputs(", the record is not available\n", ef);
848 	}
849 	putc('\n', ef);
850 
851 	mutex_exit(&dict_foreign_err_mutex);
852 }
853 
854 /*********************************************************************//**
855 Reports a foreign key error to dict_foreign_err_file when we are trying
856 to add an index entry to a child table. Note that the adding may be the result
857 of an update, too. */
858 static
859 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)860 row_ins_foreign_report_add_err(
861 /*===========================*/
862 	trx_t*		trx,		/*!< in: transaction */
863 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
864 	const rec_t*	rec,		/*!< in: a record in the parent table:
865 					it does not match entry because we
866 					have an error! */
867 	const dtuple_t*	entry)		/*!< in: index entry to insert in the
868 					child table */
869 {
870 	if (srv_read_only_mode) {
871 		return;
872 	}
873 
874 	FILE*	ef	= dict_foreign_err_file;
875 
876 	row_ins_set_detailed(trx, foreign);
877 
878 	row_ins_foreign_trx_print(trx);
879 
880 	fputs("Foreign key constraint fails for table ", ef);
881 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
882 	fputs(":\n", ef);
883 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
884 							TRUE);
885 	fputs("\nTrying to add in child table, in index ", ef);
886 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
887 	if (entry) {
888 		fputs(" tuple:\n", ef);
889 		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
890 		It would be better to only display the user columns. */
891 		dtuple_print(ef, entry);
892 	}
893 	fputs("\nBut in parent table ", ef);
894 	ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
895 	fputs(", in index ", ef);
896 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
897 	fputs(",\nthe closest match we can find is record:\n", ef);
898 	if (rec && page_rec_is_supremum(rec)) {
899 		/* If the cursor ended on a supremum record, it is better
900 		to report the previous record in the error message, so that
901 		the user gets a more descriptive error message. */
902 		rec = page_rec_get_prev_const(rec);
903 	}
904 
905 	if (rec) {
906 		rec_print(ef, rec, foreign->referenced_index);
907 	}
908 	putc('\n', ef);
909 
910 	mutex_exit(&dict_foreign_err_mutex);
911 }
912 
913 /*********************************************************************//**
914 Invalidate the query cache for the given table. */
915 static
916 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)917 row_ins_invalidate_query_cache(
918 /*===========================*/
919 	que_thr_t*	thr,		/*!< in: query thread whose run_node
920 					is an update node */
921 	const char*	name)		/*!< in: table name prefixed with
922 					database name and a '/' character */
923 {
924 	char*	buf;
925 	char*	ptr;
926 	ulint	len = strlen(name) + 1;
927 
928 	buf = mem_strdupl(name, len);
929 
930 	ptr = strchr(buf, '/');
931 	ut_a(ptr);
932 	*ptr = '\0';
933 
934 	innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
935 	mem_free(buf);
936 }
937 
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
and the constraint had an ON DELETE or ON UPDATE condition which was not
RESTRICT.

Outline of what the function does:
1) invalidates the query cache for the child table;
2) returns DB_ROW_IS_REFERENCED if the constraint defines no CASCADE or
SET NULL action for the kind of operation (delete or update) being run;
3) creates, on first use, the cascade update node of the current update
node and sets it up for this constraint;
4) refuses cyclic cascaded updates and cascades with 15 or more ancestors;
5) locates and X-locks the clustered index record of the child row;
6) builds the update vector (SET NULL or cascaded values) when the cascade
is not a delete;
7) stores the cursor positions, commits mtr, runs the cascade, then
restarts mtr and restores the position of pcur.

On every return path after the first two checks, mtr has been committed and
restarted and the position of pcur has been stored and restored.
@return	DB_SUCCESS, DB_LOCK_WAIT, or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_foreign_check_on_constraint(
/*================================*/
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
					type is != 0 */
	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
					index record in the child table */
	dtuple_t*	entry,		/*!< in: index entry in the parent
					table */
	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
					page */
{
	upd_node_t*	node;
	upd_node_t*	cascade;
	dict_table_t*	table		= foreign->foreign_table;
	dict_index_t*	index;
	dict_index_t*	clust_index;
	dtuple_t*	ref;
	mem_heap_t*	upd_vec_heap	= NULL;
	const rec_t*	rec;
	const rec_t*	clust_rec;
	const buf_block_t* clust_block;
	upd_t*		update;
	ulint		n_to_update;
	dberr_t		err;
	ulint		i;
	trx_t*		trx;
	mem_heap_t*	tmp_heap	= NULL;
	doc_id_t	doc_id = FTS_NULL_DOC_ID;
	/* Set to TRUE below when a column of the constraint is covered by
	a fulltext index of the child table */
	ibool		fts_col_affacted = FALSE;

	ut_a(thr);
	ut_a(foreign);
	ut_a(pcur);
	ut_a(mtr);

	trx = thr_get_trx(thr);

	/* Since we are going to delete or update a row, we have to invalidate
	the MySQL query cache for table. A deadlock of threads is not possible
	here because the caller of this function does not hold any latches with
	the sync0sync.h rank above the lock_sys_t::mutex. The query cache mutex
	has a rank just above the lock_sys_t::mutex. */

	row_ins_invalidate_query_cache(thr, table->name);

	node = static_cast<upd_node_t*>(thr->run_node);

	/* A DELETE of the parent row, but the constraint has neither
	ON DELETE CASCADE nor ON DELETE SET NULL: report and refuse */

	if (node->is_delete && 0 == (foreign->type
				     & (DICT_FOREIGN_ON_DELETE_CASCADE
					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {

		row_ins_foreign_report_err("Trying to delete",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	/* The same check for an UPDATE of the parent row and the
	ON UPDATE actions */

	if (!node->is_delete && 0 == (foreign->type
				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {

		/* This is an UPDATE */

		row_ins_foreign_report_err("Trying to update",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	if (node->cascade_node == NULL) {
		/* Extend our query graph by creating a child to current
		update node. The child is used in the cascade or set null
		operation. */

		node->cascade_heap = mem_heap_create(128);
		node->cascade_node = row_create_update_node_for_mysql(
			table, node->cascade_heap);
		que_node_set_parent(node->cascade_node, node);
	}

	/* Initialize cascade_node to do the operation we want. Note that we
	use the SAME cascade node to do all foreign key operations of the
	SQL DELETE: the table of the cascade node may change if there are
	several child tables to the table where the delete is done! */

	cascade = node->cascade_node;

	cascade->table = table;

	cascade->foreign = foreign;

	if (node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
		cascade->is_delete = TRUE;
	} else {
		cascade->is_delete = FALSE;

		if (foreign->n_fields > cascade->update_n_fields) {
			/* We have to make the update vector longer */

			cascade->update = upd_create(foreign->n_fields,
						     node->cascade_heap);
			cascade->update_n_fields = foreign->n_fields;
		}
	}

	/* We do not allow cyclic cascaded updating (DELETE is allowed,
	but not UPDATE) of the same table, as this can lead to an infinite
	cycle. Check that we are not updating the same table which is
	already being modified in this cascade chain. We have to check
	this also because the modification of the indexes of a 'parent'
	table may still be incomplete, and we must avoid seeing the indexes
	of the parent table in an inconsistent state! */

	if (!cascade->is_delete
	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {

		/* We do not know if this would break foreign key
		constraints, but play safe and return an error */

		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying an update, possibly causing a cyclic"
			" cascaded update\n"
			"in the child table,", thr, foreign,
			btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	/* Limit the depth of the cascade chain */

	if (row_ins_cascade_n_ancestors(cascade) >= 15) {
		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying a too deep cascaded delete or update\n",
			thr, foreign, btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	index = btr_pcur_get_btr_cur(pcur)->index;

	ut_a(index == foreign->foreign_index);

	rec = btr_pcur_get_rec(pcur);

	/* tmp_heap is used below for the row reference and for reading
	the FTS doc id; it is freed on both exit paths */

	tmp_heap = mem_heap_create(256);

	if (dict_index_is_clust(index)) {
		/* pcur is already positioned in the clustered index of
		the child table */

		clust_index = index;
		clust_rec = rec;
		clust_block = btr_pcur_get_block(pcur);
	} else {
		/* We have to look for the record in the clustered index
		in the child table */

		clust_index = dict_table_get_first_index(table);

		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
					tmp_heap);
		btr_pcur_open_with_no_init(clust_index, ref,
					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
					   cascade->pcur, 0, mtr);

		clust_rec = btr_pcur_get_rec(cascade->pcur);
		clust_block = btr_pcur_get_block(cascade->pcur);

		/* The clustered index record must be a user record which
		matches all the unique fields of the row reference */

		if (!page_rec_is_user_rec(clust_rec)
		    || btr_pcur_get_low_match(cascade->pcur)
		    < dict_index_get_n_unique(clust_index)) {

			fputs("InnoDB: error in cascade of a foreign key op\n"
			      "InnoDB: ", stderr);
			dict_index_name_print(stderr, trx, index);

			fputs("\n"
			      "InnoDB: record ", stderr);
			rec_print(stderr, rec, index);
			fputs("\n"
			      "InnoDB: clustered record ", stderr);
			rec_print(stderr, clust_rec, clust_index);
			fputs("\n"
			      "InnoDB: Submit a detailed bug report to"
			      " http://bugs.mysql.com\n", stderr);
			ut_ad(0);

			/* NOTE(review): the inconsistency is reported to
			stderr, but the caller still gets DB_SUCCESS and no
			cascade is run for this record */
			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}
	}

	/* Set an X-lock on the row to delete or update in the child table */

	err = lock_table(0, table, LOCK_IX, thr);

	if (err == DB_SUCCESS) {
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
		we already have a normal shared lock on the appropriate
		gap if the search criterion was not unique */

		err = lock_clust_rec_read_check_and_lock_alt(
			0, clust_block, clust_rec, clust_index,
			LOCK_X, LOCK_REC_NOT_GAP, thr);
	}

	if (err != DB_SUCCESS) {

		goto nonstandard_exit_func;
	}

	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
		/* This can happen if there is a circular reference of
		rows such that cascading delete comes to delete a row
		already in the process of being delete marked */
		err = DB_SUCCESS;

		goto nonstandard_exit_func;
	}

	/* Remember the doc id of the child row for the FTS bookkeeping
	done further below */

	if (table->fts) {
		doc_id = fts_get_doc_id_from_rec(table, clust_rec, tmp_heap);
	}

	if (node->is_delete
	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {

		/* Build the appropriate update vector which sets
		foreign->n_fields first fields in rec to SQL NULL */

		update = cascade->update;

		update->info_bits = 0;
		update->n_fields = foreign->n_fields;
		UNIV_MEM_INVALID(update->fields,
				 update->n_fields * sizeof *update->fields);

		for (i = 0; i < foreign->n_fields; i++) {
			upd_field_t*	ufield = &update->fields[i];

			ufield->field_no = dict_table_get_nth_col_pos(
				table,
				dict_index_get_nth_col_no(index, i));
			ufield->orig_len = 0;
			ufield->exp = NULL;
			dfield_set_null(&ufield->new_val);

			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
				!= ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	} else if (table->fts && cascade->is_delete) {
		/* DICT_FOREIGN_ON_DELETE_CASCADE case */
		for (i = 0; i < foreign->n_fields; i++) {
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
				!= ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	if (!node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {

		/* Build the appropriate update vector which sets changing
		foreign->n_fields first fields in rec to new values */

		upd_vec_heap = mem_heap_create(256);

		n_to_update = row_ins_cascade_calc_update_vec(
			node, foreign, upd_vec_heap, trx, &fts_col_affacted);

		if (n_to_update == ULINT_UNDEFINED) {
			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying a cascaded update where the"
				" updated value in the child\n"
				"table would not fit in the length"
				" of the column, or the value would\n"
				"be NULL and the column is"
				" declared as not NULL in the child table,",
				thr, foreign, btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		}

		if (cascade->update->n_fields == 0) {

			/* The update does not change any columns referred
			to in this foreign key constraint: no need to do
			anything */

			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}

		/* Mark the old Doc ID as deleted */
		if (fts_col_affacted) {
			ut_ad(table->fts);
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	/* Store pcur position and initialize or store the cascade node
	pcur stored position */

	btr_pcur_store_position(pcur, mtr);

	if (index == clust_index) {
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
	} else {
		btr_pcur_store_position(cascade->pcur, mtr);
	}

	/* The mini-transaction is committed before the cascade is run;
	it is restarted further below */

	mtr_commit(mtr);

	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);

	cascade->state = UPD_NODE_UPDATE_CLUSTERED;

	err = row_update_cascade_for_mysql(thr, cascade,
					   foreign->foreign_table);

	if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
		fprintf(stderr,
			"InnoDB: error: table %s has the counter 0"
			" though there is\n"
			"InnoDB: a FOREIGN KEY check running on it.\n",
			foreign->foreign_table->name);
	}

	/* Release the data dictionary latch for a while, so that we do not
	starve other threads from doing CREATE TABLE etc. if we have a huge
	cascaded operation running. The counter n_foreign_key_checks_running
	will prevent other users from dropping or ALTERing the table when we
	release the latch. */

	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));

	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");

	row_mysql_freeze_data_dictionary(thr_get_trx(thr));

	mtr_start(mtr);

	/* Restore pcur position */

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	return(err);

nonstandard_exit_func:
	/* Exit path taken when the cascade itself is not run: free the
	heaps, then store the cursor position, commit and restart the
	mini-transaction and restore the cursor position */

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	btr_pcur_store_position(pcur, mtr);

	mtr_commit(mtr);
	mtr_start(mtr);

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	return(err);
}
1345 
1346 /*********************************************************************//**
1347 Sets a shared lock on a record. Used in locking possible duplicate key
1348 records and also in checking foreign key constraints.
1349 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1350 static
1351 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1352 row_ins_set_shared_rec_lock(
1353 /*========================*/
1354 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1355 					LOCK_REC_NOT_GAP type lock */
1356 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1357 	const rec_t*		rec,	/*!< in: record */
1358 	dict_index_t*		index,	/*!< in: index */
1359 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1360 	que_thr_t*		thr)	/*!< in: query thread */
1361 {
1362 	dberr_t	err;
1363 
1364 	ut_ad(rec_offs_validate(rec, index, offsets));
1365 
1366 	if (dict_index_is_clust(index)) {
1367 		err = lock_clust_rec_read_check_and_lock(
1368 			0, block, rec, index, offsets, LOCK_S, type, thr);
1369 	} else {
1370 		err = lock_sec_rec_read_check_and_lock(
1371 			0, block, rec, index, offsets, LOCK_S, type, thr);
1372 	}
1373 
1374 	return(err);
1375 }
1376 
1377 /*********************************************************************//**
1378 Sets a exclusive lock on a record. Used in locking possible duplicate key
1379 records
1380 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1381 static
1382 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1383 row_ins_set_exclusive_rec_lock(
1384 /*===========================*/
1385 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1386 					LOCK_REC_NOT_GAP type lock */
1387 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1388 	const rec_t*		rec,	/*!< in: record */
1389 	dict_index_t*		index,	/*!< in: index */
1390 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1391 	que_thr_t*		thr)	/*!< in: query thread */
1392 {
1393 	dberr_t	err;
1394 
1395 	ut_ad(rec_offs_validate(rec, index, offsets));
1396 
1397 	if (dict_index_is_clust(index)) {
1398 		err = lock_clust_rec_read_check_and_lock(
1399 			0, block, rec, index, offsets, LOCK_X, type, thr);
1400 	} else {
1401 		err = lock_sec_rec_read_check_and_lock(
1402 			0, block, rec, index, offsets, LOCK_X, type, thr);
1403 	}
1404 
1405 	return(err);
1406 }
1407 
1408 /***************************************************************//**
1409 Checks if foreign key constraint fails for an index entry. Sets shared locks
1410 which lock either the success or the failure of the constraint. NOTE that
1411 the caller must have a shared latch on dict_operation_lock.
1412 @return	DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1413 UNIV_INTERN
1414 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1415 row_ins_check_foreign_constraint(
1416 /*=============================*/
1417 	ibool		check_ref,/*!< in: TRUE if we want to check that
1418 				the referenced table is ok, FALSE if we
1419 				want to check the foreign key table */
1420 	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
1421 				tables mentioned in it must be in the
1422 				dictionary cache if they exist at all */
1423 	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
1424 				table, else the referenced table */
1425 	dtuple_t*	entry,	/*!< in: index entry for index */
1426 	que_thr_t*	thr)	/*!< in: query thread */
1427 {
1428 	dberr_t		err;
1429 	upd_node_t*	upd_node;
1430 	dict_table_t*	check_table;
1431 	dict_index_t*	check_index;
1432 	ulint		n_fields_cmp;
1433 	btr_pcur_t	pcur;
1434 	int		cmp;
1435 	ulint		i;
1436 	mtr_t		mtr;
1437 	trx_t*		trx		= thr_get_trx(thr);
1438 	mem_heap_t*	heap		= NULL;
1439 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1440 	ulint*		offsets		= offsets_;
1441 	rec_offs_init(offsets_);
1442 
1443 run_again:
1444 #ifdef UNIV_SYNC_DEBUG
1445 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1446 #endif /* UNIV_SYNC_DEBUG */
1447 
1448 	err = DB_SUCCESS;
1449 
1450 	if (trx->check_foreigns == FALSE) {
1451 		/* The user has suppressed foreign key checks currently for
1452 		this session */
1453 		goto exit_func;
1454 	}
1455 
1456 	/* If any of the foreign key fields in entry is SQL NULL, we
1457 	suppress the foreign key check: this is compatible with Oracle,
1458 	for example */
1459 
1460 	for (i = 0; i < foreign->n_fields; i++) {
1461 		if (UNIV_SQL_NULL == dfield_get_len(
1462 			    dtuple_get_nth_field(entry, i))) {
1463 
1464 			goto exit_func;
1465 		}
1466 	}
1467 
1468 	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1469 		upd_node = static_cast<upd_node_t*>(thr->run_node);
1470 
1471 		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1472 			/* If a cascaded update is done as defined by a
1473 			foreign key constraint, do not check that
1474 			constraint for the child row. In ON UPDATE CASCADE
1475 			the update of the parent row is only half done when
1476 			we come here: if we would check the constraint here
1477 			for the child row it would fail.
1478 
1479 			A QUESTION remains: if in the child table there are
1480 			several constraints which refer to the same parent
1481 			table, we should merge all updates to the child as
1482 			one update? And the updates can be contradictory!
1483 			Currently we just perform the update associated
1484 			with each foreign key constraint, one after
1485 			another, and the user has problems predicting in
1486 			which order they are performed. */
1487 
1488 			goto exit_func;
1489 		}
1490 	}
1491 
1492 	if (check_ref) {
1493 		check_table = foreign->referenced_table;
1494 		check_index = foreign->referenced_index;
1495 	} else {
1496 		check_table = foreign->foreign_table;
1497 		check_index = foreign->foreign_index;
1498 	}
1499 
1500 	if (check_table == NULL
1501 	    || check_table->ibd_file_missing
1502 	    || check_index == NULL) {
1503 
1504 		if (!srv_read_only_mode && check_ref) {
1505 			FILE*	ef = dict_foreign_err_file;
1506 
1507 			row_ins_set_detailed(trx, foreign);
1508 
1509 			row_ins_foreign_trx_print(trx);
1510 
1511 			fputs("Foreign key constraint fails for table ", ef);
1512 			ut_print_name(ef, trx, TRUE,
1513 				      foreign->foreign_table_name);
1514 			fputs(":\n", ef);
1515 			dict_print_info_on_foreign_key_in_create_format(
1516 				ef, trx, foreign, TRUE);
1517 			fputs("\nTrying to add to index ", ef);
1518 			ut_print_name(ef, trx, FALSE,
1519 				      foreign->foreign_index->name);
1520 			fputs(" tuple:\n", ef);
1521 			dtuple_print(ef, entry);
1522 			fputs("\nBut the parent table ", ef);
1523 			ut_print_name(ef, trx, TRUE,
1524 				      foreign->referenced_table_name);
1525 			fputs("\nor its .ibd file does"
1526 			      " not currently exist!\n", ef);
1527 			mutex_exit(&dict_foreign_err_mutex);
1528 
1529 			err = DB_NO_REFERENCED_ROW;
1530 		}
1531 
1532 		goto exit_func;
1533 	}
1534 
1535 	if (check_table != table) {
1536 		/* We already have a LOCK_IX on table, but not necessarily
1537 		on check_table */
1538 
1539 		err = lock_table(0, check_table, LOCK_IS, thr);
1540 
1541 		if (err != DB_SUCCESS) {
1542 
1543 			goto do_possible_lock_wait;
1544 		}
1545 	}
1546 
1547 	mtr_start(&mtr);
1548 
1549 	/* Store old value on n_fields_cmp */
1550 
1551 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1552 
1553 	dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1554 
1555 	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1556 		      BTR_SEARCH_LEAF, &pcur, &mtr);
1557 
1558 	/* Scan index records and check if there is a matching record */
1559 
1560 	do {
1561 		const rec_t*		rec = btr_pcur_get_rec(&pcur);
1562 		const buf_block_t*	block = btr_pcur_get_block(&pcur);
1563 
1564 		SRV_CORRUPT_TABLE_CHECK(block,
1565 		{
1566 			err = DB_CORRUPTION;
1567 			goto exit_loop;
1568 		});
1569 
1570 		if (page_rec_is_infimum(rec)) {
1571 
1572 			continue;
1573 		}
1574 
1575 		offsets = rec_get_offsets(rec, check_index,
1576 					  offsets, ULINT_UNDEFINED, &heap);
1577 
1578 		if (page_rec_is_supremum(rec)) {
1579 
1580 			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1581 							  rec, check_index,
1582 							  offsets, thr);
1583 			switch (err) {
1584 			case DB_SUCCESS_LOCKED_REC:
1585 			case DB_SUCCESS:
1586 				continue;
1587 			default:
1588 				goto end_scan;
1589 			}
1590 		}
1591 
1592 		cmp = cmp_dtuple_rec(entry, rec, offsets);
1593 
1594 		if (cmp == 0) {
1595 			if (rec_get_deleted_flag(rec,
1596 						 rec_offs_comp(offsets))) {
1597 				err = row_ins_set_shared_rec_lock(
1598 					LOCK_ORDINARY, block,
1599 					rec, check_index, offsets, thr);
1600 				switch (err) {
1601 				case DB_SUCCESS_LOCKED_REC:
1602 				case DB_SUCCESS:
1603 					break;
1604 				default:
1605 					goto end_scan;
1606 				}
1607 			} else {
1608 				/* Found a matching record. Lock only
1609 				a record because we can allow inserts
1610 				into gaps */
1611 
1612 				err = row_ins_set_shared_rec_lock(
1613 					LOCK_REC_NOT_GAP, block,
1614 					rec, check_index, offsets, thr);
1615 
1616 				switch (err) {
1617 				case DB_SUCCESS_LOCKED_REC:
1618 				case DB_SUCCESS:
1619 					break;
1620 				default:
1621 					goto end_scan;
1622 				}
1623 
1624 				if (check_ref) {
1625 					err = DB_SUCCESS;
1626 
1627 					goto end_scan;
1628 				} else if (foreign->type != 0) {
1629 					/* There is an ON UPDATE or ON DELETE
1630 					condition: check them in a separate
1631 					function */
1632 
1633 					err = row_ins_foreign_check_on_constraint(
1634 						thr, foreign, &pcur, entry,
1635 						&mtr);
1636 					if (err != DB_SUCCESS) {
1637 						/* Since reporting a plain
1638 						"duplicate key" error
1639 						message to the user in
1640 						cases where a long CASCADE
1641 						operation would lead to a
1642 						duplicate key in some
1643 						other table is very
1644 						confusing, map duplicate
1645 						key errors resulting from
1646 						FK constraints to a
1647 						separate error code. */
1648 
1649 						if (err == DB_DUPLICATE_KEY) {
1650 							err = DB_FOREIGN_DUPLICATE_KEY;
1651 						}
1652 
1653 						goto end_scan;
1654 					}
1655 
1656 					/* row_ins_foreign_check_on_constraint
1657 					may have repositioned pcur on a
1658 					different block */
1659 					block = btr_pcur_get_block(&pcur);
1660 				} else {
1661 					row_ins_foreign_report_err(
1662 						"Trying to delete or update",
1663 						thr, foreign, rec, entry);
1664 
1665 					err = DB_ROW_IS_REFERENCED;
1666 					goto end_scan;
1667 				}
1668 			}
1669 		} else {
1670 			ut_a(cmp < 0);
1671 
1672 			err = row_ins_set_shared_rec_lock(
1673 				LOCK_GAP, block,
1674 				rec, check_index, offsets, thr);
1675 
1676 			switch (err) {
1677 			case DB_SUCCESS_LOCKED_REC:
1678 			case DB_SUCCESS:
1679 				if (check_ref) {
1680 					err = DB_NO_REFERENCED_ROW;
1681 					row_ins_foreign_report_add_err(
1682 						trx, foreign, rec, entry);
1683 				} else {
1684 					err = DB_SUCCESS;
1685 				}
1686 			default:
1687 				break;
1688 			}
1689 
1690 			goto end_scan;
1691 		}
1692 	} while (btr_pcur_move_to_next(&pcur, &mtr));
1693 
1694 exit_loop:
1695 	if (check_ref) {
1696 		row_ins_foreign_report_add_err(
1697 			trx, foreign, btr_pcur_get_rec(&pcur), entry);
1698 		err = DB_NO_REFERENCED_ROW;
1699 	} else {
1700 		err = DB_SUCCESS;
1701 	}
1702 
1703 end_scan:
1704 	btr_pcur_close(&pcur);
1705 
1706 	mtr_commit(&mtr);
1707 
1708 	/* Restore old value */
1709 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1710 
1711 do_possible_lock_wait:
1712 	if (err == DB_LOCK_WAIT) {
1713 		bool		verified = false;
1714 
1715 		trx->error_state = err;
1716 
1717 		que_thr_stop_for_mysql(thr);
1718 
1719 		lock_wait_suspend_thread(thr);
1720 
1721 		if (check_table->to_be_dropped) {
1722 			/* The table is being dropped. We shall timeout
1723 			this operation */
1724 			err = DB_LOCK_WAIT_TIMEOUT;
1725 			goto exit_func;
1726 		}
1727 
1728 		/* We had temporarily released dict_operation_lock in
1729 		above lock sleep wait, now we have the lock again, and
1730 		we will need to re-check whether the foreign key has been
1731 		dropped. We only need to verify if the table is referenced
1732 		table case (check_ref == 0), since MDL lock will prevent
1733 		concurrent DDL and DML on the same table */
1734 		if (!check_ref) {
1735 			for (dict_foreign_set::iterator it
1736 				= table->referenced_set.begin();
1737 			     it != table->referenced_set.end();
1738 			     ++it) {
1739 				if (*it == foreign) {
1740 					verified = true;
1741 					break;
1742 				}
1743 			}
1744 		} else {
1745 			verified = true;
1746 		}
1747 
1748 		if (!verified) {
1749 			err = DB_DICT_CHANGED;
1750 		} else if (trx->error_state == DB_SUCCESS) {
1751 			goto run_again;
1752 		} else {
1753 			err = trx->error_state;
1754 		}
1755 	}
1756 
1757 exit_func:
1758 	if (UNIV_LIKELY_NULL(heap)) {
1759 		mem_heap_free(heap);
1760 	}
1761 
1762 	if (UNIV_UNLIKELY(trx->fake_changes)) {
1763 		err = DB_SUCCESS;
1764 	}
1765 
1766 	return(err);
1767 }
1768 
1769 /***************************************************************//**
1770 Checks if foreign key constraints fail for an index entry. If index
1771 is not mentioned in any constraint, this function does nothing,
1772 Otherwise does searches to the indexes of referenced tables and
1773 sets shared locks which lock either the success or the failure of
1774 a constraint.
1775 @return	DB_SUCCESS or error code */
1776 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1777 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1778 row_ins_check_foreign_constraints(
1779 /*==============================*/
1780 	dict_table_t*	table,	/*!< in: table */
1781 	dict_index_t*	index,	/*!< in: index */
1782 	dtuple_t*	entry,	/*!< in: index entry for index */
1783 	que_thr_t*	thr)	/*!< in: query thread */
1784 {
1785 	dict_foreign_t*	foreign;
1786 	dberr_t		err;
1787 	trx_t*		trx;
1788 	ibool		got_s_lock	= FALSE;
1789 
1790 	trx = thr_get_trx(thr);
1791 
1792 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1793 			    "foreign_constraint_check_for_ins");
1794 
1795 	for (dict_foreign_set::iterator it = table->foreign_set.begin();
1796 	     it != table->foreign_set.end();
1797 	     ++it) {
1798 
1799 		foreign = *it;
1800 
1801 		if (foreign->foreign_index == index) {
1802 			dict_table_t*	ref_table = NULL;
1803 			dict_table_t*	foreign_table = foreign->foreign_table;
1804 			dict_table_t*	referenced_table
1805 						= foreign->referenced_table;
1806 
1807 			if (referenced_table == NULL) {
1808 
1809 				ref_table = dict_table_open_on_name(
1810 					foreign->referenced_table_name_lookup,
1811 					FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1812 			}
1813 
1814 			if (0 == trx->dict_operation_lock_mode) {
1815 				got_s_lock = TRUE;
1816 
1817 				row_mysql_freeze_data_dictionary(trx);
1818 			}
1819 
1820 			if (referenced_table) {
1821 				os_inc_counter(dict_sys->mutex,
1822 					       foreign_table
1823 					       ->n_foreign_key_checks_running);
1824 			}
1825 
1826 			/* NOTE that if the thread ends up waiting for a lock
1827 			we will release dict_operation_lock temporarily!
1828 			But the counter on the table protects the referenced
1829 			table from being dropped while the check is running. */
1830 
1831 			err = row_ins_check_foreign_constraint(
1832 				TRUE, foreign, table, entry, thr);
1833 
1834 			DBUG_EXECUTE_IF("row_ins_dict_change_err",
1835 					err = DB_DICT_CHANGED;);
1836 
1837 			if (referenced_table) {
1838 				os_dec_counter(dict_sys->mutex,
1839 					       foreign_table
1840 					       ->n_foreign_key_checks_running);
1841 			}
1842 
1843 			if (got_s_lock) {
1844 				row_mysql_unfreeze_data_dictionary(trx);
1845 			}
1846 
1847 			if (ref_table != NULL) {
1848 				dict_table_close(ref_table, FALSE, FALSE);
1849 			}
1850 
1851 			if (err != DB_SUCCESS) {
1852 
1853 				return(err);
1854 			}
1855 		}
1856 	}
1857 
1858 	return(DB_SUCCESS);
1859 }
1860 
1861 /***************************************************************//**
1862 Checks if a unique key violation to rec would occur at the index entry
1863 insert.
1864 @return	TRUE if error */
1865 static
1866 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1867 row_ins_dupl_error_with_rec(
1868 /*========================*/
1869 	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
1870 				that the caller already has a record lock on
1871 				the record! */
1872 	const dtuple_t*	entry,	/*!< in: entry to insert */
1873 	dict_index_t*	index,	/*!< in: index */
1874 	const ulint*	offsets)/*!< in: rec_get_offsets(rec, index) */
1875 {
1876 	ulint	matched_fields;
1877 	ulint	matched_bytes;
1878 	ulint	n_unique;
1879 	ulint	i;
1880 
1881 	ut_ad(rec_offs_validate(rec, index, offsets));
1882 
1883 	n_unique = dict_index_get_n_unique(index);
1884 
1885 	matched_fields = 0;
1886 	matched_bytes = 0;
1887 
1888 	cmp_dtuple_rec_with_match(entry, rec, offsets,
1889 				  &matched_fields, &matched_bytes);
1890 
1891 	if (matched_fields < n_unique) {
1892 
1893 		return(FALSE);
1894 	}
1895 
1896 	/* In a unique secondary index we allow equal key values if they
1897 	contain SQL NULLs */
1898 
1899 	if (!dict_index_is_clust(index)) {
1900 
1901 		for (i = 0; i < n_unique; i++) {
1902 			if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1903 
1904 				return(FALSE);
1905 			}
1906 		}
1907 	}
1908 
1909 	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1910 }
1911 
1912 /***************************************************************//**
1913 Scans a unique non-clustered index at a given index entry to determine
1914 whether a uniqueness violation has occurred for the key value of the entry.
1915 Sets shared locks on possible duplicate records, or exclusive locks if
the 'duplicates' field of the transaction is nonzero. No record locks are
requested when BTR_NO_LOCKING_FLAG is set in flags. If any of the first
n_unique fields of entry is SQL NULL, DB_SUCCESS is returned without
scanning. During the scan, n_fields_cmp of entry is set to n_unique; the
previous value is restored before the scan returns.
1916 @return	DB_SUCCESS, DB_DUPLICATE_KEY, DB_LOCK_WAIT, or any other error
code returned by the record lock request */
1917 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1918 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)1919 row_ins_scan_sec_index_for_duplicate(
1920 /*=================================*/
1921 	ulint		flags,	/*!< in: undo logging and locking flags */
1922 	dict_index_t*	index,	/*!< in: non-clustered unique index */
1923 	dtuple_t*	entry,	/*!< in: index entry */
1924 	que_thr_t*	thr,	/*!< in: query thread */
1925 	bool		s_latch,/*!< in: whether index->lock is being held */
1926 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1927 	mem_heap_t*	offsets_heap)
1928 				/*!< in/out: memory heap that can be emptied */
1929 {
1930 	ulint		n_unique;
1931 	int		cmp;
1932 	ulint		n_fields_cmp;
1933 	btr_pcur_t	pcur;
1934 	dberr_t		err		= DB_SUCCESS;
1935 	ulint		allow_duplicates;
1936 	ulint*		offsets		= NULL;
1937 
1938 #ifdef UNIV_SYNC_DEBUG
1939 	ut_ad(s_latch == rw_lock_own(&index->lock, RW_LOCK_SHARED));
1940 #endif /* UNIV_SYNC_DEBUG */
1941 
1942 	n_unique = dict_index_get_n_unique(index);
1943 
1944 	/* If the secondary index is unique, but one of the fields in the
1945 	n_unique first fields is NULL, a unique key violation cannot occur,
1946 	since we define NULL != NULL in this case. This early return happens
	before n_fields_cmp of entry is touched, so nothing needs restoring. */
1947 
1948 	for (ulint i = 0; i < n_unique; i++) {
1949 		if (UNIV_SQL_NULL == dfield_get_len(
1950 			    dtuple_get_nth_field(entry, i))) {
1951 
1952 			return(DB_SUCCESS);
1953 		}
1954 	}
1955 
1956 	/* Store old value on n_fields_cmp; it is put back at end_scan */
1957 
1958 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1959 
1960 	dtuple_set_n_fields_cmp(entry, n_unique);
1961 
	/* Open the cursor in PAGE_CUR_GE mode. BTR_ALREADY_S_LATCHED is
	added to the latch mode when the caller reports, through s_latch,
	that index->lock is being held. */
1962 	btr_pcur_open(index, entry, PAGE_CUR_GE,
1963 		      s_latch
1964 		      ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
1965 		      : BTR_SEARCH_LEAF,
1966 		      &pcur, mtr);
1967 
1968 	allow_duplicates = thr_get_trx(thr)->duplicates;
1969 
1970 	/* Scan index records and check if there is a duplicate */
1971 
1972 	do {
1973 		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
1974 		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
1975 		const ulint		lock_type = LOCK_ORDINARY;
1976 
1977 		if (page_rec_is_infimum(rec)) {
1978 
			/* Neither locked nor compared: 'continue' goes
			to the loop condition, which moves the cursor on */
1979 			continue;
1980 		}
1981 
1982 		offsets = rec_get_offsets(rec, index, offsets,
1983 					  ULINT_UNDEFINED, &offsets_heap);
1984 
1985 		if (flags & BTR_NO_LOCKING_FLAG) {
1986 			/* Set no locks when applying log
1987 			in online table rebuild. */
1988 		} else if (allow_duplicates) {
1989 
1990 			/* If the SQL-query will update or replace
1991 			duplicate key we will take X-lock for
1992 			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1993 			INSERT ON DUPLICATE KEY UPDATE). */
1994 
1995 			err = row_ins_set_exclusive_rec_lock(
1996 				lock_type, block, rec, index, offsets, thr);
1997 		} else {
1998 
1999 			err = row_ins_set_shared_rec_lock(
2000 				lock_type, block, rec, index, offsets, thr);
2001 		}
2002 
		/* In the no-locking case err still holds DB_SUCCESS here,
		either from its initializer or from an earlier iteration */
2003 		switch (err) {
2004 		case DB_SUCCESS_LOCKED_REC:
			/* Normalize to DB_SUCCESS so that this value
			is never returned to the caller */
2005 			err = DB_SUCCESS;
			/* fall through */
2006 		case DB_SUCCESS:
2007 			break;
2008 		default:
			/* Lock wait or lock error: stop and report it */
2009 			goto end_scan;
2010 		}
2011 
2012 		if (page_rec_is_supremum(rec)) {
2013 
			/* The lock request above was made for the
			supremum as well; only the comparison is skipped */
2014 			continue;
2015 		}
2016 
2017 		cmp = cmp_dtuple_rec(entry, rec, offsets);
2018 
2019 		if (cmp == 0) {
2020 			if (row_ins_dupl_error_with_rec(rec, entry,
2021 							index, offsets)) {
2022 				err = DB_DUPLICATE_KEY;
2023 
2024 				thr_get_trx(thr)->error_info = index;
2025 
2026 				/* If the duplicate is on hidden FTS_DOC_ID,
2027 				state so in the error log */
2028 				if (DICT_TF2_FLAG_IS_SET(
2029 					index->table,
2030 					DICT_TF2_FTS_HAS_DOC_ID)
2031 				    && strcmp(index->name,
2032 					      FTS_DOC_ID_INDEX_NAME) == 0) {
2033 					ib_logf(IB_LOG_LEVEL_ERROR,
2034 						"Duplicate FTS_DOC_ID value"
2035 						" on table %s",
2036 						index->table->name);
2037 				}
2038 
2039 				goto end_scan;
2040 			}
			/* Equal key, but not reported as a duplicate:
			keep scanning the following records */
2041 		} else {
			/* First record that does not compare equal ends
			the scan; the comparison result is required to be
			negative at this point */
2042 			ut_a(cmp < 0);
2043 			goto end_scan;
2044 		}
2045 	} while (btr_pcur_move_to_next(&pcur, mtr));
2046 
2047 end_scan:
2048 	/* Restore old value */
2049 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2050 
2051 	return(err);
2052 }
2053 
2054 /** Checks for a duplicate when the table is being rebuilt online.
2055 @retval DB_SUCCESS		when no duplicate is detected
2056 @retval DB_SUCCESS_LOCKED_REC	when rec is an exact match of entry or
2057 a newer version of entry (the entry should not be inserted)
2058 @retval DB_DUPLICATE_KEY	when entry is a duplicate of rec */
2059 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2060 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2061 row_ins_duplicate_online(
2062 /*=====================*/
2063 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2064 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2065 	const rec_t*	rec,	/*!< in: clustered index record */
2066 	ulint*		offsets)/*!< in/out: rec_get_offsets(rec) */
2067 {
2068 	ulint	fields	= 0;
2069 	ulint	bytes	= 0;
2070 
2071 	/* During rebuild, there should not be any delete-marked rows
2072 	in the new table. */
2073 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2074 	ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2075 
2076 	/* Compare the PRIMARY KEY fields and the
2077 	DB_TRX_ID, DB_ROLL_PTR. */
2078 	cmp_dtuple_rec_with_match_low(
2079 		entry, rec, offsets, n_uniq + 2, &fields, &bytes);
2080 
2081 	if (fields < n_uniq) {
2082 		/* Not a duplicate. */
2083 		return(DB_SUCCESS);
2084 	}
2085 
2086 	if (fields == n_uniq + 2) {
2087 		/* rec is an exact match of entry. */
2088 		ut_ad(bytes == 0);
2089 		return(DB_SUCCESS_LOCKED_REC);
2090 	}
2091 
2092 	return(DB_DUPLICATE_KEY);
2093 }
2094 
2095 /** Checks for a duplicate when the table is being rebuilt online.
2096 @retval DB_SUCCESS		when no duplicate is detected
2097 @retval DB_SUCCESS_LOCKED_REC	when rec is an exact match of entry or
2098 a newer version of entry (the entry should not be inserted)
2099 @retval DB_DUPLICATE_KEY	when entry is a duplicate of rec */
2100 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2101 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2102 row_ins_duplicate_error_in_clust_online(
2103 /*====================================*/
2104 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2105 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2106 	const btr_cur_t*cursor,	/*!< in: cursor on insert position */
2107 	ulint**		offsets,/*!< in/out: rec_get_offsets(rec) */
2108 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2109 {
2110 	dberr_t		err	= DB_SUCCESS;
2111 	const rec_t*	rec	= btr_cur_get_rec(cursor);
2112 
2113 	if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2114 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2115 					   ULINT_UNDEFINED, heap);
2116 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2117 		if (err != DB_SUCCESS) {
2118 			return(err);
2119 		}
2120 	}
2121 
2122 	rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2123 
2124 	if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2125 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2126 					   ULINT_UNDEFINED, heap);
2127 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2128 	}
2129 
2130 	return(err);
2131 }
2132 
2133 /***************************************************************//**
2134 Checks if a unique key violation error would occur at an index entry
2135 insert. Sets shared locks on possible duplicate records, or exclusive
locks if trx->duplicates is nonzero; the lock mode is always
LOCK_REC_NOT_GAP. Works only
2136 for a clustered index!
2137 @retval DB_SUCCESS if no error
2138 @retval DB_DUPLICATE_KEY if error (trx->error_info is set to the index),
2139 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2140 record
2141 NOTE(review): a DB_SUCCESS_LOCKED_REC result of the lock request is handled
2142 like DB_SUCCESS and is never returned from here; flags and mtr are unused. */
2143 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2144 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2145 row_ins_duplicate_error_in_clust(
2146 /*=============================*/
2147 	ulint		flags,	/*!< in: undo logging and locking flags */
2148 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
2149 	const dtuple_t*	entry,	/*!< in: entry to insert */
2150 	que_thr_t*	thr,	/*!< in: query thread */
2151 	mtr_t*		mtr)	/*!< in: mtr */
2152 {
2153 	dberr_t	err;
2154 	rec_t*	rec;
2155 	ulint	n_unique;
2156 	trx_t*	trx		= thr_get_trx(thr);
2157 	mem_heap_t*heap		= NULL;
2158 	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
2159 	ulint*	offsets		= offsets_;
2160 	rec_offs_init(offsets_);
2161 
2162 	UT_NOT_USED(mtr);
2163 
2164 	ut_ad(dict_index_is_clust(cursor->index));
2165 
2166 	/* NOTE: For unique non-clustered indexes there may be any number
2167 	of delete marked records with the same value for the non-clustered
2168 	index key (remember multiversioning), and which differ only in
2169 	the row reference part of the index record, containing the
2170 	clustered index key fields. For such a secondary index record,
2171 	to avoid race condition, we must FIRST do the insertion and after
2172 	that check that the uniqueness condition is not breached! */
2173 
2174 	/* NOTE: A problem is that in the B-tree node pointers on an
2175 	upper level may match more to the entry than the actual existing
2176 	user records on the leaf level. So, even if low_match would suggest
2177 	that a duplicate key violation may occur, this may not be the case. */
2178 
2179 	n_unique = dict_index_get_n_unique(cursor->index);
2180 
	/* First candidate: the record the cursor is positioned on */
2181 	if (cursor->low_match >= n_unique) {
2182 
2183 		rec = btr_cur_get_rec(cursor);
2184 
2185 		if (!page_rec_is_infimum(rec)) {
2186 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2187 						  ULINT_UNDEFINED, &heap);
2188 
2189 			/* We set a lock on the possible duplicate: this
2190 			is needed in logical logging of MySQL to make
2191 			sure that in roll-forward we get the same duplicate
2192 			errors as in original execution */
2193 
2194 			if (trx->duplicates) {
2195 
2196 				/* If the SQL-query will update or replace
2197 				duplicate key we will take X-lock for
2198 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2199 				INSERT ON DUPLICATE KEY UPDATE). */
2200 
2201 				err = row_ins_set_exclusive_rec_lock(
2202 					LOCK_REC_NOT_GAP,
2203 					btr_cur_get_block(cursor),
2204 					rec, cursor->index, offsets, thr);
2205 			} else {
2206 
2207 				err = row_ins_set_shared_rec_lock(
2208 					LOCK_REC_NOT_GAP,
2209 					btr_cur_get_block(cursor), rec,
2210 					cursor->index, offsets, thr);
2211 			}
2212 
			/* Both success codes proceed to the duplicate
			test; anything else is returned as is */
2213 			switch (err) {
2214 			case DB_SUCCESS_LOCKED_REC:
2215 			case DB_SUCCESS:
2216 				break;
2217 			default:
2218 				goto func_exit;
2219 			}
2220 
2221 			if (row_ins_dupl_error_with_rec(
2222 				    rec, entry, cursor->index, offsets)) {
				/* This label is also the target of the
				goto in the up_match branch below */
2223 duplicate:
2224 				trx->error_info = cursor->index;
2225 				err = DB_DUPLICATE_KEY;
2226 				goto func_exit;
2227 			}
2228 		}
2229 	}
2230 
	/* Second candidate: the successor of the cursor record */
2231 	if (cursor->up_match >= n_unique) {
2232 
2233 		rec = page_rec_get_next(btr_cur_get_rec(cursor));
2234 
2235 		if (!page_rec_is_supremum(rec)) {
2236 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2237 						  ULINT_UNDEFINED, &heap);
2238 
2239 			if (trx->duplicates) {
2240 
2241 				/* If the SQL-query will update or replace
2242 				duplicate key we will take X-lock for
2243 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2244 				INSERT ON DUPLICATE KEY UPDATE). */
2245 
2246 				err = row_ins_set_exclusive_rec_lock(
2247 					LOCK_REC_NOT_GAP,
2248 					btr_cur_get_block(cursor),
2249 					rec, cursor->index, offsets, thr);
2250 			} else {
2251 
2252 				err = row_ins_set_shared_rec_lock(
2253 					LOCK_REC_NOT_GAP,
2254 					btr_cur_get_block(cursor),
2255 					rec, cursor->index, offsets, thr);
2256 			}
2257 
2258 			switch (err) {
2259 			case DB_SUCCESS_LOCKED_REC:
2260 			case DB_SUCCESS:
2261 				break;
2262 			default:
2263 				goto func_exit;
2264 			}
2265 
2266 			if (row_ins_dupl_error_with_rec(
2267 				    rec, entry, cursor->index, offsets)) {
2268 				goto duplicate;
2269 			}
2270 		}
2271 
2272 		/* This should never happen: up_match >= n_unique, yet the
		next record is the supremum or was not reported as a
		duplicate */
2273 		ut_error;
2274 	}
2275 
	/* Overwrites a DB_SUCCESS_LOCKED_REC left over from a lock call */
2276 	err = DB_SUCCESS;
2277 func_exit:
2278 	if (UNIV_LIKELY_NULL(heap)) {
2279 		mem_heap_free(heap);
2280 	}
2281 	return(err);
2282 }
2283 
2284 /***************************************************************//**
2285 Checks if an index entry has long enough common prefix with an
2286 existing record so that the intended insert of the entry must be
2287 changed to a modify of the existing record. In the case of a clustered
2288 index, the prefix must be n_unique fields long. In the case of a
2289 secondary index, all fields must be equal.  InnoDB never updates
2290 secondary index records in place, other than clearing or setting the
2291 delete-mark flag. We could be able to update the non-unique fields
2292 of a unique secondary index record by checking the cursor->up_match,
2293 but we do not do so, because it could have some locking implications.
2294 @return TRUE if the existing record should be updated; FALSE if not */
2295 UNIV_INLINE
2296 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2297 row_ins_must_modify_rec(
2298 /*====================*/
2299 	const btr_cur_t*	cursor)	/*!< in: B-tree cursor */
2300 {
2301 	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2302 	Because node pointers on upper levels of the B-tree may match more
2303 	to entry than to actual user records on the leaf level, we
2304 	have to check if the candidate record is actually a user record.
2305 	A clustered index node pointer contains index->n_unique first fields,
2306 	and a secondary index node pointer contains all index fields. */
2307 
2308 	return(cursor->low_match
2309 	       >= dict_index_get_n_unique_in_tree(cursor->index)
2310 	       && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2311 }
2312 
2313 /***************************************************************//**
2314 Tries to insert an entry into a clustered index, ignoring foreign key
2315 constraints. If a record with the same unique key is found, the other
2316 record is necessarily marked deleted by a committed transaction, or a
2317 unique key violation error occurs. The delete marked record is then
2318 updated to an existing record, and we must write an undo log record on
2319 the delete marked record.
The duplicate check is skipped altogether when n_uniq is 0. When the
transaction is in fake_changes mode, the tree is searched in a
BTR_SEARCH_* mode instead of the requested BTR_MODIFY_* mode, externally
stored columns are not written out, and row_log_table_insert() is not
called.
2320 @retval DB_SUCCESS on success
2321 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2322 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@retval DB_LOCK_TABLE_FULL if mode is BTR_MODIFY_TREE and
buf_LRU_buf_pool_running_out() holds
@retval DB_DUPLICATE_KEY or DB_SUCCESS_LOCKED_REC if the duplicate check
reports it
2323 @return error code */
2324 UNIV_INTERN
2325 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2326 row_ins_clust_index_entry_low(
2327 /*==========================*/
2328 	ulint		flags,	/*!< in: undo logging and locking flags */
2329 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2330 				depending on whether we wish optimistic or
2331 				pessimistic descent down the index tree */
2332 	dict_index_t*	index,	/*!< in: clustered index */
2333 	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
2334 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2335 	ulint		n_ext,	/*!< in: number of externally stored columns */
2336 	que_thr_t*	thr)	/*!< in: query thread */
2337 {
2338 	btr_cur_t	cursor;
2339 	ulint*		offsets		= NULL;
2340 	dberr_t		err;
2341 	big_rec_t*	big_rec		= NULL;
2342 	mtr_t		mtr;
2343 	mem_heap_t*	offsets_heap	= NULL;
2344 	ulint		search_mode;
2345 
2346 	ut_ad(dict_index_is_clust(index));
2347 	ut_ad(!dict_index_is_unique(index)
2348 	      || n_uniq == dict_index_get_n_unique(index));
2349 	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2350 
2351 	/* If running with fake_changes mode on then switch from modify to
2352 	search so that code takes only s-latch and not x-latch.
2353 	For dry-run (fake-changes) s-latch is acceptable. Taking x-latch will
2354 	make it more restrictive and will block real changes/workflow. */
2355 	if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2356 		search_mode = (mode & BTR_MODIFY_TREE)
2357 			      ? BTR_SEARCH_TREE : BTR_SEARCH_LEAF;
2358 	} else {
2359 		search_mode = mode;
2360 	}
2361 
2362 	mtr_start(&mtr);
2363 
	/* For an optimistic insert while online DDL is in progress on
	the index, S-latch index->lock here and tell the search so */
2364 	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2365 
2366 		/* We really don't need to OR mode but will leave it for
2367 		code consistency. */
2368 		mode |= BTR_ALREADY_S_LATCHED;
2369 		search_mode |= BTR_ALREADY_S_LATCHED;
2370 
2371 		mtr_s_lock(dict_index_get_lock(index), &mtr);
2372 	}
2373 
2374 	cursor.thr = thr;
2375 
2376 	/* Note that we use PAGE_CUR_LE as the search mode, because then
2377 	the function will return in both low_match and up_match of the
2378 	cursor sensible values */
2379 
2380 	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, search_mode,
2381 				    &cursor, 0, __FILE__, __LINE__, &mtr);
2382 
2383 #ifdef UNIV_DEBUG
2384 	{
		/* Debug check: if the page has a user record, its field
		count must equal that of the entry */
2385 		page_t*	page = btr_cur_get_page(&cursor);
2386 		rec_t*	first_rec = page_rec_get_next(
2387 			page_get_infimum_rec(page));
2388 
2389 		ut_ad(page_rec_is_supremum(first_rec)
2390 		      || rec_get_n_fields(first_rec, index)
2391 		      == dtuple_get_n_fields(entry));
2392 	}
2393 #endif
2394 
	/* n_uniq == 0 disables the duplicate check */
2395 	if (n_uniq && (cursor.up_match >= n_uniq
2396 		       || cursor.low_match >= n_uniq)) {
2397 
2398 		if (flags
2399 		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2400 			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2401 			/* Set no locks when applying log
2402 			in online table rebuild. Only check for duplicates. */
2403 			err = row_ins_duplicate_error_in_clust_online(
2404 				n_uniq, entry, &cursor,
2405 				&offsets, &offsets_heap);
2406 
			/* Every outcome but DB_SUCCESS records the index
			in trx->error_info; an unexpected code additionally
			fails a debug assertion */
2407 			switch (err) {
2408 			case DB_SUCCESS:
2409 				break;
2410 			default:
2411 				ut_ad(0);
2412 				/* fall through */
2413 			case DB_SUCCESS_LOCKED_REC:
2414 			case DB_DUPLICATE_KEY:
2415 				thr_get_trx(thr)->error_info = cursor.index;
2416 			}
2417 		} else {
2418 			/* Note that the following may return also
2419 			DB_LOCK_WAIT */
2420 
2421 			err = row_ins_duplicate_error_in_clust(
2422 				flags, &cursor, entry, thr, &mtr);
2423 		}
2424 
2425 		if (err != DB_SUCCESS) {
			/* Also entered by goto from the BTR_MODIFY_TREE
			insert path further down */
2426 err_exit:
2427 			mtr_commit(&mtr);
2428 			goto func_exit;
2429 		}
2430 	}
2431 
2432 	if (row_ins_must_modify_rec(&cursor)) {
2433 		/* There is already an index entry with a long enough common
2434 		prefix, we must convert the insert into a modify of an
2435 		existing record */
2436 		mem_heap_t*	entry_heap	= mem_heap_create(1024);
2437 
2438 		err = row_ins_clust_index_entry_by_modify(
2439 			flags, mode, &cursor, &offsets, &offsets_heap,
2440 			entry_heap, &big_rec, entry, thr, &mtr);
2441 
2442 		rec_t*		rec		= btr_cur_get_rec(&cursor);
2443 
2444 		if (big_rec && UNIV_LIKELY(!thr_get_trx(thr)->fake_changes)) {
2445 			ut_a(err == DB_SUCCESS);
2446 			/* Write out the externally stored
2447 			columns while still x-latching
2448 			index->lock and block->lock. Allocate
2449 			pages for big_rec in the mtr that
2450 			modified the B-tree, but be sure to skip
2451 			any pages that were freed in mtr. We will
2452 			write out the big_rec pages before
2453 			committing the B-tree mini-transaction. If
2454 			the system crashes so that crash recovery
2455 			will not replay the mtr_commit(&mtr), the
2456 			big_rec pages will be left orphaned until
2457 			the pages are allocated for something else.
2458 
2459 			TODO: If the allocation extends the
2460 			tablespace, it will not be redo
2461 			logged, in either mini-transaction.
2462 			Tablespace extension should be
2463 			redo-logged in the big_rec
2464 			mini-transaction, so that recovery
2465 			will not fail when the big_rec was
2466 			written to the extended portion of the
2467 			file, in case the file was somehow
2468 			truncated in the crash. */
2469 
2470 			DEBUG_SYNC_C_IF_THD(
2471 				thr_get_trx(thr)->mysql_thd,
2472 				"before_row_ins_upd_extern");
2473 			err = btr_store_big_rec_extern_fields(
2474 				index, btr_cur_get_block(&cursor),
2475 				rec, offsets, big_rec, &mtr,
2476 				BTR_STORE_INSERT_UPDATE);
2477 			DEBUG_SYNC_C_IF_THD(
2478 				thr_get_trx(thr)->mysql_thd,
2479 				"after_row_ins_upd_extern");
2480 			/* If writing big_rec fails (for
2481 			example, because of DB_OUT_OF_FILE_SPACE),
2482 			the record will be corrupted. Even if
2483 			we did not update any externally
2484 			stored columns, our update could cause
2485 			the record to grow so that a
2486 			non-updated column was selected for
2487 			external storage. This non-update
2488 			would not have been written to the
2489 			undo log, and thus the record cannot
2490 			be rolled back.
2491 
2492 			However, because we have not executed
2493 			mtr_commit(mtr) yet, the update will
2494 			not be replayed in crash recovery, and
2495 			the following assertion failure will
2496 			effectively "roll back" the operation. */
2497 			ut_a(err == DB_SUCCESS);
2498 			dtuple_big_rec_free(big_rec);
2499 		} else if (big_rec != NULL
2500 			   && UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
			/* fake_changes: release big_rec without
			storing the off-page columns */
2501 			dtuple_big_rec_free(big_rec);
2502 		}
2503 
2504 		if (err == DB_SUCCESS
2505 		    && dict_index_is_online_ddl(index)
2506 		    && UNIV_LIKELY(!thr_get_trx(thr)->fake_changes)) {
2507 			row_log_table_insert(rec, index, offsets);
2508 		}
2509 
2510 		mtr_commit(&mtr);
2511 		mem_heap_free(entry_heap);
2512 	} else {
2513 		rec_t*	insert_rec;
2514 
		/* mode may carry BTR_ALREADY_S_LATCHED at this point, so
		this test separates a plain BTR_MODIFY_TREE from the rest */
2515 		if (mode != BTR_MODIFY_TREE) {
2516 			ut_ad(((mode & ~BTR_ALREADY_S_LATCHED)
2517 			       == BTR_MODIFY_LEAF)
2518 			      || thr_get_trx(thr)->fake_changes);
2519 			err = btr_cur_optimistic_insert(
2520 				flags, &cursor, &offsets, &offsets_heap,
2521 				entry, &insert_rec, &big_rec,
2522 				n_ext, thr, &mtr);
2523 		} else {
2524 			if (buf_LRU_buf_pool_running_out()) {
2525 
				/* Jumps back to the label inside the
				duplicate-check block, which commits
				the mtr and leaves */
2526 				err = DB_LOCK_TABLE_FULL;
2527 				goto err_exit;
2528 			}
2529 
			/* Try the optimistic insert first; only DB_FAIL
			leads on to the pessimistic insert */
2530 			err = btr_cur_optimistic_insert(
2531 				flags, &cursor,
2532 				&offsets, &offsets_heap,
2533 				entry, &insert_rec, &big_rec,
2534 				n_ext, thr, &mtr);
2535 
2536 			if (err == DB_FAIL) {
2537 				err = btr_cur_pessimistic_insert(
2538 					flags, &cursor,
2539 					&offsets, &offsets_heap,
2540 					entry, &insert_rec, &big_rec,
2541 					n_ext, thr, &mtr);
2542 			}
2543 		}
2544 
2545 		if (UNIV_LIKELY_NULL(big_rec)) {
			/* The B-tree mini-transaction is committed before
			the off-page columns are dealt with */
2546 			mtr_commit(&mtr);
2547 
2548 			if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2549 
				/* fake_changes: undo the big_rec conversion
				of entry and return err from the insert */
2550 				dtuple_convert_back_big_rec(
2551 					index, entry, big_rec);
2552 				goto func_exit;
2553 			}
2554 
2555 			/* Online table rebuild could read (and
2556 			ignore) the incomplete record at this point.
2557 			If online rebuild is in progress, the
2558 			row_ins_index_entry_big_rec() will write log. */
2559 
2560 			DBUG_EXECUTE_IF(
2561 				"row_ins_extern_checkpoint",
2562 				log_make_checkpoint_at(
2563 					LSN_MAX, TRUE););
2564 			err = row_ins_index_entry_big_rec(
2565 				entry, big_rec, offsets, &offsets_heap, index,
2566 				thr_get_trx(thr)->mysql_thd,
2567 				__FILE__, __LINE__);
2568 			dtuple_convert_back_big_rec(index, entry, big_rec);
2569 		} else {
2570 			if (err == DB_SUCCESS
2571 			    && dict_index_is_online_ddl(index)
2572 			    && !UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2573 				row_log_table_insert(
2574 					insert_rec, index, offsets);
2575 			}
2576 
2577 			mtr_commit(&mtr);
2578 		}
2579 	}
2580 
	/* Every path arriving here has already committed mtr */
2581 func_exit:
2582 	if (offsets_heap) {
2583 		mem_heap_free(offsets_heap);
2584 	}
2585 
2586 	return(err);
2587 }
2588 
2589 /***************************************************************//**
2590 Starts a mini-transaction and checks if the index will be dropped.
2591 @return true if the index is to be dropped */
2592 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2593 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2594 row_ins_sec_mtr_start_and_check_if_aborted(
2595 /*=======================================*/
2596 	mtr_t*		mtr,	/*!< out: mini-transaction */
2597 	dict_index_t*	index,	/*!< in/out: secondary index */
2598 	bool		check,	/*!< in: whether to check */
2599 	ulint		search_mode)
2600 				/*!< in: flags */
2601 {
2602 	ut_ad(!dict_index_is_clust(index));
2603 
2604 	mtr_start(mtr);
2605 
2606 	if (!check) {
2607 		return(false);
2608 	}
2609 
2610 	if (search_mode & BTR_ALREADY_S_LATCHED) {
2611 		mtr_s_lock(dict_index_get_lock(index), mtr);
2612 	} else {
2613 		mtr_x_lock(dict_index_get_lock(index), mtr);
2614 	}
2615 
2616 	switch (index->online_status) {
2617 	case ONLINE_INDEX_ABORTED:
2618 	case ONLINE_INDEX_ABORTED_DROPPED:
2619 		ut_ad(*index->name == TEMP_INDEX_PREFIX);
2620 		return(true);
2621 	case ONLINE_INDEX_COMPLETE:
2622 		return(false);
2623 	case ONLINE_INDEX_CREATION:
2624 		break;
2625 	}
2626 
2627 	ut_error;
2628 	return(true);
2629 }
2630 
2631 /***************************************************************//**
2632 Tries to insert an entry into a secondary index. If a record with exactly the
2633 same fields is found, the other record is necessarily marked deleted.
2634 It is then unmarked. Otherwise, the entry is just inserted to the index.
For a unique index whose search reports a match on the first n_unique
fields, the index is first scanned for duplicates with
row_ins_scan_sec_index_for_duplicate() and the cursor is then repositioned.
A DB_DUPLICATE_KEY found in an index whose name starts with
TEMP_INDEX_PREFIX is not reported: the index is flagged as corrupted in
the dictionary cache and DB_SUCCESS is returned.
2635 @retval DB_SUCCESS on success
2636 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2637 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@retval DB_LOCK_TABLE_FULL if mode is BTR_MODIFY_TREE and
buf_LRU_buf_pool_running_out() holds
2638 @return error code */
2639 UNIV_INTERN
2640 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr)2641 row_ins_sec_index_entry_low(
2642 /*========================*/
2643 	ulint		flags,	/*!< in: undo logging and locking flags */
2644 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2645 				depending on whether we wish optimistic or
2646 				pessimistic descent down the index tree */
2647 	dict_index_t*	index,	/*!< in: secondary index */
2648 	mem_heap_t*	offsets_heap,
2649 				/*!< in/out: memory heap that can be emptied */
2650 	mem_heap_t*	heap,	/*!< in/out: memory heap */
2651 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2652 	trx_id_t	trx_id,	/*!< in: PAGE_MAX_TRX_ID during
2653 				row_log_table_apply(), or 0 */
2654 	que_thr_t*	thr)	/*!< in: query thread */
2655 {
2656 	btr_cur_t	cursor;
2657 	ulint		search_mode;
2658 	dberr_t		err		= DB_SUCCESS;
2659 	ulint		n_unique;
2660 	mtr_t		mtr;
2661 	ulint*		offsets	= NULL;
2662 
2663 	ut_ad(!dict_index_is_clust(index));
2664 	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2665 
2666 	cursor.thr = thr;
2667 	ut_ad(thr_get_trx(thr)->id);
2668 	mtr_start(&mtr);
2669 
2670 	/* If running with fake_changes mode on then avoid using insert buffer
2671 	and also switch from modify to search so that code takes only s-latch
2672 	and not x-latch. For dry-run (fake-changes) s-latch is acceptable.
2673 	Taking x-latch will make it more restrictive and will block real
2674 	changes/workflow. */
2675 	if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2676 		search_mode = (mode & BTR_MODIFY_TREE)
2677 			      ? BTR_SEARCH_TREE : BTR_SEARCH_LEAF;
2678 	} else {
2679 		search_mode = mode | BTR_INSERT;
2680 	}
2681 
2682 	/* Ensure that we acquire index->lock when inserting into an
2683 	index with index->online_status == ONLINE_INDEX_COMPLETE, but
2684 	could still be subject to rollback_inplace_alter_table().
2685 	This prevents a concurrent change of index->online_status.
2686 	The memory object cannot be freed as long as we have an open
2687 	reference to the table, or index->table->n_ref_count > 0. */
2688 	const bool check = *index->name == TEMP_INDEX_PREFIX;
2689 
2690 	if (check) {
2691 
2692 		DEBUG_SYNC_C("row_ins_sec_index_enter");
2693 
2694 		/* S-latch index->lock when mode is BTR_MODIFY_LEAF, and also
2695 		when search_mode is BTR_SEARCH_TREE, a value that is assigned
2696 		only in fake_changes mode. Otherwise X-latch index->lock. */
2697 
2698 		if (mode == BTR_MODIFY_LEAF || search_mode == BTR_SEARCH_TREE) {
2699 
2700 			ut_ad((search_mode == BTR_SEARCH_TREE
2701 			       && thr_get_trx(thr)->fake_changes)
2702 			      || mode == BTR_MODIFY_LEAF);
2703 
2704 			search_mode |= BTR_ALREADY_S_LATCHED;
2705 			mtr_s_lock(dict_index_get_lock(index), &mtr);
2706 
2707 		} else {
2708 			mtr_x_lock(dict_index_get_lock(index), &mtr);
2709 		}
2710 
		/* If row_log_online_op_try() returns nonzero there is
		nothing more to do here; err is still DB_SUCCESS */
2711 		if (row_log_online_op_try(
2712 			    index, entry, thr_get_trx(thr)->id)) {
2713 			goto func_exit;
2714 		}
2715 	}
2716 
2717 	if (!thr_get_trx(thr)->check_unique_secondary) {
2718 		search_mode |= BTR_IGNORE_SEC_UNIQUE;
2719 	}
2720 
2721 	/* Note that we use PAGE_CUR_LE as the search mode, because then
2722 	the function will return in both low_match and up_match of the
2723 	cursor sensible values */
2724 	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2725 				    search_mode,
2726 				    &cursor, 0, __FILE__, __LINE__, &mtr);
2727 
2728 	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2729 		/* The insert was buffered during the search: we are done */
2730 		goto func_exit;
2731 	}
2732 
2733 #ifdef UNIV_DEBUG
2734 	{
		/* Debug check: if the page has a user record, its field
		count must equal that of the entry */
2735 		page_t*	page = btr_cur_get_page(&cursor);
2736 		rec_t*	first_rec = page_rec_get_next(
2737 			page_get_infimum_rec(page));
2738 
2739 		ut_ad(page_rec_is_supremum(first_rec)
2740 		      || rec_get_n_fields(first_rec, index)
2741 		      == dtuple_get_n_fields(entry));
2742 	}
2743 #endif
2744 
2745 	n_unique = dict_index_get_n_unique(index);
2746 
2747 	if (dict_index_is_unique(index)
2748 	    && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
		/* End the search mtr; the duplicate scan runs in a
		mini-transaction of its own */
2749 		mtr_commit(&mtr);
2750 
2751 		DEBUG_SYNC_C("row_ins_sec_index_unique");
2752 
2753 		if (row_ins_sec_mtr_start_and_check_if_aborted(
2754 			    &mtr, index, check, search_mode)) {
2755 			goto func_exit;
2756 		}
2757 
2758 		err = row_ins_scan_sec_index_for_duplicate(
2759 			flags, index, entry, thr, check, &mtr, offsets_heap);
2760 
2761 		mtr_commit(&mtr);
2762 
2763 		switch (err) {
2764 		case DB_SUCCESS:
2765 			break;
2766 		case DB_DUPLICATE_KEY:
2767 			if (*index->name == TEMP_INDEX_PREFIX) {
2768 				ut_ad(!thr_get_trx(thr)
2769 				      ->dict_operation_lock_mode);
2770 				mutex_enter(&dict_sys->mutex);
2771 				dict_set_corrupted_index_cache_only(
2772 					index, index->table);
2773 				mutex_exit(&dict_sys->mutex);
2774 				/* Do not return any error to the
2775 				caller. The duplicate will be reported
2776 				by ALTER TABLE or CREATE UNIQUE INDEX.
2777 				Unfortunately we cannot report the
2778 				duplicate key value to the DDL thread,
2779 				because the altered_table object is
2780 				private to its call stack. */
2781 				err = DB_SUCCESS;
2782 			}
2783 			/* fall through */
2784 		default:
			/* mtr was committed right after the scan, so
			return directly rather than through func_exit,
			which would commit it again */
2785 			return(err);
2786 		}
2787 
2788 		if (row_ins_sec_mtr_start_and_check_if_aborted(
2789 			    &mtr, index, check, search_mode)) {
2790 			goto func_exit;
2791 		}
2792 
2793 		DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
2794 
2795 		/* We did not find a duplicate and we have now
2796 		locked with s-locks the necessary records to
2797 		prevent any insertion of a duplicate by another
2798 		transaction. Let us now reposition the cursor and
2799 		continue the insertion. BTR_INSERT and
		BTR_IGNORE_SEC_UNIQUE are masked out of the search
		mode for this second search. */
2800 
2801 		btr_cur_search_to_nth_level(
2802 			index, 0, entry, PAGE_CUR_LE,
2803 			search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE),
2804 			&cursor, 0, __FILE__, __LINE__, &mtr);
2805 	}
2806 
2807 	if (row_ins_must_modify_rec(&cursor)) {
2808 		/* There is already an index entry with a long enough common
2809 		prefix, we must convert the insert into a modify of an
2810 		existing record */
2811 		offsets = rec_get_offsets(
2812 			btr_cur_get_rec(&cursor), index, offsets,
2813 			ULINT_UNDEFINED, &offsets_heap);
2814 
2815 		err = row_ins_sec_index_entry_by_modify(
2816 			flags, mode, &cursor, &offsets,
2817 			offsets_heap, heap, entry, thr, &mtr);
2818 	} else {
2819 		rec_t*		insert_rec;
		/* NOTE(review): big_rec has no initializer; the assertion
		at the end of this branch presumably relies on the insert
		functions assigning it - confirm in btr0cur */
2820 		big_rec_t*	big_rec;
2821 
2822 		if (mode == BTR_MODIFY_LEAF) {
2823 			err = btr_cur_optimistic_insert(
2824 				flags, &cursor, &offsets, &offsets_heap,
2825 				entry, &insert_rec,
2826 				&big_rec, 0, thr, &mtr);
2827 		} else {
2828 			ut_ad(mode == BTR_MODIFY_TREE);
2829 			if (buf_LRU_buf_pool_running_out()) {
2830 
2831 				err = DB_LOCK_TABLE_FULL;
2832 				goto func_exit;
2833 			}
2834 
			/* Try the optimistic insert first; only DB_FAIL
			leads on to the pessimistic insert */
2835 			err = btr_cur_optimistic_insert(
2836 				flags, &cursor,
2837 				&offsets, &offsets_heap,
2838 				entry, &insert_rec,
2839 				&big_rec, 0, thr, &mtr);
2840 			if (err == DB_FAIL) {
2841 				err = btr_cur_pessimistic_insert(
2842 					flags, &cursor,
2843 					&offsets, &offsets_heap,
2844 					entry, &insert_rec,
2845 					&big_rec, 0, thr, &mtr);
2846 			}
2847 		}
2848 
		/* A nonzero trx_id is passed on to
		page_update_max_trx_id() after a successful insert */
2849 		if (err == DB_SUCCESS && trx_id) {
2850 			page_update_max_trx_id(
2851 				btr_cur_get_block(&cursor),
2852 				btr_cur_get_page_zip(&cursor),
2853 				trx_id, &mtr);
2854 		}
2855 
		/* The inserts above were called with n_ext == 0 */
2856 		ut_ad(!big_rec);
2857 	}
2858 
	/* Every goto to this label comes with an active mtr */
2859 func_exit:
2860 	mtr_commit(&mtr);
2861 	return(err);
2862 }
2863 
2864 /***************************************************************//**
2865 Tries to insert the externally stored fields (off-page columns)
2866 of a clustered index entry.
2867 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
2868 UNIV_INTERN
2869 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)2870 row_ins_index_entry_big_rec_func(
2871 /*=============================*/
2872 	const dtuple_t*		entry,	/*!< in/out: index entry to insert */
2873 	const big_rec_t*	big_rec,/*!< in: externally stored fields */
2874 	ulint*			offsets,/*!< in/out: rec offsets */
2875 	mem_heap_t**		heap,	/*!< in/out: memory heap */
2876 	dict_index_t*		index,	/*!< in: index */
2877 	const char*		file,	/*!< in: file name of caller */
2878 #ifndef DBUG_OFF
2879 	const void*		thd,	/*!< in: connection, or NULL */
2880 #endif /* DBUG_OFF */
2881 	ulint			line)	/*!< in: line number of caller */
2882 {
2883 	mtr_t		mtr;
2884 	btr_cur_t	cursor;
2885 	rec_t*		rec;
2886 	dberr_t		error;
2887 
2888 	ut_ad(dict_index_is_clust(index));
2889 
2890 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2891 
2892 	mtr_start(&mtr);
2893 	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2894 				    BTR_MODIFY_TREE, &cursor, 0,
2895 				    file, line, &mtr);
2896 	rec = btr_cur_get_rec(&cursor);
2897 	offsets = rec_get_offsets(rec, index, offsets,
2898 				  ULINT_UNDEFINED, heap);
2899 
2900 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2901 	error = btr_store_big_rec_extern_fields(
2902 		index, btr_cur_get_block(&cursor),
2903 		rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2904 	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2905 
2906 	if (error == DB_SUCCESS
2907 	    && dict_index_is_online_ddl(index)) {
2908 		row_log_table_insert(rec, index, offsets);
2909 	}
2910 
2911 	mtr_commit(&mtr);
2912 
2913 	return(error);
2914 }
2915 
2916 /***************************************************************//**
2917 Inserts an entry into a clustered index. Tries first optimistic,
2918 then pessimistic descent down the tree. If the entry matches enough
2919 to a delete marked record, performs the insert by updating or delete
2920 unmarking the delete marked record.
2921 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2922 UNIV_INTERN
2923 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext)2924 row_ins_clust_index_entry(
2925 /*======================*/
2926 	dict_index_t*	index,	/*!< in: clustered index */
2927 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2928 	que_thr_t*	thr,	/*!< in: query thread */
2929 	ulint		n_ext)	/*!< in: number of externally stored columns */
2930 {
2931 	dberr_t	err;
2932 	ulint	n_uniq;
2933 
2934 	if (!index->table->foreign_set.empty()) {
2935 		err = row_ins_check_foreign_constraints(
2936 			index->table, index, entry, thr);
2937 		if (err != DB_SUCCESS) {
2938 
2939 			return(err);
2940 		}
2941 	}
2942 
2943 	n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
2944 
2945 	/* Try first optimistic descent to the B-tree */
2946 
2947 	log_free_check();
2948 
2949 	err = row_ins_clust_index_entry_low(
2950 		0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr);
2951 
2952 #ifdef UNIV_DEBUG
2953 	/* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
2954 	Once it is fixed, remove the 'ifdef', 'if' and this comment. */
2955 	if (!thr_get_trx(thr)->ddl) {
2956 		DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
2957 				    "after_row_ins_clust_index_entry_leaf");
2958 	}
2959 #endif /* UNIV_DEBUG */
2960 
2961 	if (err != DB_FAIL) {
2962 		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2963 		return(err);
2964 	}
2965 
2966 	/* Try then pessimistic descent to the B-tree */
2967 
2968 	log_free_check();
2969 
2970 	return(row_ins_clust_index_entry_low(
2971 		       0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr));
2972 }
2973 
2974 /***************************************************************//**
2975 Inserts an entry into a secondary index. Tries first optimistic,
2976 then pessimistic descent down the tree. If the entry matches enough
2977 to a delete marked record, performs the insert by updating or delete
2978 unmarking the delete marked record.
2979 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2980 UNIV_INTERN
2981 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2982 row_ins_sec_index_entry(
2983 /*====================*/
2984 	dict_index_t*	index,	/*!< in: secondary index */
2985 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2986 	que_thr_t*	thr)	/*!< in: query thread */
2987 {
2988 	dberr_t		err;
2989 	mem_heap_t*	offsets_heap;
2990 	mem_heap_t*	heap;
2991 
2992 	if (!index->table->foreign_set.empty()) {
2993 		err = row_ins_check_foreign_constraints(index->table, index,
2994 							entry, thr);
2995 		if (err != DB_SUCCESS) {
2996 
2997 			return(err);
2998 		}
2999 	}
3000 
3001 	ut_ad(thr_get_trx(thr)->id);
3002 
3003 	offsets_heap = mem_heap_create(1024);
3004 	heap = mem_heap_create(1024);
3005 
3006 	/* Try first optimistic descent to the B-tree */
3007 
3008 	log_free_check();
3009 
3010 	err = row_ins_sec_index_entry_low(
3011 		0, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry, 0, thr);
3012 	if (err == DB_FAIL) {
3013 		mem_heap_empty(heap);
3014 
3015 		/* Try then pessimistic descent to the B-tree */
3016 
3017 		log_free_check();
3018 
3019 		err = row_ins_sec_index_entry_low(
3020 			0, BTR_MODIFY_TREE, index,
3021 			offsets_heap, heap, entry, 0, thr);
3022 	}
3023 
3024 	mem_heap_free(heap);
3025 	mem_heap_free(offsets_heap);
3026 	return(err);
3027 }
3028 
3029 /***************************************************************//**
3030 Inserts an index entry to index. Tries first optimistic, then pessimistic
3031 descent down the tree. If the entry matches enough to a delete marked record,
3032 performs the insert by updating or delete unmarking the delete marked
3033 record.
3034 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3035 static
3036 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3037 row_ins_index_entry(
3038 /*================*/
3039 	dict_index_t*	index,	/*!< in: index */
3040 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
3041 	que_thr_t*	thr)	/*!< in: query thread */
3042 {
3043 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3044 			DBUG_SET("-d,row_ins_index_entry_timeout");
3045 			return(DB_LOCK_WAIT);});
3046 
3047 	if (dict_index_is_clust(index)) {
3048 		return(row_ins_clust_index_entry(index, entry, thr, 0));
3049 	} else {
3050 		return(row_ins_sec_index_entry(index, entry, thr));
3051 	}
3052 }
3053 
3054 /***********************************************************//**
3055 Sets the values of the dtuple fields in entry from the values of appropriate
3056 columns in row. */
3057 static MY_ATTRIBUTE((nonnull))
3058 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3059 row_ins_index_entry_set_vals(
3060 /*=========================*/
3061 	dict_index_t*	index,	/*!< in: index */
3062 	dtuple_t*	entry,	/*!< in: index entry to make */
3063 	const dtuple_t*	row)	/*!< in: row */
3064 {
3065 	ulint	n_fields;
3066 	ulint	i;
3067 
3068 	n_fields = dtuple_get_n_fields(entry);
3069 
3070 	for (i = 0; i < n_fields; i++) {
3071 		dict_field_t*	ind_field;
3072 		dfield_t*	field;
3073 		const dfield_t*	row_field;
3074 		ulint		len;
3075 
3076 		field = dtuple_get_nth_field(entry, i);
3077 		ind_field = dict_index_get_nth_field(index, i);
3078 		row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3079 		len = dfield_get_len(row_field);
3080 
3081 		/* Check column prefix indexes */
3082 		if (ind_field->prefix_len > 0
3083 		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3084 
3085 			const	dict_col_t*	col
3086 				= dict_field_get_col(ind_field);
3087 
3088 			len = dtype_get_at_most_n_mbchars(
3089 				col->prtype, col->mbminmaxlen,
3090 				ind_field->prefix_len,
3091 				len,
3092 				static_cast<const char*>(
3093 					dfield_get_data(row_field)));
3094 
3095 			ut_ad(!dfield_is_ext(row_field));
3096 		}
3097 
3098 		dfield_set_data(field, dfield_get_data(row_field), len);
3099 		if (dfield_is_ext(row_field)) {
3100 			ut_ad(dict_index_is_clust(index));
3101 			dfield_set_ext(field);
3102 		}
3103 	}
3104 }
3105 
3106 /***********************************************************//**
3107 Inserts a single index entry to the table.
3108 @return DB_SUCCESS if operation successfully completed, else error
3109 code or DB_LOCK_WAIT */
3110 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3111 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3112 row_ins_index_entry_step(
3113 /*=====================*/
3114 	ins_node_t*	node,	/*!< in: row insert node */
3115 	que_thr_t*	thr)	/*!< in: query thread */
3116 {
3117 	dberr_t	err;
3118 
3119 	ut_ad(dtuple_check_typed(node->row));
3120 
3121 	row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3122 
3123 	ut_ad(dtuple_check_typed(node->entry));
3124 
3125 	err = row_ins_index_entry(node->index, node->entry, thr);
3126 
3127 #ifdef UNIV_DEBUG
3128 	/* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
3129 	Once it is fixed, remove the 'ifdef', 'if' and this comment. */
3130 	if (!thr_get_trx(thr)->ddl) {
3131 		DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3132 				    "after_row_ins_index_entry_step");
3133 	}
3134 #endif /* UNIV_DEBUG */
3135 
3136 	return(err);
3137 }
3138 
3139 /***********************************************************//**
3140 Allocates a row id for row and inits the node->index field. */
3141 UNIV_INLINE
3142 void
row_ins_alloc_row_id_step(ins_node_t * node)3143 row_ins_alloc_row_id_step(
3144 /*======================*/
3145 	ins_node_t*	node)	/*!< in: row insert node */
3146 {
3147 	row_id_t	row_id;
3148 
3149 	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3150 
3151 	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3152 
3153 		/* No row id is stored if the clustered index is unique */
3154 
3155 		return;
3156 	}
3157 
3158 	/* Fill in row id value to row */
3159 
3160 	row_id = dict_sys_get_new_row_id();
3161 
3162 	dict_sys_write_row_id(node->row_id_buf, row_id);
3163 }
3164 
3165 /***********************************************************//**
3166 Gets a row to insert from the values list. */
3167 UNIV_INLINE
3168 void
row_ins_get_row_from_values(ins_node_t * node)3169 row_ins_get_row_from_values(
3170 /*========================*/
3171 	ins_node_t*	node)	/*!< in: row insert node */
3172 {
3173 	que_node_t*	list_node;
3174 	dfield_t*	dfield;
3175 	dtuple_t*	row;
3176 	ulint		i;
3177 
3178 	/* The field values are copied in the buffers of the select node and
3179 	it is safe to use them until we fetch from select again: therefore
3180 	we can just copy the pointers */
3181 
3182 	row = node->row;
3183 
3184 	i = 0;
3185 	list_node = node->values_list;
3186 
3187 	while (list_node) {
3188 		eval_exp(list_node);
3189 
3190 		dfield = dtuple_get_nth_field(row, i);
3191 		dfield_copy_data(dfield, que_node_get_val(list_node));
3192 
3193 		i++;
3194 		list_node = que_node_get_next(list_node);
3195 	}
3196 }
3197 
3198 /***********************************************************//**
3199 Gets a row to insert from the select list. */
3200 UNIV_INLINE
3201 void
row_ins_get_row_from_select(ins_node_t * node)3202 row_ins_get_row_from_select(
3203 /*========================*/
3204 	ins_node_t*	node)	/*!< in: row insert node */
3205 {
3206 	que_node_t*	list_node;
3207 	dfield_t*	dfield;
3208 	dtuple_t*	row;
3209 	ulint		i;
3210 
3211 	/* The field values are copied in the buffers of the select node and
3212 	it is safe to use them until we fetch from select again: therefore
3213 	we can just copy the pointers */
3214 
3215 	row = node->row;
3216 
3217 	i = 0;
3218 	list_node = node->select->select_list;
3219 
3220 	while (list_node) {
3221 		dfield = dtuple_get_nth_field(row, i);
3222 		dfield_copy_data(dfield, que_node_get_val(list_node));
3223 
3224 		i++;
3225 		list_node = que_node_get_next(list_node);
3226 	}
3227 }
3228 
3229 /***********************************************************//**
3230 Inserts a row to a table.
3231 @return DB_SUCCESS if operation successfully completed, else error
3232 code or DB_LOCK_WAIT */
3233 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3234 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3235 row_ins(
3236 /*====*/
3237 	ins_node_t*	node,	/*!< in: row insert node */
3238 	que_thr_t*	thr)	/*!< in: query thread */
3239 {
3240 	dberr_t	err;
3241 
3242 	if (node->state == INS_NODE_ALLOC_ROW_ID) {
3243 
3244 		row_ins_alloc_row_id_step(node);
3245 
3246 		node->index = dict_table_get_first_index(node->table);
3247 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
3248 
3249 		if (node->ins_type == INS_SEARCHED) {
3250 
3251 			row_ins_get_row_from_select(node);
3252 
3253 		} else if (node->ins_type == INS_VALUES) {
3254 
3255 			row_ins_get_row_from_values(node);
3256 		}
3257 
3258 		node->state = INS_NODE_INSERT_ENTRIES;
3259 	}
3260 
3261 	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3262 
3263 	while (node->index != NULL) {
3264 		if (node->index->type != DICT_FTS) {
3265 			err = row_ins_index_entry_step(node, thr);
3266 
3267 			if (err != DB_SUCCESS) {
3268 
3269 				return(err);
3270 			}
3271 		}
3272 
3273 		node->index = dict_table_get_next_index(node->index);
3274 		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3275 
3276 		DBUG_EXECUTE_IF(
3277 			"row_ins_skip_sec",
3278 			node->index = NULL; node->entry = NULL; break;);
3279 
3280 		/* Skip corrupted secondary index and its entry */
3281 		while (node->index && dict_index_is_corrupted(node->index)) {
3282 
3283 			node->index = dict_table_get_next_index(node->index);
3284 			node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3285 		}
3286 	}
3287 
3288 	ut_ad(node->entry == NULL);
3289 
3290 	node->state = INS_NODE_ALLOC_ROW_ID;
3291 
3292 	return(DB_SUCCESS);
3293 }
3294 
3295 /***********************************************************//**
3296 Inserts a row to a table. This is a high-level function used in SQL execution
3297 graphs.
3298 @return	query thread to run next or NULL */
3299 UNIV_INTERN
3300 que_thr_t*
row_ins_step(que_thr_t * thr)3301 row_ins_step(
3302 /*=========*/
3303 	que_thr_t*	thr)	/*!< in: query thread */
3304 {
3305 	ins_node_t*	node;
3306 	que_node_t*	parent;
3307 	sel_node_t*	sel_node;
3308 	trx_t*		trx;
3309 	dberr_t		err;
3310 
3311 	ut_ad(thr);
3312 
3313 	trx = thr_get_trx(thr);
3314 
3315 	trx_start_if_not_started_xa(trx);
3316 
3317 	node = static_cast<ins_node_t*>(thr->run_node);
3318 
3319 	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3320 
3321 	parent = que_node_get_parent(node);
3322 	sel_node = node->select;
3323 
3324 	if (thr->prev_node == parent) {
3325 		node->state = INS_NODE_SET_IX_LOCK;
3326 	}
3327 
3328 	/* If this is the first time this node is executed (or when
3329 	execution resumes after wait for the table IX lock), set an
3330 	IX lock on the table and reset the possible select node. MySQL's
3331 	partitioned table code may also call an insert within the same
3332 	SQL statement AFTER it has used this table handle to do a search.
3333 	This happens, for example, when a row update moves it to another
3334 	partition. In that case, we have already set the IX lock on the
3335 	table during the search operation, and there is no need to set
3336 	it again here. But we must write trx->id to node->trx_id_buf. */
3337 
3338 	trx_write_trx_id(node->trx_id_buf, trx->id);
3339 
3340 	if (node->state == INS_NODE_SET_IX_LOCK) {
3341 
3342 		node->state = INS_NODE_ALLOC_ROW_ID;
3343 
3344 		/* It may be that the current session has not yet started
3345 		its transaction, or it has been committed: */
3346 
3347 		if (trx->id == node->trx_id) {
3348 			/* No need to do IX-locking */
3349 
3350 			goto same_trx;
3351 		}
3352 
3353 		err = lock_table(0, node->table, LOCK_IX, thr);
3354 
3355 		DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3356 				err = DB_LOCK_WAIT;);
3357 
3358 		if (err != DB_SUCCESS) {
3359 
3360 			goto error_handling;
3361 		}
3362 
3363 		node->trx_id = trx->id;
3364 same_trx:
3365 		if (node->ins_type == INS_SEARCHED) {
3366 			/* Reset the cursor */
3367 			sel_node->state = SEL_NODE_OPEN;
3368 
3369 			/* Fetch a row to insert */
3370 
3371 			thr->run_node = sel_node;
3372 
3373 			return(thr);
3374 		}
3375 	}
3376 
3377 	if ((node->ins_type == INS_SEARCHED)
3378 	    && (sel_node->state != SEL_NODE_FETCH)) {
3379 
3380 		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3381 
3382 		/* No more rows to insert */
3383 		thr->run_node = parent;
3384 
3385 		return(thr);
3386 	}
3387 
3388 	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3389 
3390 	err = row_ins(node, thr);
3391 
3392 error_handling:
3393 	trx->error_state = err;
3394 
3395 	if (err != DB_SUCCESS) {
3396 		/* err == DB_LOCK_WAIT or SQL error detected */
3397 		return(NULL);
3398 	}
3399 
3400 	/* DO THE TRIGGER ACTIONS HERE */
3401 
3402 	if (node->ins_type == INS_SEARCHED) {
3403 		/* Fetch a row to insert */
3404 
3405 		thr->run_node = sel_node;
3406 	} else {
3407 		thr->run_node = que_node_get_parent(node);
3408 	}
3409 
3410 	return(thr);
3411 }
3412