1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30 
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "row0ins.h"
35 
36 #ifdef UNIV_NONINL
37 #include "row0ins.ic"
38 #endif
39 
40 #include "ha_prototypes.h"
41 #include "dict0dict.h"
42 #include "dict0boot.h"
43 #include "trx0rec.h"
44 #include "trx0undo.h"
45 #include "btr0btr.h"
46 #include "btr0cur.h"
47 #include "mach0data.h"
48 #include "que0que.h"
49 #include "row0upd.h"
50 #include "row0sel.h"
51 #include "row0row.h"
52 #include "row0log.h"
53 #include "rem0cmp.h"
54 #include "lock0lock.h"
55 #include "log0log.h"
56 #include "eval0eval.h"
57 #include "data0data.h"
58 #include "usr0sess.h"
59 #include "buf0lru.h"
60 #include "fts0fts.h"
61 #include "fts0types.h"
62 #include "m_string.h"
63 
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module, make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
73 
74 /*********************************************************************//**
75 Creates an insert node struct.
76 @return	own: insert node struct */
77 UNIV_INTERN
78 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)79 ins_node_create(
80 /*============*/
81 	ulint		ins_type,	/*!< in: INS_VALUES, ... */
82 	dict_table_t*	table,		/*!< in: table where to insert */
83 	mem_heap_t*	heap)		/*!< in: mem heap where created */
84 {
85 	ins_node_t*	node;
86 
87 	node = static_cast<ins_node_t*>(
88 		mem_heap_alloc(heap, sizeof(ins_node_t)));
89 
90 	node->common.type = QUE_NODE_INSERT;
91 
92 	node->ins_type = ins_type;
93 
94 	node->state = INS_NODE_SET_IX_LOCK;
95 	node->table = table;
96 	node->index = NULL;
97 	node->entry = NULL;
98 
99 	node->select = NULL;
100 
101 	node->trx_id = 0;
102 
103 	node->entry_sys_heap = mem_heap_create(128);
104 
105 	node->magic_n = INS_NODE_MAGIC_N;
106 
107 	return(node);
108 }
109 
110 /***********************************************************//**
111 Creates an entry template for each index of a table. */
112 static
113 void
ins_node_create_entry_list(ins_node_t * node)114 ins_node_create_entry_list(
115 /*=======================*/
116 	ins_node_t*	node)	/*!< in: row insert node */
117 {
118 	dict_index_t*	index;
119 	dtuple_t*	entry;
120 
121 	ut_ad(node->entry_sys_heap);
122 
123 	UT_LIST_INIT(node->entry_list);
124 
125 	/* We will include all indexes (include those corrupted
126 	secondary indexes) in the entry list. Filteration of
127 	these corrupted index will be done in row_ins() */
128 
129 	for (index = dict_table_get_first_index(node->table);
130 	     index != 0;
131 	     index = dict_table_get_next_index(index)) {
132 
133 		entry = row_build_index_entry(
134 			node->row, NULL, index, node->entry_sys_heap);
135 
136 		UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
137 	}
138 }
139 
140 /*****************************************************************//**
141 Adds system field buffers to a row. */
142 static
143 void
row_ins_alloc_sys_fields(ins_node_t * node)144 row_ins_alloc_sys_fields(
145 /*=====================*/
146 	ins_node_t*	node)	/*!< in: insert node */
147 {
148 	dtuple_t*		row;
149 	dict_table_t*		table;
150 	mem_heap_t*		heap;
151 	const dict_col_t*	col;
152 	dfield_t*		dfield;
153 	byte*			ptr;
154 
155 	row = node->row;
156 	table = node->table;
157 	heap = node->entry_sys_heap;
158 
159 	ut_ad(row && table && heap);
160 	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
161 
162 	/* allocate buffer to hold the needed system created hidden columns. */
163 	uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
164 	ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
165 
166 	/* 1. Populate row-id */
167 	col = dict_table_get_sys_col(table, DATA_ROW_ID);
168 
169 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
170 
171 	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
172 
173 	node->row_id_buf = ptr;
174 
175 	ptr += DATA_ROW_ID_LEN;
176 
177 	/* 2. Populate trx id */
178 	col = dict_table_get_sys_col(table, DATA_TRX_ID);
179 
180 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
181 
182 	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
183 
184 	node->trx_id_buf = ptr;
185 
186 	ptr += DATA_TRX_ID_LEN;
187 
188 	/* 3. Populate roll ptr */
189 
190 	col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
191 
192 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
193 
194 	dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
195 }
196 
197 /*********************************************************************//**
198 Sets a new row to insert for an INS_DIRECT node. This function is only used
199 if we have constructed the row separately, which is a rare case; this
200 function is quite slow. */
201 UNIV_INTERN
202 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)203 ins_node_set_new_row(
204 /*=================*/
205 	ins_node_t*	node,	/*!< in: insert node */
206 	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
207 {
208 	node->state = INS_NODE_SET_IX_LOCK;
209 	node->index = NULL;
210 	node->entry = NULL;
211 
212 	node->row = row;
213 
214 	mem_heap_empty(node->entry_sys_heap);
215 
216 	/* Create templates for index entries */
217 
218 	ins_node_create_entry_list(node);
219 
220 	/* Allocate from entry_sys_heap buffers for sys fields */
221 
222 	row_ins_alloc_sys_fields(node);
223 
224 	/* As we allocated a new trx id buf, the trx id should be written
225 	there again: */
226 
227 	node->trx_id = 0;
228 }
229 
230 /*******************************************************************//**
231 Does an insert operation by updating a delete-marked existing record
232 in the index. This situation can occur if the delete-marked record is
233 kept in the index for consistent reads.
234 @return	DB_SUCCESS or error code */
235 static MY_ATTRIBUTE((nonnull, warn_unused_result))
236 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)237 row_ins_sec_index_entry_by_modify(
238 /*==============================*/
239 	ulint		flags,	/*!< in: undo logging and locking flags */
240 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
241 				depending on whether mtr holds just a leaf
242 				latch or also a tree latch */
243 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
244 	ulint**		offsets,/*!< in/out: offsets on cursor->page_cur.rec */
245 	mem_heap_t*	offsets_heap,
246 				/*!< in/out: memory heap that can be emptied */
247 	mem_heap_t*	heap,	/*!< in/out: memory heap */
248 	const dtuple_t*	entry,	/*!< in: index entry to insert */
249 	que_thr_t*	thr,	/*!< in: query thread */
250 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
251 				latching any further pages */
252 {
253 	big_rec_t*	dummy_big_rec;
254 	upd_t*		update;
255 	rec_t*		rec;
256 	dberr_t		err;
257 
258 	rec = btr_cur_get_rec(cursor);
259 
260 	ut_ad(!dict_index_is_clust(cursor->index));
261 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
262 	ut_ad(!entry->info_bits);
263 
264 	/* We know that in the alphabetical ordering, entry and rec are
265 	identified. But in their binary form there may be differences if
266 	there are char fields in them. Therefore we have to calculate the
267 	difference. */
268 
269 	update = row_upd_build_sec_rec_difference_binary(
270 		rec, cursor->index, *offsets, entry, heap);
271 
272 	if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
273 		/* We should never insert in place of a record that
274 		has not been delete-marked. The only exception is when
275 		online CREATE INDEX copied the changes that we already
276 		made to the clustered index, and completed the
277 		secondary index creation before we got here. In this
278 		case, the change would already be there. The CREATE
279 		INDEX should be waiting for a MySQL meta-data lock
280 		upgrade at least until this INSERT or UPDATE
281 		returns. After that point, the TEMP_INDEX_PREFIX
282 		would be dropped from the index name in
283 		commit_inplace_alter_table(). */
284 		ut_a(update->n_fields == 0);
285 		ut_a(*cursor->index->name == TEMP_INDEX_PREFIX);
286 		ut_ad(!dict_index_is_online_ddl(cursor->index));
287 		return(DB_SUCCESS);
288 	}
289 
290 	if (mode == BTR_MODIFY_LEAF) {
291 		/* Try an optimistic updating of the record, keeping changes
292 		within the page */
293 
294 		/* TODO: pass only *offsets */
295 		err = btr_cur_optimistic_update(
296 			flags | BTR_KEEP_SYS_FLAG, cursor,
297 			offsets, &offsets_heap, update, 0, thr,
298 			thr_get_trx(thr)->id, mtr);
299 		switch (err) {
300 		case DB_OVERFLOW:
301 		case DB_UNDERFLOW:
302 		case DB_ZIP_OVERFLOW:
303 			err = DB_FAIL;
304 		default:
305 			break;
306 		}
307 	} else {
308 		ut_a(mode == BTR_MODIFY_TREE);
309 		if (buf_LRU_buf_pool_running_out()) {
310 
311 			return(DB_LOCK_TABLE_FULL);
312 		}
313 
314 		err = btr_cur_pessimistic_update(
315 			flags | BTR_KEEP_SYS_FLAG, cursor,
316 			offsets, &offsets_heap,
317 			heap, &dummy_big_rec, update, 0,
318 			thr, thr_get_trx(thr)->id, mtr);
319 		ut_ad(!dummy_big_rec);
320 	}
321 
322 	return(err);
323 }
324 
325 /*******************************************************************//**
326 Does an insert operation by delete unmarking and updating a delete marked
327 existing record in the index. This situation can occur if the delete marked
328 record is kept in the index for consistent reads.
329 @return	DB_SUCCESS, DB_FAIL, or error code */
330 static MY_ATTRIBUTE((nonnull, warn_unused_result))
331 dberr_t
row_ins_clust_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)332 row_ins_clust_index_entry_by_modify(
333 /*================================*/
334 	ulint		flags,	/*!< in: undo logging and locking flags */
335 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
336 				depending on whether mtr holds just a leaf
337 				latch or also a tree latch */
338 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
339 	ulint**		offsets,/*!< out: offsets on cursor->page_cur.rec */
340 	mem_heap_t**	offsets_heap,
341 				/*!< in/out: pointer to memory heap that can
342 				be emptied, or NULL */
343 	mem_heap_t*	heap,	/*!< in/out: memory heap */
344 	big_rec_t**	big_rec,/*!< out: possible big rec vector of fields
345 				which have to be stored externally by the
346 				caller */
347 	const dtuple_t*	entry,	/*!< in: index entry to insert */
348 	que_thr_t*	thr,	/*!< in: query thread */
349 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
350 				latching any further pages */
351 {
352 	const rec_t*	rec;
353 	const upd_t*	update;
354 	dberr_t		err;
355 
356 	ut_ad(dict_index_is_clust(cursor->index));
357 
358 	*big_rec = NULL;
359 
360 	rec = btr_cur_get_rec(cursor);
361 
362 	ut_ad(rec_get_deleted_flag(rec,
363 				   dict_table_is_comp(cursor->index->table)));
364 
365 	/* Build an update vector containing all the fields to be modified;
366 	NOTE that this vector may NOT contain system columns trx_id or
367 	roll_ptr */
368 
369 	update = row_upd_build_difference_binary(
370 		cursor->index, entry, rec, NULL, true,
371 		thr_get_trx(thr), heap);
372 	if (mode != BTR_MODIFY_TREE) {
373 		ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
374 
375 		/* Try optimistic updating of the record, keeping changes
376 		within the page */
377 
378 		err = btr_cur_optimistic_update(
379 			flags, cursor, offsets, offsets_heap, update, 0, thr,
380 			thr_get_trx(thr)->id, mtr);
381 		switch (err) {
382 		case DB_OVERFLOW:
383 		case DB_UNDERFLOW:
384 		case DB_ZIP_OVERFLOW:
385 			err = DB_FAIL;
386 		default:
387 			break;
388 		}
389 	} else {
390 		if (buf_LRU_buf_pool_running_out()) {
391 
392 			return(DB_LOCK_TABLE_FULL);
393 
394 		}
395 		err = btr_cur_pessimistic_update(
396 			flags | BTR_KEEP_POS_FLAG,
397 			cursor, offsets, offsets_heap, heap,
398 			big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
399 	}
400 
401 	return(err);
402 }
403 
404 /*********************************************************************//**
405 Returns TRUE if in a cascaded update/delete an ancestor node of node
406 updates (not DELETE, but UPDATE) table.
407 @return	TRUE if an ancestor updates table */
408 static
409 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)410 row_ins_cascade_ancestor_updates_table(
411 /*===================================*/
412 	que_node_t*	node,	/*!< in: node in a query graph */
413 	dict_table_t*	table)	/*!< in: table */
414 {
415 	que_node_t*	parent;
416 
417 	for (parent = que_node_get_parent(node);
418 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
419 	     parent = que_node_get_parent(parent)) {
420 
421 		upd_node_t*	upd_node;
422 
423 		upd_node = static_cast<upd_node_t*>(parent);
424 
425 		if (upd_node->table == table && upd_node->is_delete == FALSE) {
426 
427 			return(TRUE);
428 		}
429 	}
430 
431 	return(FALSE);
432 }
433 
434 /*********************************************************************//**
435 Returns the number of ancestor UPDATE or DELETE nodes of a
436 cascaded update/delete node.
437 @return	number of ancestors */
438 static MY_ATTRIBUTE((nonnull, warn_unused_result))
439 ulint
row_ins_cascade_n_ancestors(que_node_t * node)440 row_ins_cascade_n_ancestors(
441 /*========================*/
442 	que_node_t*	node)	/*!< in: node in a query graph */
443 {
444 	que_node_t*	parent;
445 	ulint		n_ancestors = 0;
446 
447 	for (parent = que_node_get_parent(node);
448 	     que_node_get_type(parent) == QUE_NODE_UPDATE;
449 	     parent = que_node_get_parent(parent)) {
450 
451 		n_ancestors++;
452 	}
453 
454 	return(n_ancestors);
455 }
456 
457 /******************************************************************//**
458 Calculates the update vector node->cascade->update for a child table in
459 a cascaded update.
460 @return number of fields in the calculated update vector; the value
461 can also be 0 if no foreign key fields changed; the returned value is
462 ULINT_UNDEFINED if the column type in the child table is too short to
463 fit the new value in the parent table: that means the update fails */
464 static MY_ATTRIBUTE((nonnull, warn_unused_result))
465 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)466 row_ins_cascade_calc_update_vec(
467 /*============================*/
468 	upd_node_t*	node,		/*!< in: update node of the parent
469 					table */
470 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
471 					type is != 0 */
472 	mem_heap_t*	heap,		/*!< in: memory heap to use as
473 					temporary storage */
474 	trx_t*		trx,		/*!< in: update transaction */
475 	ibool*		fts_col_affected)/*!< out: is FTS column affected */
476 {
477 	upd_node_t*	cascade		= node->cascade_node;
478 	dict_table_t*	table		= foreign->foreign_table;
479 	dict_index_t*	index		= foreign->foreign_index;
480 	upd_t*		update;
481 	dict_table_t*	parent_table;
482 	dict_index_t*	parent_index;
483 	upd_t*		parent_update;
484 	ulint		n_fields_updated;
485 	ulint		parent_field_no;
486 	ulint		i;
487 	ulint		j;
488 	ibool		doc_id_updated = FALSE;
489 	ulint		doc_id_pos = 0;
490 	doc_id_t	new_doc_id = FTS_NULL_DOC_ID;
491 
492 	ut_a(node);
493 	ut_a(foreign);
494 	ut_a(cascade);
495 	ut_a(table);
496 	ut_a(index);
497 
498 	/* Calculate the appropriate update vector which will set the fields
499 	in the child index record to the same value (possibly padded with
500 	spaces if the column is a fixed length CHAR or FIXBINARY column) as
501 	the referenced index record will get in the update. */
502 
503 	parent_table = node->table;
504 	ut_a(parent_table == foreign->referenced_table);
505 	parent_index = foreign->referenced_index;
506 	parent_update = node->update;
507 
508 	update = cascade->update;
509 
510 	update->info_bits = 0;
511 	update->n_fields = foreign->n_fields;
512 
513 	n_fields_updated = 0;
514 
515 	*fts_col_affected = FALSE;
516 
517 	if (table->fts) {
518 		doc_id_pos = dict_table_get_nth_col_pos(
519 			table, table->fts->doc_col);
520 	}
521 
522 	for (i = 0; i < foreign->n_fields; i++) {
523 
524 		parent_field_no = dict_table_get_nth_col_pos(
525 			parent_table,
526 			dict_index_get_nth_col_no(parent_index, i));
527 
528 		for (j = 0; j < parent_update->n_fields; j++) {
529 			const upd_field_t*	parent_ufield
530 				= &parent_update->fields[j];
531 
532 			if (parent_ufield->field_no == parent_field_no) {
533 
534 				ulint			min_size;
535 				const dict_col_t*	col;
536 				ulint			ufield_len;
537 				upd_field_t*		ufield;
538 
539 				col = dict_index_get_nth_col(index, i);
540 
541 				/* A field in the parent index record is
542 				updated. Let us make the update vector
543 				field for the child table. */
544 
545 				ufield = update->fields + n_fields_updated;
546 
547 				ufield->field_no
548 					= dict_table_get_nth_col_pos(
549 					table, dict_col_get_no(col));
550 
551 				ufield->orig_len = 0;
552 				ufield->exp = NULL;
553 
554 				ufield->new_val = parent_ufield->new_val;
555 				ufield_len = dfield_get_len(&ufield->new_val);
556 
557 				/* Clear the "external storage" flag */
558 				dfield_set_len(&ufield->new_val, ufield_len);
559 
560 				/* Do not allow a NOT NULL column to be
561 				updated as NULL */
562 
563 				if (dfield_is_null(&ufield->new_val)
564 				    && (col->prtype & DATA_NOT_NULL)) {
565 
566 					return(ULINT_UNDEFINED);
567 				}
568 
569 				/* If the new value would not fit in the
570 				column, do not allow the update */
571 
572 				if (!dfield_is_null(&ufield->new_val)
573 				    && dtype_get_at_most_n_mbchars(
574 					col->prtype, col->mbminmaxlen,
575 					col->len,
576 					ufield_len,
577 					static_cast<char*>(
578 						dfield_get_data(
579 							&ufield->new_val)))
580 				    < ufield_len) {
581 
582 					return(ULINT_UNDEFINED);
583 				}
584 
585 				/* If the parent column type has a different
586 				length than the child column type, we may
587 				need to pad with spaces the new value of the
588 				child column */
589 
590 				min_size = dict_col_get_min_size(col);
591 
592 				/* Because UNIV_SQL_NULL (the marker
593 				of SQL NULL values) exceeds all possible
594 				values of min_size, the test below will
595 				not hold for SQL NULL columns. */
596 
597 				if (min_size > ufield_len) {
598 
599 					byte*	pad;
600 					ulint	pad_len;
601 					byte*	padded_data;
602 					ulint	mbminlen;
603 
604 					padded_data = static_cast<byte*>(
605 						mem_heap_alloc(
606 							heap, min_size));
607 
608 					pad = padded_data + ufield_len;
609 					pad_len = min_size - ufield_len;
610 
611 					memcpy(padded_data,
612 					       dfield_get_data(&ufield
613 							       ->new_val),
614 					       ufield_len);
615 
616 					mbminlen = dict_col_get_mbminlen(col);
617 
618 					ut_ad(!(ufield_len % mbminlen));
619 					ut_ad(!(min_size % mbminlen));
620 
621 					if (mbminlen == 1
622 					    && dtype_get_charset_coll(
623 						    col->prtype)
624 					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
625 						/* Do not pad BINARY columns */
626 						return(ULINT_UNDEFINED);
627 					}
628 
629 					row_mysql_pad_col(mbminlen,
630 							  pad, pad_len);
631 					dfield_set_data(&ufield->new_val,
632 							padded_data, min_size);
633 				}
634 
635 				/* Check whether the current column has
636 				FTS index on it */
637 				if (table->fts
638 				    && dict_table_is_fts_column(
639 					table->fts->indexes,
640 					dict_col_get_no(col))
641 					!= ULINT_UNDEFINED) {
642 					*fts_col_affected = TRUE;
643 				}
644 
645 				/* If Doc ID is updated, check whether the
646 				Doc ID is valid */
647 				if (table->fts
648 				    && ufield->field_no == doc_id_pos) {
649 					doc_id_t	n_doc_id;
650 
651 					n_doc_id =
652 						table->fts->cache->next_doc_id;
653 
654 					new_doc_id = fts_read_doc_id(
655 						static_cast<const byte*>(
656 							dfield_get_data(
657 							&ufield->new_val)));
658 
659 					if (new_doc_id <= 0) {
660 						fprintf(stderr,
661 							"InnoDB: FTS Doc ID "
662 							"must be larger than "
663 							"0 \n");
664 						return(ULINT_UNDEFINED);
665 					}
666 
667 					if (new_doc_id < n_doc_id) {
668 						fprintf(stderr,
669 						       "InnoDB: FTS Doc ID "
670 						       "must be larger than "
671 						       IB_ID_FMT" for table",
672 						       n_doc_id -1);
673 
674 						ut_print_name(stderr, trx,
675 							      TRUE,
676 							      table->name);
677 
678 						putc('\n', stderr);
679 						return(ULINT_UNDEFINED);
680 					}
681 
682 					*fts_col_affected = TRUE;
683 					doc_id_updated = TRUE;
684 				}
685 
686 				n_fields_updated++;
687 			}
688 		}
689 	}
690 
691 	/* Generate a new Doc ID if FTS index columns get updated */
692 	if (table->fts && *fts_col_affected) {
693 		if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
694 			doc_id_t	doc_id;
695                         upd_field_t*	ufield;
696 
697 			ut_ad(!doc_id_updated);
698 			ufield = update->fields + n_fields_updated;
699 			fts_get_next_doc_id(table, &trx->fts_next_doc_id);
700 			doc_id = fts_update_doc_id(table, ufield,
701 						   &trx->fts_next_doc_id);
702 			n_fields_updated++;
703 			fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
704 		} else  {
705 			if (doc_id_updated) {
706 				ut_ad(new_doc_id);
707 				fts_trx_add_op(trx, table, new_doc_id,
708 					       FTS_INSERT, NULL);
709 			} else {
710 				fprintf(stderr, "InnoDB: FTS Doc ID must be "
711 					"updated along with FTS indexed "
712 					"column for table ");
713 				ut_print_name(stderr, trx, TRUE, table->name);
714 				putc('\n', stderr);
715 				return(ULINT_UNDEFINED);
716 			}
717 		}
718 	}
719 
720 	update->n_fields = n_fields_updated;
721 
722 	return(n_fields_updated);
723 }
724 
725 /*********************************************************************//**
726 Set detailed error message associated with foreign key errors for
727 the given transaction. */
728 static
729 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)730 row_ins_set_detailed(
731 /*=================*/
732 	trx_t*		trx,		/*!< in: transaction */
733 	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
734 {
735 	ut_ad(!srv_read_only_mode);
736 
737 	mutex_enter(&srv_misc_tmpfile_mutex);
738 	rewind(srv_misc_tmpfile);
739 
740 	if (os_file_set_eof(srv_misc_tmpfile)) {
741 		ut_print_name(srv_misc_tmpfile, trx, TRUE,
742 			      foreign->foreign_table_name);
743 		dict_print_info_on_foreign_key_in_create_format(
744 			srv_misc_tmpfile, trx, foreign, FALSE);
745 		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
746 	} else {
747 		trx_set_detailed_error(trx, "temp file operation failed");
748 	}
749 
750 	mutex_exit(&srv_misc_tmpfile_mutex);
751 }
752 
753 /*********************************************************************//**
754 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
755 and displays information about the given transaction.
756 The caller must release dict_foreign_err_mutex. */
757 static
758 void
row_ins_foreign_trx_print(trx_t * trx)759 row_ins_foreign_trx_print(
760 /*======================*/
761 	trx_t*	trx)	/*!< in: transaction */
762 {
763 	ulint	n_rec_locks;
764 	ulint	n_trx_locks;
765 	ulint	heap_size;
766 
767 	if (srv_read_only_mode) {
768 		return;
769 	}
770 
771 	lock_mutex_enter();
772 	n_rec_locks = lock_number_of_rows_locked(&trx->lock);
773 	n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
774 	heap_size = mem_heap_get_size(trx->lock.lock_heap);
775 	lock_mutex_exit();
776 
777 	mutex_enter(&trx_sys->mutex);
778 
779 	mutex_enter(&dict_foreign_err_mutex);
780 	rewind(dict_foreign_err_file);
781 	ut_print_timestamp(dict_foreign_err_file);
782 	fputs(" Transaction:\n", dict_foreign_err_file);
783 
784 	trx_print_low(dict_foreign_err_file, trx, 600,
785 		      n_rec_locks, n_trx_locks, heap_size);
786 
787 	mutex_exit(&trx_sys->mutex);
788 
789 	ut_ad(mutex_own(&dict_foreign_err_mutex));
790 }
791 
792 /*********************************************************************//**
793 Reports a foreign key error associated with an update or a delete of a
794 parent table index entry. */
795 static
796 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)797 row_ins_foreign_report_err(
798 /*=======================*/
799 	const char*	errstr,		/*!< in: error string from the viewpoint
800 					of the parent table */
801 	que_thr_t*	thr,		/*!< in: query thread whose run_node
802 					is an update node */
803 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
804 	const rec_t*	rec,		/*!< in: a matching index record in the
805 					child table */
806 	const dtuple_t*	entry)		/*!< in: index entry in the parent
807 					table */
808 {
809 	if (srv_read_only_mode) {
810 		return;
811 	}
812 
813 	FILE*	ef	= dict_foreign_err_file;
814 	trx_t*	trx	= thr_get_trx(thr);
815 
816 	row_ins_set_detailed(trx, foreign);
817 
818 	row_ins_foreign_trx_print(trx);
819 
820 	fputs("Foreign key constraint fails for table ", ef);
821 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
822 	fputs(":\n", ef);
823 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
824 							TRUE);
825 	putc('\n', ef);
826 	fputs(errstr, ef);
827 	fputs(" in parent table, in index ", ef);
828 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
829 	if (entry) {
830 		fputs(" tuple:\n", ef);
831 		dtuple_print(ef, entry);
832 	}
833 	fputs("\nBut in child table ", ef);
834 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
835 	fputs(", in index ", ef);
836 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
837 	if (rec) {
838 		fputs(", there is a record:\n", ef);
839 		rec_print(ef, rec, foreign->foreign_index);
840 	} else {
841 		fputs(", the record is not available\n", ef);
842 	}
843 	putc('\n', ef);
844 
845 	mutex_exit(&dict_foreign_err_mutex);
846 }
847 
848 /*********************************************************************//**
849 Reports a foreign key error to dict_foreign_err_file when we are trying
850 to add an index entry to a child table. Note that the adding may be the result
851 of an update, too. */
852 static
853 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)854 row_ins_foreign_report_add_err(
855 /*===========================*/
856 	trx_t*		trx,		/*!< in: transaction */
857 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
858 	const rec_t*	rec,		/*!< in: a record in the parent table:
859 					it does not match entry because we
860 					have an error! */
861 	const dtuple_t*	entry)		/*!< in: index entry to insert in the
862 					child table */
863 {
864 	if (srv_read_only_mode) {
865 		return;
866 	}
867 
868 	FILE*	ef	= dict_foreign_err_file;
869 
870 	row_ins_set_detailed(trx, foreign);
871 
872 	row_ins_foreign_trx_print(trx);
873 
874 	fputs("Foreign key constraint fails for table ", ef);
875 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
876 	fputs(":\n", ef);
877 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
878 							TRUE);
879 	fputs("\nTrying to add in child table, in index ", ef);
880 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
881 	if (entry) {
882 		fputs(" tuple:\n", ef);
883 		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
884 		It would be better to only display the user columns. */
885 		dtuple_print(ef, entry);
886 	}
887 	fputs("\nBut in parent table ", ef);
888 	ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
889 	fputs(", in index ", ef);
890 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
891 	fputs(",\nthe closest match we can find is record:\n", ef);
892 	if (rec && page_rec_is_supremum(rec)) {
893 		/* If the cursor ended on a supremum record, it is better
894 		to report the previous record in the error message, so that
895 		the user gets a more descriptive error message. */
896 		rec = page_rec_get_prev_const(rec);
897 	}
898 
899 	if (rec) {
900 		rec_print(ef, rec, foreign->referenced_index);
901 	}
902 	putc('\n', ef);
903 
904 	mutex_exit(&dict_foreign_err_mutex);
905 }
906 
907 /*********************************************************************//**
908 Invalidate the query cache for the given table. */
909 static
910 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)911 row_ins_invalidate_query_cache(
912 /*===========================*/
913 	que_thr_t*	thr,		/*!< in: query thread whose run_node
914 					is an update node */
915 	const char*	name)		/*!< in: table name prefixed with
916 					database name and a '/' character */
917 {
918 	char*	buf;
919 	char*	ptr;
920 	ulint	len = strlen(name) + 1;
921 
922 	buf = mem_strdupl(name, len);
923 
924 	ptr = strchr(buf, '/');
925 	ut_a(ptr);
926 	*ptr = '\0';
927 
928 	innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
929 	mem_free(buf);
930 }
931 
/*********************************************************************//**
Perform referential actions or checks when a parent row is deleted or updated
and the constraint had an ON DELETE or ON UPDATE condition which was not
RESTRICT.

Outline of the work done here:
1) fail with DB_ROW_IS_REFERENCED if the constraint has no CASCADE or
SET NULL action for the operation kind (delete or update) of the parent
update node;
2) create the cascade update node of the parent node on first use, and
point it at the child table and at this constraint;
3) refuse a cascaded update of a table that is already being modified in
this cascade chain, and refuse a chain with 15 or more ancestors;
4) find the clustered index record of the matching child row, take an IX
table lock and an X record lock (LOCK_REC_NOT_GAP) on it;
5) build the update vector (SET NULL or cascaded new values) and register
an FTS delete operation if a fulltext indexed column is affected;
6) store the cursor positions, commit mtr, run the cascade node, then
restart mtr and restore the position of pcur.

The two early returns of step 1 leave mtr and pcur untouched. Every other
early exit goes through nonstandard_exit_func, which stores the position
of pcur, commits and restarts mtr, and restores pcur.
@return	DB_SUCCESS, DB_LOCK_WAIT, or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_foreign_check_on_constraint(
/*================================*/
	que_thr_t*	thr,		/*!< in: query thread whose run_node
					is an update node */
	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
					type is != 0 */
	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
					index record in the child table */
	dtuple_t*	entry,		/*!< in: index entry in the parent
					table */
	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
					page */
{
	upd_node_t*	node;
	upd_node_t*	cascade;
	dict_table_t*	table		= foreign->foreign_table;
	dict_index_t*	index;
	dict_index_t*	clust_index;
	dtuple_t*	ref;
	mem_heap_t*	upd_vec_heap	= NULL;
	const rec_t*	rec;
	const rec_t*	clust_rec;
	const buf_block_t* clust_block;
	upd_t*		update;
	ulint		n_to_update;
	dberr_t		err;
	ulint		i;
	trx_t*		trx;
	mem_heap_t*	tmp_heap	= NULL;
	doc_id_t	doc_id = FTS_NULL_DOC_ID;
	ibool		fts_col_affacted = FALSE;

	ut_a(thr);
	ut_a(foreign);
	ut_a(pcur);
	ut_a(mtr);

	trx = thr_get_trx(thr);

	/* Since we are going to delete or update a row, we have to invalidate
	the MySQL query cache for table. A deadlock of threads is not possible
	here because the caller of this function does not hold any latches with
	the sync0sync.h rank above the lock_sys_t::mutex. The query cache mutex
	has a rank just above the lock_sys_t::mutex. */

	row_ins_invalidate_query_cache(thr, table->name);

	node = static_cast<upd_node_t*>(thr->run_node);

	/* A DELETE of the parent row, but the constraint defines neither
	ON DELETE CASCADE nor ON DELETE SET NULL: report and refuse */

	if (node->is_delete && 0 == (foreign->type
				     & (DICT_FOREIGN_ON_DELETE_CASCADE
					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {

		row_ins_foreign_report_err("Trying to delete",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	/* An UPDATE of the parent row, but the constraint defines neither
	ON UPDATE CASCADE nor ON UPDATE SET NULL: report and refuse */

	if (!node->is_delete && 0 == (foreign->type
				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {

		/* This is an UPDATE */

		row_ins_foreign_report_err("Trying to update",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	if (node->cascade_node == NULL) {
		/* Extend our query graph by creating a child to current
		update node. The child is used in the cascade or set null
		operation. */

		node->cascade_heap = mem_heap_create(128);
		node->cascade_node = row_create_update_node_for_mysql(
			table, node->cascade_heap);
		que_node_set_parent(node->cascade_node, node);
	}

	/* Initialize cascade_node to do the operation we want. Note that we
	use the SAME cascade node to do all foreign key operations of the
	SQL DELETE: the table of the cascade node may change if there are
	several child tables to the table where the delete is done! */

	cascade = node->cascade_node;

	cascade->table = table;

	cascade->foreign = foreign;

	if (node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
		cascade->is_delete = TRUE;
	} else {
		/* SET NULL or ON UPDATE CASCADE: the child row is updated,
		so an update vector with room for foreign->n_fields fields
		is needed */
		cascade->is_delete = FALSE;

		if (foreign->n_fields > cascade->update_n_fields) {
			/* We have to make the update vector longer */

			cascade->update = upd_create(foreign->n_fields,
						     node->cascade_heap);
			cascade->update_n_fields = foreign->n_fields;
		}
	}

	/* We do not allow cyclic cascaded updating (DELETE is allowed,
	but not UPDATE) of the same table, as this can lead to an infinite
	cycle. Check that we are not updating the same table which is
	already being modified in this cascade chain. We have to check
	this also because the modification of the indexes of a 'parent'
	table may still be incomplete, and we must avoid seeing the indexes
	of the parent table in an inconsistent state! */

	if (!cascade->is_delete
	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {

		/* We do not know if this would break foreign key
		constraints, but play safe and return an error */

		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying an update, possibly causing a cyclic"
			" cascaded update\n"
			"in the child table,", thr, foreign,
			btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	/* Limit the depth of the cascade chain */

	if (row_ins_cascade_n_ancestors(cascade) >= 15) {
		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying a too deep cascaded delete or update\n",
			thr, foreign, btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	index = btr_pcur_get_btr_cur(pcur)->index;

	ut_a(index == foreign->foreign_index);

	rec = btr_pcur_get_rec(pcur);

	/* Scratch heap for this call; it is freed on both exit paths
	at the end of the function */
	tmp_heap = mem_heap_create(256);

	if (dict_index_is_clust(index)) {
		/* pcur is already positioned in the clustered index of
		the child table */

		clust_index = index;
		clust_rec = rec;
		clust_block = btr_pcur_get_block(pcur);
	} else {
		/* We have to look for the record in the clustered index
		in the child table */

		clust_index = dict_table_get_first_index(table);

		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
					tmp_heap);
		btr_pcur_open_with_no_init(clust_index, ref,
					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
					   cascade->pcur, 0, mtr);

		clust_rec = btr_pcur_get_rec(cascade->pcur);
		clust_block = btr_pcur_get_block(cascade->pcur);

		/* The search must have landed on a user record that
		matches the row reference on all unique fields of the
		clustered index; anything else is reported as an
		inconsistency */

		if (!page_rec_is_user_rec(clust_rec)
		    || btr_pcur_get_low_match(cascade->pcur)
		    < dict_index_get_n_unique(clust_index)) {

			fputs("InnoDB: error in cascade of a foreign key op\n"
			      "InnoDB: ", stderr);
			dict_index_name_print(stderr, trx, index);

			fputs("\n"
			      "InnoDB: record ", stderr);
			rec_print(stderr, rec, index);
			fputs("\n"
			      "InnoDB: clustered record ", stderr);
			rec_print(stderr, clust_rec, clust_index);
			fputs("\n"
			      "InnoDB: Submit a detailed bug report to"
			      " http://bugs.mysql.com\n", stderr);
			/* After the debug assertion the cascade step for
			this row is skipped and DB_SUCCESS is returned */
			ut_ad(0);
			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}
	}

	/* Set an X-lock on the row to delete or update in the child table */

	err = lock_table(0, table, LOCK_IX, thr);

	if (err == DB_SUCCESS) {
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
		we already have a normal shared lock on the appropriate
		gap if the search criterion was not unique */

		err = lock_clust_rec_read_check_and_lock_alt(
			0, clust_block, clust_rec, clust_index,
			LOCK_X, LOCK_REC_NOT_GAP, thr);
	}

	if (err != DB_SUCCESS) {

		goto nonstandard_exit_func;
	}

	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
		/* This can happen if there is a circular reference of
		rows such that cascading delete comes to delete a row
		already in the process of being delete marked */
		err = DB_SUCCESS;

		goto nonstandard_exit_func;
	}

	/* For a table with FTS data, read the Doc ID of the child row; it
	is passed to fts_trx_add_op() below if a fulltext column is
	affected */

	if (table->fts) {
		doc_id = fts_get_doc_id_from_rec(table, clust_rec, tmp_heap);
	}

	if (node->is_delete
	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {

		/* Build the appropriate update vector which sets
		foreign->n_fields first fields in rec to SQL NULL */

		update = cascade->update;

		update->info_bits = 0;
		update->n_fields = foreign->n_fields;
		UNIV_MEM_INVALID(update->fields,
				 update->n_fields * sizeof *update->fields);

		for (i = 0; i < foreign->n_fields; i++) {
			upd_field_t*	ufield = &update->fields[i];

			ufield->field_no = dict_table_get_nth_col_pos(
				table,
				dict_index_get_nth_col_no(index, i));
			ufield->orig_len = 0;
			ufield->exp = NULL;
			dfield_set_null(&ufield->new_val);

			/* Remember if any of the nullified columns is
			part of a fulltext index */
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
				!= ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	} else if (table->fts && cascade->is_delete) {
		/* DICT_FOREIGN_ON_DELETE_CASCADE case */
		for (i = 0; i < foreign->n_fields; i++) {
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
				!= ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	if (!node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {

		/* Build the appropriate update vector which sets changing
		foreign->n_fields first fields in rec to new values */

		upd_vec_heap = mem_heap_create(256);

		n_to_update = row_ins_cascade_calc_update_vec(
			node, foreign, upd_vec_heap, trx, &fts_col_affacted);

		if (n_to_update == ULINT_UNDEFINED) {
			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying a cascaded update where the"
				" updated value in the child\n"
				"table would not fit in the length"
				" of the column, or the value would\n"
				"be NULL and the column is"
				" declared as not NULL in the child table,",
				thr, foreign, btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		}

		if (cascade->update->n_fields == 0) {

			/* The update does not change any columns referred
			to in this foreign key constraint: no need to do
			anything */

			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}

		/* Mark the old Doc ID as deleted */
		if (fts_col_affacted) {
			ut_ad(table->fts);
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	/* Store pcur position and initialize or store the cascade node
	pcur stored position */

	btr_pcur_store_position(pcur, mtr);

	if (index == clust_index) {
		/* cascade->pcur was not opened above in this case: give it
		the position just stored for pcur */
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
	} else {
		btr_pcur_store_position(cascade->pcur, mtr);
	}

	/* The mini-transaction is committed before the cascaded operation
	is run, and restarted after it */
	mtr_commit(mtr);

	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);

	cascade->state = UPD_NODE_UPDATE_CLUSTERED;

	err = row_update_cascade_for_mysql(thr, cascade,
					   foreign->foreign_table);

	/* Diagnostic only: the counter is expected to be nonzero while
	this check runs; a zero value is reported but not treated as an
	error */
	if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
		fprintf(stderr,
			"InnoDB: error: table %s has the counter 0"
			" though there is\n"
			"InnoDB: a FOREIGN KEY check running on it.\n",
			foreign->foreign_table->name);
	}

	/* Release the data dictionary latch for a while, so that we do not
	starve other threads from doing CREATE TABLE etc. if we have a huge
	cascaded operation running. The counter n_foreign_key_checks_running
	will prevent other users from dropping or ALTERing the table when we
	release the latch. */

	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));

	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");

	row_mysql_freeze_data_dictionary(thr_get_trx(thr));

	mtr_start(mtr);

	/* Restore pcur position */

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	return(err);

nonstandard_exit_func:
	/* Reached with err already set. Either heap may still be NULL
	here, because some jumps to this label happen before the heaps
	are created. */
	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	/* Commit and restart the mini-transaction, keeping pcur on the
	same position */
	btr_pcur_store_position(pcur, mtr);

	mtr_commit(mtr);
	mtr_start(mtr);

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	return(err);
}
1339 
1340 /*********************************************************************//**
1341 Sets a shared lock on a record. Used in locking possible duplicate key
1342 records and also in checking foreign key constraints.
1343 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1344 static
1345 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1346 row_ins_set_shared_rec_lock(
1347 /*========================*/
1348 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1349 					LOCK_REC_NOT_GAP type lock */
1350 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1351 	const rec_t*		rec,	/*!< in: record */
1352 	dict_index_t*		index,	/*!< in: index */
1353 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1354 	que_thr_t*		thr)	/*!< in: query thread */
1355 {
1356 	dberr_t	err;
1357 
1358 	ut_ad(rec_offs_validate(rec, index, offsets));
1359 
1360 	if (dict_index_is_clust(index)) {
1361 		err = lock_clust_rec_read_check_and_lock(
1362 			0, block, rec, index, offsets, LOCK_S, type, thr);
1363 	} else {
1364 		err = lock_sec_rec_read_check_and_lock(
1365 			0, block, rec, index, offsets, LOCK_S, type, thr);
1366 	}
1367 
1368 	return(err);
1369 }
1370 
1371 /*********************************************************************//**
1372 Sets a exclusive lock on a record. Used in locking possible duplicate key
1373 records
1374 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1375 static
1376 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1377 row_ins_set_exclusive_rec_lock(
1378 /*===========================*/
1379 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1380 					LOCK_REC_NOT_GAP type lock */
1381 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1382 	const rec_t*		rec,	/*!< in: record */
1383 	dict_index_t*		index,	/*!< in: index */
1384 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1385 	que_thr_t*		thr)	/*!< in: query thread */
1386 {
1387 	dberr_t	err;
1388 
1389 	ut_ad(rec_offs_validate(rec, index, offsets));
1390 
1391 	if (dict_index_is_clust(index)) {
1392 		err = lock_clust_rec_read_check_and_lock(
1393 			0, block, rec, index, offsets, LOCK_X, type, thr);
1394 	} else {
1395 		err = lock_sec_rec_read_check_and_lock(
1396 			0, block, rec, index, offsets, LOCK_X, type, thr);
1397 	}
1398 
1399 	return(err);
1400 }
1401 
/***************************************************************//**
Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_operation_lock.

The check is skipped, and DB_SUCCESS returned, when foreign key checks are
switched off for the transaction, when any of the first foreign->n_fields
fields of entry is SQL NULL, or when the running node is a non-delete
update node whose 'foreign' member is this same constraint (a cascaded
update that is in progress).

If a lock request ends in DB_LOCK_WAIT, the thread is suspended; when the
wait finishes without an error the whole check is restarted from the
label run_again.
@return	DB_SUCCESS, DB_NO_REFERENCED_ROW, DB_ROW_IS_REFERENCED,
DB_FOREIGN_DUPLICATE_KEY, DB_LOCK_WAIT_TIMEOUT, DB_DICT_CHANGED, or
another error code from locking or from the cascaded operation */
UNIV_INTERN
dberr_t
row_ins_check_foreign_constraint(
/*=============================*/
	ibool		check_ref,/*!< in: TRUE if we want to check that
				the referenced table is ok, FALSE if we
				want to check the foreign key table */
	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
				tables mentioned in it must be in the
				dictionary cache if they exist at all */
	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
				table, else the referenced table */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	dberr_t		err;
	upd_node_t*	upd_node;
	dict_table_t*	check_table;
	dict_index_t*	check_index;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	int		cmp;
	ulint		i;
	mtr_t		mtr;
	trx_t*		trx		= thr_get_trx(thr);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

run_again:
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */

	err = DB_SUCCESS;

	if (trx->check_foreigns == FALSE) {
		/* The user has suppressed foreign key checks currently for
		this session */
		goto exit_func;
	}

	/* If any of the foreign key fields in entry is SQL NULL, we
	suppress the foreign key check: this is compatible with Oracle,
	for example */

	for (i = 0; i < foreign->n_fields; i++) {
		if (UNIV_SQL_NULL == dfield_get_len(
			    dtuple_get_nth_field(entry, i))) {

			goto exit_func;
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
		upd_node = static_cast<upd_node_t*>(thr->run_node);

		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
			/* If a cascaded update is done as defined by a
			foreign key constraint, do not check that
			constraint for the child row. In ON UPDATE CASCADE
			the update of the parent row is only half done when
			we come here: if we would check the constraint here
			for the child row it would fail.

			A QUESTION remains: if in the child table there are
			several constraints which refer to the same parent
			table, we should merge all updates to the child as
			one update? And the updates can be contradictory!
			Currently we just perform the update associated
			with each foreign key constraint, one after
			another, and the user has problems predicting in
			which order they are performed. */

			goto exit_func;
		}
	}

	/* Choose the side of the constraint whose index is searched:
	the parent (referenced) side when check_ref is set, otherwise
	the child (foreign) side */

	if (check_ref) {
		check_table = foreign->referenced_table;
		check_index = foreign->referenced_index;
	} else {
		check_table = foreign->foreign_table;
		check_index = foreign->foreign_index;
	}

	if (check_table == NULL
	    || check_table->ibd_file_missing
	    || check_index == NULL) {

		/* The table or index to search is not available. This is
		an error (DB_NO_REFERENCED_ROW) only when the parent side
		was to be checked and the server is not in read-only mode;
		otherwise err keeps the value DB_SUCCESS. */

		if (!srv_read_only_mode && check_ref) {
			FILE*	ef = dict_foreign_err_file;

			row_ins_set_detailed(trx, foreign);

			row_ins_foreign_trx_print(trx);

			fputs("Foreign key constraint fails for table ", ef);
			ut_print_name(ef, trx, TRUE,
				      foreign->foreign_table_name);
			fputs(":\n", ef);
			dict_print_info_on_foreign_key_in_create_format(
				ef, trx, foreign, TRUE);
			fputs("\nTrying to add to index ", ef);
			ut_print_name(ef, trx, FALSE,
				      foreign->foreign_index->name);
			fputs(" tuple:\n", ef);
			dtuple_print(ef, entry);
			fputs("\nBut the parent table ", ef);
			ut_print_name(ef, trx, TRUE,
				      foreign->referenced_table_name);
			fputs("\nor its .ibd file does"
			      " not currently exist!\n", ef);
			/* NOTE(review): the matching mutex_enter is not in
			this function; presumably the mutex is acquired by
			row_ins_foreign_trx_print() - confirm */
			mutex_exit(&dict_foreign_err_mutex);

			err = DB_NO_REFERENCED_ROW;
		}

		goto exit_func;
	}

	if (check_table != table) {
		/* We already have a LOCK_IX on table, but not necessarily
		on check_table */

		err = lock_table(0, check_table, LOCK_IS, thr);

		if (err != DB_SUCCESS) {

			/* The mini-transaction has not been started and
			n_fields_cmp has not been changed yet, so the
			cleanup at end_scan is bypassed */
			goto do_possible_lock_wait;
		}
	}

	mtr_start(&mtr);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	dtuple_set_n_fields_cmp(entry, foreign->n_fields);

	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
		      BTR_SEARCH_LEAF, &pcur, &mtr);

	/* Scan index records and check if there is a matching record */

	do {
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
		const buf_block_t*	block = btr_pcur_get_block(&pcur);

		if (page_rec_is_infimum(rec)) {

			/* 'continue' in a do-while evaluates the loop
			condition, which moves the cursor to the next
			record */
			continue;
		}

		offsets = rec_get_offsets(rec, check_index,
					  offsets, ULINT_UNDEFINED, &heap);

		if (page_rec_is_supremum(rec)) {

			/* Lock the supremum with LOCK_ORDINARY and go on
			with the next record, unless the lock request
			failed or has to wait */

			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
							  rec, check_index,
							  offsets, thr);
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				continue;
			default:
				goto end_scan;
			}
		}

		cmp = cmp_dtuple_rec(entry, rec, offsets);

		if (cmp == 0) {
			if (rec_get_deleted_flag(rec,
						 rec_offs_comp(offsets))) {
				/* A delete-marked match is locked with
				LOCK_ORDINARY and is not counted as a
				match: the scan goes on */
				err = row_ins_set_shared_rec_lock(
					LOCK_ORDINARY, block,
					rec, check_index, offsets, thr);
				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}
			} else {
				/* Found a matching record. Lock only
				a record because we can allow inserts
				into gaps */

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP, block,
					rec, check_index, offsets, thr);

				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}

				if (check_ref) {
					/* The parent row exists: the
					constraint holds */
					err = DB_SUCCESS;

					goto end_scan;
				} else if (foreign->type != 0) {
					/* There is an ON UPDATE or ON DELETE
					condition: check them in a separate
					function */

					err = row_ins_foreign_check_on_constraint(
						thr, foreign, &pcur, entry,
						&mtr);
					if (err != DB_SUCCESS) {
						/* Since reporting a plain
						"duplicate key" error
						message to the user in
						cases where a long CASCADE
						operation would lead to a
						duplicate key in some
						other table is very
						confusing, map duplicate
						key errors resulting from
						FK constraints to a
						separate error code. */

						if (err == DB_DUPLICATE_KEY) {
							err = DB_FOREIGN_DUPLICATE_KEY;
						}

						goto end_scan;
					}

					/* row_ins_foreign_check_on_constraint
					may have repositioned pcur on a
					different block */
					block = btr_pcur_get_block(&pcur);
				} else {
					/* A child row exists and the
					constraint has no ON UPDATE or
					ON DELETE action */
					row_ins_foreign_report_err(
						"Trying to delete or update",
						thr, foreign, rec, entry);

					err = DB_ROW_IS_REFERENCED;
					goto end_scan;
				}
			}
		} else {
			/* The cursor was opened with PAGE_CUR_GE, so a
			record that does not match must be greater than
			entry: no further match can follow. Lock the gap
			before the record and end the scan. */
			ut_a(cmp < 0);

			err = row_ins_set_shared_rec_lock(
				LOCK_GAP, block,
				rec, check_index, offsets, thr);

			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				if (check_ref) {
					err = DB_NO_REFERENCED_ROW;
					row_ins_foreign_report_add_err(
						trx, foreign, rec, entry);
				} else {
					err = DB_SUCCESS;
				}
				/* fall through: the default case only
				breaks, so a lock error is kept in err */
			default:
				break;
			}

			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, &mtr));

	/* The cursor could not be moved any further and no live match
	was found */

	if (check_ref) {
		row_ins_foreign_report_add_err(
			trx, foreign, btr_pcur_get_rec(&pcur), entry);
		err = DB_NO_REFERENCED_ROW;
	} else {
		err = DB_SUCCESS;
	}

end_scan:
	btr_pcur_close(&pcur);

	mtr_commit(&mtr);

	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
	if (err == DB_LOCK_WAIT) {
		bool		verified = false;

		trx->error_state = err;

		que_thr_stop_for_mysql(thr);

		lock_wait_suspend_thread(thr);

		if (check_table->to_be_dropped) {
			/* The table is being dropped. We shall timeout
			this operation */
			err = DB_LOCK_WAIT_TIMEOUT;
			goto exit_func;
		}

		/* We had temporarily released dict_operation_lock during
		the lock wait above. Now we hold the lock again, and we
		need to re-check whether the foreign key has been dropped.
		This only has to be verified when 'table' is the
		referenced table (check_ref == 0), since the MDL lock
		prevents concurrent DDL and DML on the same table */
		if (!check_ref) {
			for (dict_foreign_set::iterator it
				= table->referenced_set.begin();
			     it != table->referenced_set.end();
			     ++it) {
				if (*it == foreign) {
					verified = true;
					break;
				}
			}
		} else {
			verified = true;
		}

		/* Restart the check only if the constraint still exists
		and the lock wait ended without an error */
		if (!verified) {
			err = DB_DICT_CHANGED;
		} else if (trx->error_state == DB_SUCCESS) {
			goto run_again;
		} else {
			err = trx->error_state;
		}
	}

exit_func:
	/* heap is non-NULL only if rec_get_offsets() had to allocate */
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(err);
}
1750 
1751 /***************************************************************//**
1752 Checks if foreign key constraints fail for an index entry. If index
1753 is not mentioned in any constraint, this function does nothing,
1754 Otherwise does searches to the indexes of referenced tables and
1755 sets shared locks which lock either the success or the failure of
1756 a constraint.
1757 @return	DB_SUCCESS or error code */
1758 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1759 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1760 row_ins_check_foreign_constraints(
1761 /*==============================*/
1762 	dict_table_t*	table,	/*!< in: table */
1763 	dict_index_t*	index,	/*!< in: index */
1764 	dtuple_t*	entry,	/*!< in: index entry for index */
1765 	que_thr_t*	thr)	/*!< in: query thread */
1766 {
1767 	dict_foreign_t*	foreign;
1768 	dberr_t		err;
1769 	trx_t*		trx;
1770 	ibool		got_s_lock	= FALSE;
1771 
1772 	trx = thr_get_trx(thr);
1773 
1774 	DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1775 			    "foreign_constraint_check_for_ins");
1776 
1777 	for (dict_foreign_set::iterator it = table->foreign_set.begin();
1778 	     it != table->foreign_set.end();
1779 	     ++it) {
1780 
1781 		foreign = *it;
1782 
1783 		if (foreign->foreign_index == index) {
1784 			dict_table_t*	ref_table = NULL;
1785 			dict_table_t*	foreign_table = foreign->foreign_table;
1786 			dict_table_t*	referenced_table
1787 						= foreign->referenced_table;
1788 
1789 			if (referenced_table == NULL) {
1790 
1791 				ref_table = dict_table_open_on_name(
1792 					foreign->referenced_table_name_lookup,
1793 					FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1794 			}
1795 
1796 			if (0 == trx->dict_operation_lock_mode) {
1797 				got_s_lock = TRUE;
1798 
1799 				row_mysql_freeze_data_dictionary(trx);
1800 			}
1801 
1802 			if (referenced_table) {
1803 				os_inc_counter(dict_sys->mutex,
1804 					       foreign_table
1805 					       ->n_foreign_key_checks_running);
1806 			}
1807 
1808 			/* NOTE that if the thread ends up waiting for a lock
1809 			we will release dict_operation_lock temporarily!
1810 			But the counter on the table protects the referenced
1811 			table from being dropped while the check is running. */
1812 
1813 			err = row_ins_check_foreign_constraint(
1814 				TRUE, foreign, table, entry, thr);
1815 
1816 			DBUG_EXECUTE_IF("row_ins_dict_change_err",
1817 					err = DB_DICT_CHANGED;);
1818 
1819 			if (referenced_table) {
1820 				os_dec_counter(dict_sys->mutex,
1821 					       foreign_table
1822 					       ->n_foreign_key_checks_running);
1823 			}
1824 
1825 			if (got_s_lock) {
1826 				row_mysql_unfreeze_data_dictionary(trx);
1827 			}
1828 
1829 			if (ref_table != NULL) {
1830 				dict_table_close(ref_table, FALSE, FALSE);
1831 			}
1832 
1833 			if (err != DB_SUCCESS) {
1834 
1835 				return(err);
1836 			}
1837 		}
1838 	}
1839 
1840 	return(DB_SUCCESS);
1841 }
1842 
1843 /***************************************************************//**
1844 Checks if a unique key violation to rec would occur at the index entry
1845 insert.
1846 @return	TRUE if error */
1847 static
1848 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1849 row_ins_dupl_error_with_rec(
1850 /*========================*/
1851 	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
1852 				that the caller already has a record lock on
1853 				the record! */
1854 	const dtuple_t*	entry,	/*!< in: entry to insert */
1855 	dict_index_t*	index,	/*!< in: index */
1856 	const ulint*	offsets)/*!< in: rec_get_offsets(rec, index) */
1857 {
1858 	ulint	matched_fields;
1859 	ulint	matched_bytes;
1860 	ulint	n_unique;
1861 	ulint	i;
1862 
1863 	ut_ad(rec_offs_validate(rec, index, offsets));
1864 
1865 	n_unique = dict_index_get_n_unique(index);
1866 
1867 	matched_fields = 0;
1868 	matched_bytes = 0;
1869 
1870 	cmp_dtuple_rec_with_match(entry, rec, offsets,
1871 				  &matched_fields, &matched_bytes);
1872 
1873 	if (matched_fields < n_unique) {
1874 
1875 		return(FALSE);
1876 	}
1877 
1878 	/* In a unique secondary index we allow equal key values if they
1879 	contain SQL NULLs */
1880 
1881 	if (!dict_index_is_clust(index)) {
1882 
1883 		for (i = 0; i < n_unique; i++) {
1884 			if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1885 
1886 				return(FALSE);
1887 			}
1888 		}
1889 	}
1890 
1891 	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1892 }
1893 
1894 /***************************************************************//**
1895 Scans a unique non-clustered index at a given index entry to determine
1896 whether a uniqueness violation has occurred for the key value of the entry.
1897 Set shared locks on possible duplicate records.
1898 @return	DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1899 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1900 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)1901 row_ins_scan_sec_index_for_duplicate(
1902 /*=================================*/
1903 	ulint		flags,	/*!< in: undo logging and locking flags */
1904 	dict_index_t*	index,	/*!< in: non-clustered unique index */
1905 	dtuple_t*	entry,	/*!< in: index entry */
1906 	que_thr_t*	thr,	/*!< in: query thread */
1907 	bool		s_latch,/*!< in: whether index->lock is being held */
1908 	mtr_t*		mtr,	/*!< in/out: mini-transaction */
1909 	mem_heap_t*	offsets_heap)
1910 				/*!< in/out: memory heap that can be emptied */
1911 {
1912 	ulint		n_unique;
1913 	int		cmp;
1914 	ulint		n_fields_cmp;
1915 	btr_pcur_t	pcur;
1916 	dberr_t		err		= DB_SUCCESS;
1917 	ulint		allow_duplicates;
1918 	ulint*		offsets		= NULL;
1919 
1920 #ifdef UNIV_SYNC_DEBUG
1921 	ut_ad(s_latch == rw_lock_own(&index->lock, RW_LOCK_SHARED));
1922 #endif /* UNIV_SYNC_DEBUG */
1923 
1924 	n_unique = dict_index_get_n_unique(index);
1925 
1926 	/* If the secondary index is unique, but one of the fields in the
1927 	n_unique first fields is NULL, a unique key violation cannot occur,
1928 	since we define NULL != NULL in this case */
1929 
1930 	for (ulint i = 0; i < n_unique; i++) {
1931 		if (UNIV_SQL_NULL == dfield_get_len(
1932 			    dtuple_get_nth_field(entry, i))) {
1933 
1934 			return(DB_SUCCESS);
1935 		}
1936 	}
1937 
1938 	/* Store old value on n_fields_cmp */
1939 
1940 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1941 
1942 	dtuple_set_n_fields_cmp(entry, n_unique);
1943 
1944 	btr_pcur_open(index, entry, PAGE_CUR_GE,
1945 		      s_latch
1946 		      ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
1947 		      : BTR_SEARCH_LEAF,
1948 		      &pcur, mtr);
1949 
1950 	allow_duplicates = thr_get_trx(thr)->duplicates;
1951 
1952 	/* Scan index records and check if there is a duplicate */
1953 
1954 	do {
1955 		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
1956 		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
1957 		const ulint		lock_type = LOCK_ORDINARY;
1958 
1959 		if (page_rec_is_infimum(rec)) {
1960 
1961 			continue;
1962 		}
1963 
1964 		offsets = rec_get_offsets(rec, index, offsets,
1965 					  ULINT_UNDEFINED, &offsets_heap);
1966 
1967 		if (flags & BTR_NO_LOCKING_FLAG) {
1968 			/* Set no locks when applying log
1969 			in online table rebuild. */
1970 		} else if (allow_duplicates) {
1971 
1972 			/* If the SQL-query will update or replace
1973 			duplicate key we will take X-lock for
1974 			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1975 			INSERT ON DUPLICATE KEY UPDATE). */
1976 
1977 			err = row_ins_set_exclusive_rec_lock(
1978 				lock_type, block, rec, index, offsets, thr);
1979 		} else {
1980 
1981 			err = row_ins_set_shared_rec_lock(
1982 				lock_type, block, rec, index, offsets, thr);
1983 		}
1984 
1985 		switch (err) {
1986 		case DB_SUCCESS_LOCKED_REC:
1987 			err = DB_SUCCESS;
1988 		case DB_SUCCESS:
1989 			break;
1990 		default:
1991 			goto end_scan;
1992 		}
1993 
1994 		if (page_rec_is_supremum(rec)) {
1995 
1996 			continue;
1997 		}
1998 
1999 		cmp = cmp_dtuple_rec(entry, rec, offsets);
2000 
2001 		if (cmp == 0) {
2002 			if (row_ins_dupl_error_with_rec(rec, entry,
2003 							index, offsets)) {
2004 				err = DB_DUPLICATE_KEY;
2005 
2006 				thr_get_trx(thr)->error_info = index;
2007 
2008 				/* If the duplicate is on hidden FTS_DOC_ID,
2009 				state so in the error log */
2010 				if (DICT_TF2_FLAG_IS_SET(
2011 					index->table,
2012 					DICT_TF2_FTS_HAS_DOC_ID)
2013 				    && strcmp(index->name,
2014 					      FTS_DOC_ID_INDEX_NAME) == 0) {
2015 					ib_logf(IB_LOG_LEVEL_ERROR,
2016 						"Duplicate FTS_DOC_ID value"
2017 						" on table %s",
2018 						index->table->name);
2019 				}
2020 
2021 				goto end_scan;
2022 			}
2023 		} else {
2024 			ut_a(cmp < 0);
2025 			goto end_scan;
2026 		}
2027 	} while (btr_pcur_move_to_next(&pcur, mtr));
2028 
2029 end_scan:
2030 	/* Restore old value */
2031 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2032 
2033 	return(err);
2034 }
2035 
2036 /** Checks for a duplicate when the table is being rebuilt online.
2037 @retval DB_SUCCESS		when no duplicate is detected
2038 @retval DB_SUCCESS_LOCKED_REC	when rec is an exact match of entry or
2039 a newer version of entry (the entry should not be inserted)
2040 @retval DB_DUPLICATE_KEY	when entry is a duplicate of rec */
2041 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2042 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2043 row_ins_duplicate_online(
2044 /*=====================*/
2045 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2046 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2047 	const rec_t*	rec,	/*!< in: clustered index record */
2048 	ulint*		offsets)/*!< in/out: rec_get_offsets(rec) */
2049 {
2050 	ulint	fields	= 0;
2051 	ulint	bytes	= 0;
2052 
2053 	/* During rebuild, there should not be any delete-marked rows
2054 	in the new table. */
2055 	ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2056 	ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2057 
2058 	/* Compare the PRIMARY KEY fields and the
2059 	DB_TRX_ID, DB_ROLL_PTR. */
2060 	cmp_dtuple_rec_with_match_low(
2061 		entry, rec, offsets, n_uniq + 2, &fields, &bytes);
2062 
2063 	if (fields < n_uniq) {
2064 		/* Not a duplicate. */
2065 		return(DB_SUCCESS);
2066 	}
2067 
2068 	if (fields == n_uniq + 2) {
2069 		/* rec is an exact match of entry. */
2070 		ut_ad(bytes == 0);
2071 		return(DB_SUCCESS_LOCKED_REC);
2072 	}
2073 
2074 	return(DB_DUPLICATE_KEY);
2075 }
2076 
2077 /** Checks for a duplicate when the table is being rebuilt online.
2078 @retval DB_SUCCESS		when no duplicate is detected
2079 @retval DB_SUCCESS_LOCKED_REC	when rec is an exact match of entry or
2080 a newer version of entry (the entry should not be inserted)
2081 @retval DB_DUPLICATE_KEY	when entry is a duplicate of rec */
2082 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2083 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2084 row_ins_duplicate_error_in_clust_online(
2085 /*====================================*/
2086 	ulint		n_uniq,	/*!< in: offset of DB_TRX_ID */
2087 	const dtuple_t*	entry,	/*!< in: entry that is being inserted */
2088 	const btr_cur_t*cursor,	/*!< in: cursor on insert position */
2089 	ulint**		offsets,/*!< in/out: rec_get_offsets(rec) */
2090 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2091 {
2092 	dberr_t		err	= DB_SUCCESS;
2093 	const rec_t*	rec	= btr_cur_get_rec(cursor);
2094 
2095 	if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2096 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2097 					   ULINT_UNDEFINED, heap);
2098 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2099 		if (err != DB_SUCCESS) {
2100 			return(err);
2101 		}
2102 	}
2103 
2104 	rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2105 
2106 	if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2107 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2108 					   ULINT_UNDEFINED, heap);
2109 		err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2110 	}
2111 
2112 	return(err);
2113 }
2114 
2115 /***************************************************************//**
2116 Checks if a unique key violation error would occur at an index entry
2117 insert. Sets shared locks on possible duplicate records. Works only
2118 for a clustered index!
2119 @retval DB_SUCCESS if no error
2120 @retval DB_DUPLICATE_KEY if error,
2121 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2122 record
2123 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2124 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2125 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2126 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2127 row_ins_duplicate_error_in_clust(
2128 /*=============================*/
2129 	ulint		flags,	/*!< in: undo logging and locking flags */
2130 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
2131 	const dtuple_t*	entry,	/*!< in: entry to insert */
2132 	que_thr_t*	thr,	/*!< in: query thread */
2133 	mtr_t*		mtr)	/*!< in: mtr */
2134 {
2135 	dberr_t	err;
2136 	rec_t*	rec;
2137 	ulint	n_unique;
2138 	trx_t*	trx		= thr_get_trx(thr);
2139 	mem_heap_t*heap		= NULL;
2140 	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
2141 	ulint*	offsets		= offsets_;
2142 	rec_offs_init(offsets_);
2143 
2144 	UT_NOT_USED(mtr);
2145 
2146 	ut_ad(dict_index_is_clust(cursor->index));
2147 
2148 	/* NOTE: For unique non-clustered indexes there may be any number
2149 	of delete marked records with the same value for the non-clustered
2150 	index key (remember multiversioning), and which differ only in
2151 	the row refererence part of the index record, containing the
2152 	clustered index key fields. For such a secondary index record,
2153 	to avoid race condition, we must FIRST do the insertion and after
2154 	that check that the uniqueness condition is not breached! */
2155 
2156 	/* NOTE: A problem is that in the B-tree node pointers on an
2157 	upper level may match more to the entry than the actual existing
2158 	user records on the leaf level. So, even if low_match would suggest
2159 	that a duplicate key violation may occur, this may not be the case. */
2160 
2161 	n_unique = dict_index_get_n_unique(cursor->index);
2162 
2163 	if (cursor->low_match >= n_unique) {
2164 
2165 		rec = btr_cur_get_rec(cursor);
2166 
2167 		if (!page_rec_is_infimum(rec)) {
2168 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2169 						  ULINT_UNDEFINED, &heap);
2170 
2171 			/* We set a lock on the possible duplicate: this
2172 			is needed in logical logging of MySQL to make
2173 			sure that in roll-forward we get the same duplicate
2174 			errors as in original execution */
2175 
2176 			if (trx->duplicates) {
2177 
2178 				/* If the SQL-query will update or replace
2179 				duplicate key we will take X-lock for
2180 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2181 				INSERT ON DUPLICATE KEY UPDATE). */
2182 
2183 				err = row_ins_set_exclusive_rec_lock(
2184 					LOCK_REC_NOT_GAP,
2185 					btr_cur_get_block(cursor),
2186 					rec, cursor->index, offsets, thr);
2187 			} else {
2188 
2189 				err = row_ins_set_shared_rec_lock(
2190 					LOCK_REC_NOT_GAP,
2191 					btr_cur_get_block(cursor), rec,
2192 					cursor->index, offsets, thr);
2193 			}
2194 
2195 			switch (err) {
2196 			case DB_SUCCESS_LOCKED_REC:
2197 			case DB_SUCCESS:
2198 				break;
2199 			default:
2200 				goto func_exit;
2201 			}
2202 
2203 			if (row_ins_dupl_error_with_rec(
2204 				    rec, entry, cursor->index, offsets)) {
2205 duplicate:
2206 				trx->error_info = cursor->index;
2207 				err = DB_DUPLICATE_KEY;
2208 				goto func_exit;
2209 			}
2210 		}
2211 	}
2212 
2213 	if (cursor->up_match >= n_unique) {
2214 
2215 		rec = page_rec_get_next(btr_cur_get_rec(cursor));
2216 
2217 		if (!page_rec_is_supremum(rec)) {
2218 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2219 						  ULINT_UNDEFINED, &heap);
2220 
2221 			if (trx->duplicates) {
2222 
2223 				/* If the SQL-query will update or replace
2224 				duplicate key we will take X-lock for
2225 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2226 				INSERT ON DUPLICATE KEY UPDATE). */
2227 
2228 				err = row_ins_set_exclusive_rec_lock(
2229 					LOCK_REC_NOT_GAP,
2230 					btr_cur_get_block(cursor),
2231 					rec, cursor->index, offsets, thr);
2232 			} else {
2233 
2234 				err = row_ins_set_shared_rec_lock(
2235 					LOCK_REC_NOT_GAP,
2236 					btr_cur_get_block(cursor),
2237 					rec, cursor->index, offsets, thr);
2238 			}
2239 
2240 			switch (err) {
2241 			case DB_SUCCESS_LOCKED_REC:
2242 			case DB_SUCCESS:
2243 				break;
2244 			default:
2245 				goto func_exit;
2246 			}
2247 
2248 			if (row_ins_dupl_error_with_rec(
2249 				    rec, entry, cursor->index, offsets)) {
2250 				goto duplicate;
2251 			}
2252 		}
2253 
2254 		/* This should never happen */
2255 		ut_error;
2256 	}
2257 
2258 	err = DB_SUCCESS;
2259 func_exit:
2260 	if (UNIV_LIKELY_NULL(heap)) {
2261 		mem_heap_free(heap);
2262 	}
2263 	return(err);
2264 }
2265 
2266 /***************************************************************//**
2267 Checks if an index entry has long enough common prefix with an
2268 existing record so that the intended insert of the entry must be
2269 changed to a modify of the existing record. In the case of a clustered
2270 index, the prefix must be n_unique fields long. In the case of a
2271 secondary index, all fields must be equal.  InnoDB never updates
2272 secondary index records in place, other than clearing or setting the
2273 delete-mark flag. We could be able to update the non-unique fields
2274 of a unique secondary index record by checking the cursor->up_match,
2275 but we do not do so, because it could have some locking implications.
2276 @return TRUE if the existing record should be updated; FALSE if not */
2277 UNIV_INLINE
2278 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2279 row_ins_must_modify_rec(
2280 /*====================*/
2281 	const btr_cur_t*	cursor)	/*!< in: B-tree cursor */
2282 {
2283 	/* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2284 	Because node pointers on upper levels of the B-tree may match more
2285 	to entry than to actual user records on the leaf level, we
2286 	have to check if the candidate record is actually a user record.
2287 	A clustered index node pointer contains index->n_unique first fields,
2288 	and a secondary index node pointer contains all index fields. */
2289 
2290 	return(cursor->low_match
2291 	       >= dict_index_get_n_unique_in_tree(cursor->index)
2292 	       && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2293 }
2294 
/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
record is necessarily marked deleted by a committed transaction, or a
unique key violation error occurs. The delete marked record is then
updated to an existing record, and we must write an undo log record on
the delete marked record.

Outline of the steps taken below:
1. Start a mini-transaction and position the cursor with PAGE_CUR_LE.
2. If n_uniq is nonzero and the cursor reports a long enough match,
   check for a duplicate (without locking when flags is exactly the
   combination used for applying log in online table rebuild).
3. Either modify the existing record (row_ins_must_modify_rec()) or
   insert a new one, optimistically first and, in BTR_MODIFY_TREE mode,
   pessimistically if that returns DB_FAIL.
4. Write externally stored columns, log the row for online DDL if one
   is in progress, commit the mini-transaction and free offsets_heap.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@return error code */
UNIV_INTERN
dberr_t
row_ins_clust_index_entry_low(
/*==========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: clustered index */
	ulint		n_uniq,	/*!< in: 0 or index->n_uniq; with 0 the
				duplicate check is skipped */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	que_thr_t*	thr)	/*!< in: query thread */
{
	btr_cur_t	cursor;
	ulint*		offsets		= NULL;
	dberr_t		err;
	big_rec_t*	big_rec		= NULL;
	mtr_t		mtr;
	mem_heap_t*	offsets_heap	= NULL;

	ut_ad(dict_index_is_clust(index));
	ut_ad(!dict_index_is_unique(index)
	      || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));

	mtr_start(&mtr);

	/* For an optimistic insert during online DDL, S-latch index->lock
	in this mini-transaction up front and tell the search, through
	BTR_ALREADY_S_LATCHED, that the latch is already held. */
	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
		mtr_s_lock(dict_index_get_lock(index), &mtr);
	}

	cursor.thr = thr;

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */

	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, mode,
				    &cursor, 0, __FILE__, __LINE__, &mtr);

#ifdef UNIV_DEBUG
	{
		/* Debug check: the first user record on the page, if
		there is one, has as many fields as the entry. */
		page_t*	page = btr_cur_get_page(&cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_get_n_fields(first_rec, index)
		      == dtuple_get_n_fields(entry));
	}
#endif

	if (n_uniq && (cursor.up_match >= n_uniq
		       || cursor.low_match >= n_uniq)) {

		if (flags
		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
			/* Set no locks when applying log
			in online table rebuild. Only check for duplicates. */
			err = row_ins_duplicate_error_in_clust_online(
				n_uniq, entry, &cursor,
				&offsets, &offsets_heap);

			/* Any outcome other than DB_SUCCESS records the
			index in trx->error_info; err keeps its value, so
			the check below then takes the error exit. */
			switch (err) {
			case DB_SUCCESS:
				break;
			default:
				ut_ad(0);
				/* fall through */
			case DB_SUCCESS_LOCKED_REC:
			case DB_DUPLICATE_KEY:
				thr_get_trx(thr)->error_info = cursor.index;
			}
		} else {
			/* Note that the following may return also
			DB_LOCK_WAIT */

			err = row_ins_duplicate_error_in_clust(
				flags, &cursor, entry, thr, &mtr);
		}

		if (err != DB_SUCCESS) {
			/* Shared error exit: also the target of a goto
			from the insert branch further down. Commits the
			mini-transaction, then frees offsets_heap at
			func_exit. */
err_exit:
			mtr_commit(&mtr);
			goto func_exit;
		}
	}

	if (row_ins_must_modify_rec(&cursor)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		mem_heap_t*	entry_heap	= mem_heap_create(1024);

		err = row_ins_clust_index_entry_by_modify(
			flags, mode, &cursor, &offsets, &offsets_heap,
			entry_heap, &big_rec, entry, thr, &mtr);

		/* Fetched only after the modify call above. */
		rec_t*		rec		= btr_cur_get_rec(&cursor);

		if (big_rec) {
			ut_a(err == DB_SUCCESS);
			/* Write out the externally stored
			columns while still x-latching
			index->lock and block->lock. Allocate
			pages for big_rec in the mtr that
			modified the B-tree, but be sure to skip
			any pages that were freed in mtr. We will
			write out the big_rec pages before
			committing the B-tree mini-transaction. If
			the system crashes so that crash recovery
			will not replay the mtr_commit(&mtr), the
			big_rec pages will be left orphaned until
			the pages are allocated for something else.

			TODO: If the allocation extends the
			tablespace, it will not be redo
			logged, in either mini-transaction.
			Tablespace extension should be
			redo-logged in the big_rec
			mini-transaction, so that recovery
			will not fail when the big_rec was
			written to the extended portion of the
			file, in case the file was somehow
			truncated in the crash. */

			DEBUG_SYNC_C_IF_THD(
				thr_get_trx(thr)->mysql_thd,
				"before_row_ins_upd_extern");
			err = btr_store_big_rec_extern_fields(
				index, btr_cur_get_block(&cursor),
				rec, offsets, big_rec, &mtr,
				BTR_STORE_INSERT_UPDATE);
			DEBUG_SYNC_C_IF_THD(
				thr_get_trx(thr)->mysql_thd,
				"after_row_ins_upd_extern");
			/* If writing big_rec fails (for
			example, because of DB_OUT_OF_FILE_SPACE),
			the record will be corrupted. Even if
			we did not update any externally
			stored columns, our update could cause
			the record to grow so that a
			non-updated column was selected for
			external storage. This non-update
			would not have been written to the
			undo log, and thus the record cannot
			be rolled back.

			However, because we have not executed
			mtr_commit(mtr) yet, the update will
			not be replayed in crash recovery, and
			the following assertion failure will
			effectively "roll back" the operation. */
			ut_a(err == DB_SUCCESS);
			dtuple_big_rec_free(big_rec);
		}

		/* Log the row for online DDL before the
		mini-transaction is committed. */
		if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
			row_log_table_insert(rec, index, offsets);
		}

		mtr_commit(&mtr);
		mem_heap_free(entry_heap);
	} else {
		rec_t*	insert_rec;

		if (mode != BTR_MODIFY_TREE) {
			/* Optimistic mode: only a leaf page insert is
			attempted. */
			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
			      == BTR_MODIFY_LEAF);
			err = btr_cur_optimistic_insert(
				flags, &cursor, &offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);
		} else {
			/* Pessimistic mode: give up with
			DB_LOCK_TABLE_FULL when
			buf_LRU_buf_pool_running_out() says so. */
			if (buf_LRU_buf_pool_running_out()) {

				err = DB_LOCK_TABLE_FULL;
				goto err_exit;
			}

			/* Even in pessimistic mode the optimistic
			insert is tried first; the pessimistic one is
			used only if that returns DB_FAIL. */
			err = btr_cur_optimistic_insert(
				flags, &cursor,
				&offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, &cursor,
					&offsets, &offsets_heap,
					entry, &insert_rec, &big_rec,
					n_ext, thr, &mtr);
			}
		}

		if (UNIV_LIKELY_NULL(big_rec)) {
			/* NOTE(review): err from the insert above is
			overwritten below; presumably big_rec is only
			returned by a successful insert - confirm. */
			mtr_commit(&mtr);

			/* Online table rebuild could read (and
			ignore) the incomplete record at this point.
			If online rebuild is in progress, the
			row_ins_index_entry_big_rec() will write log. */

			DBUG_EXECUTE_IF(
				"row_ins_extern_checkpoint",
				log_make_checkpoint_at(
					LSN_MAX, TRUE););
			err = row_ins_index_entry_big_rec(
				entry, big_rec, offsets, &offsets_heap, index,
				thr_get_trx(thr)->mysql_thd,
				__FILE__, __LINE__);
			dtuple_convert_back_big_rec(index, entry, big_rec);
		} else {
			/* No externally stored columns: log the row for
			online DDL, then commit. */
			if (err == DB_SUCCESS
			    && dict_index_is_online_ddl(index)) {
				row_log_table_insert(
					insert_rec, index, offsets);
			}

			mtr_commit(&mtr);
		}
	}

func_exit:
	/* By now the mini-transaction has been committed on every path. */
	if (offsets_heap) {
		mem_heap_free(offsets_heap);
	}

	return(err);
}
2539 
2540 /***************************************************************//**
2541 Starts a mini-transaction and checks if the index will be dropped.
2542 @return true if the index is to be dropped */
2543 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2544 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2545 row_ins_sec_mtr_start_and_check_if_aborted(
2546 /*=======================================*/
2547 	mtr_t*		mtr,	/*!< out: mini-transaction */
2548 	dict_index_t*	index,	/*!< in/out: secondary index */
2549 	bool		check,	/*!< in: whether to check */
2550 	ulint		search_mode)
2551 				/*!< in: flags */
2552 {
2553 	ut_ad(!dict_index_is_clust(index));
2554 
2555 	mtr_start(mtr);
2556 
2557 	if (!check) {
2558 		return(false);
2559 	}
2560 
2561 	if (search_mode & BTR_ALREADY_S_LATCHED) {
2562 		mtr_s_lock(dict_index_get_lock(index), mtr);
2563 	} else {
2564 		mtr_x_lock(dict_index_get_lock(index), mtr);
2565 	}
2566 
2567 	switch (index->online_status) {
2568 	case ONLINE_INDEX_ABORTED:
2569 	case ONLINE_INDEX_ABORTED_DROPPED:
2570 		ut_ad(*index->name == TEMP_INDEX_PREFIX);
2571 		return(true);
2572 	case ONLINE_INDEX_COMPLETE:
2573 		return(false);
2574 	case ONLINE_INDEX_CREATION:
2575 		break;
2576 	}
2577 
2578 	ut_error;
2579 	return(true);
2580 }
2581 
/***************************************************************//**
Tries to insert an entry into a secondary index. If a record with exactly the
same fields is found, the other record is necessarily marked deleted.
It is then unmarked. Otherwise, the entry is just inserted to the index.

Outline of the steps taken below:
1. Start a mini-transaction; for an index whose name starts with
   TEMP_INDEX_PREFIX, latch index->lock and offer the operation to
   row_log_online_op_try(). If that returns true, nothing more is done.
2. Search with PAGE_CUR_LE. If the search buffered the insert
   (BTR_CUR_INSERT_TO_IBUF), nothing more is done.
3. For a unique index with a long enough match, commit the
   mini-transaction and scan for duplicates in a new one, then
   reposition the cursor in yet another mini-transaction.
4. Modify the existing record or insert a new one.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@return error code */
UNIV_INTERN
dberr_t
row_ins_sec_index_entry_low(
/*========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: secondary index */
	mem_heap_t*	offsets_heap,
				/*!< in/out: memory heap that can be emptied */
	mem_heap_t*	heap,	/*!< in/out: memory heap */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	trx_id_t	trx_id,	/*!< in: PAGE_MAX_TRX_ID during
				row_log_table_apply(), or 0 */
	que_thr_t*	thr)	/*!< in: query thread */
{
	btr_cur_t	cursor;
	ulint		search_mode	= mode | BTR_INSERT;
	dberr_t		err		= DB_SUCCESS;
	ulint		n_unique;
	mtr_t		mtr;
	ulint*		offsets	= NULL;

	ut_ad(!dict_index_is_clust(index));
	ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);

	cursor.thr = thr;
	ut_ad(thr_get_trx(thr)->id);
	mtr_start(&mtr);

	/* Ensure that we acquire index->lock when inserting into an
	index with index->online_status == ONLINE_INDEX_COMPLETE, but
	could still be subject to rollback_inplace_alter_table().
	This prevents a concurrent change of index->online_status.
	The memory object cannot be freed as long as we have an open
	reference to the table, or index->table->n_ref_count > 0. */
	const bool check = *index->name == TEMP_INDEX_PREFIX;
	if (check) {
		DEBUG_SYNC_C("row_ins_sec_index_enter");
		/* S-latch for an optimistic insert (and record that in
		search_mode), X-latch for a pessimistic one. */
		if (mode == BTR_MODIFY_LEAF) {
			search_mode |= BTR_ALREADY_S_LATCHED;
			mtr_s_lock(dict_index_get_lock(index), &mtr);
		} else {
			mtr_x_lock(dict_index_get_lock(index), &mtr);
		}

		/* If this returns true, the B-tree is not touched here;
		err is still DB_SUCCESS at func_exit. */
		if (row_log_online_op_try(
			    index, entry, thr_get_trx(thr)->id)) {
			goto func_exit;
		}
	}

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */

	if (!thr_get_trx(thr)->check_unique_secondary) {
		search_mode |= BTR_IGNORE_SEC_UNIQUE;
	}

	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    search_mode,
				    &cursor, 0, __FILE__, __LINE__, &mtr);

	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
		/* The insert was buffered during the search: we are done */
		goto func_exit;
	}

#ifdef UNIV_DEBUG
	{
		/* Debug check: the first user record on the page, if
		there is one, has as many fields as the entry. */
		page_t*	page = btr_cur_get_page(&cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_get_n_fields(first_rec, index)
		      == dtuple_get_n_fields(entry));
	}
#endif

	n_unique = dict_index_get_n_unique(index);

	if (dict_index_is_unique(index)
	    && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
		/* A possible duplicate: end the search mini-transaction
		and scan for duplicates in a fresh one. */
		mtr_commit(&mtr);

		DEBUG_SYNC_C("row_ins_sec_index_unique");

		if (row_ins_sec_mtr_start_and_check_if_aborted(
			    &mtr, index, check, search_mode)) {
			goto func_exit;
		}

		err = row_ins_scan_sec_index_for_duplicate(
			flags, index, entry, thr, check, &mtr, offsets_heap);

		mtr_commit(&mtr);

		/* The mini-transaction is committed at this point, so
		the error paths below return directly instead of going
		through func_exit. */
		switch (err) {
		case DB_SUCCESS:
			break;
		case DB_DUPLICATE_KEY:
			if (*index->name == TEMP_INDEX_PREFIX) {
				ut_ad(!thr_get_trx(thr)
				      ->dict_operation_lock_mode);
				mutex_enter(&dict_sys->mutex);
				dict_set_corrupted_index_cache_only(
					index, index->table);
				mutex_exit(&dict_sys->mutex);
				/* Do not return any error to the
				caller. The duplicate will be reported
				by ALTER TABLE or CREATE UNIQUE INDEX.
				Unfortunately we cannot report the
				duplicate key value to the DDL thread,
				because the altered_table object is
				private to its call stack. */
				err = DB_SUCCESS;
			}
			/* fall through */
		default:
			return(err);
		}

		/* Third mini-transaction: used for repositioning the
		cursor and for the insert itself. */
		if (row_ins_sec_mtr_start_and_check_if_aborted(
			    &mtr, index, check, search_mode)) {
			goto func_exit;
		}

		DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");

		/* We did not find a duplicate and we have now
		locked with s-locks the necessary records to
		prevent any insertion of a duplicate by another
		transaction. Let us now reposition the cursor and
		continue the insertion. */

		/* BTR_INSERT and BTR_IGNORE_SEC_UNIQUE are masked out
		of the search mode for this repositioning search. */
		btr_cur_search_to_nth_level(
			index, 0, entry, PAGE_CUR_LE,
			search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE),
			&cursor, 0, __FILE__, __LINE__, &mtr);
	}

	if (row_ins_must_modify_rec(&cursor)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		offsets = rec_get_offsets(
			btr_cur_get_rec(&cursor), index, offsets,
			ULINT_UNDEFINED, &offsets_heap);

		err = row_ins_sec_index_entry_by_modify(
			flags, mode, &cursor, &offsets,
			offsets_heap, heap, entry, thr, &mtr);
	} else {
		rec_t*		insert_rec;
		big_rec_t*	big_rec;

		if (mode == BTR_MODIFY_LEAF) {
			err = btr_cur_optimistic_insert(
				flags, &cursor, &offsets, &offsets_heap,
				entry, &insert_rec,
				&big_rec, 0, thr, &mtr);
		} else {
			ut_ad(mode == BTR_MODIFY_TREE);
			/* Give up with DB_LOCK_TABLE_FULL when
			buf_LRU_buf_pool_running_out() says so. */
			if (buf_LRU_buf_pool_running_out()) {

				err = DB_LOCK_TABLE_FULL;
				goto func_exit;
			}

			/* Even in pessimistic mode the optimistic
			insert is tried first; the pessimistic one is
			used only if that returns DB_FAIL. */
			err = btr_cur_optimistic_insert(
				flags, &cursor,
				&offsets, &offsets_heap,
				entry, &insert_rec,
				&big_rec, 0, thr, &mtr);
			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, &cursor,
					&offsets, &offsets_heap,
					entry, &insert_rec,
					&big_rec, 0, thr, &mtr);
			}
		}

		/* With a nonzero trx_id, pass it to
		page_update_max_trx_id() for the page of the insert. */
		if (err == DB_SUCCESS && trx_id) {
			page_update_max_trx_id(
				btr_cur_get_block(&cursor),
				btr_cur_get_page_zip(&cursor),
				trx_id, &mtr);
		}

		/* The inserts above were called with n_ext == 0;
		no big_rec is expected back. */
		ut_ad(!big_rec);
	}

func_exit:
	/* Every path that reaches this label has an active
	mini-transaction. */
	mtr_commit(&mtr);
	return(err);
}
2790 
2791 /***************************************************************//**
2792 Tries to insert the externally stored fields (off-page columns)
2793 of a clustered index entry.
2794 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
2795 UNIV_INTERN
2796 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)2797 row_ins_index_entry_big_rec_func(
2798 /*=============================*/
2799 	const dtuple_t*		entry,	/*!< in/out: index entry to insert */
2800 	const big_rec_t*	big_rec,/*!< in: externally stored fields */
2801 	ulint*			offsets,/*!< in/out: rec offsets */
2802 	mem_heap_t**		heap,	/*!< in/out: memory heap */
2803 	dict_index_t*		index,	/*!< in: index */
2804 	const char*		file,	/*!< in: file name of caller */
2805 #ifndef DBUG_OFF
2806 	const void*		thd,	/*!< in: connection, or NULL */
2807 #endif /* DBUG_OFF */
2808 	ulint			line)	/*!< in: line number of caller */
2809 {
2810 	mtr_t		mtr;
2811 	btr_cur_t	cursor;
2812 	rec_t*		rec;
2813 	dberr_t		error;
2814 
2815 	ut_ad(dict_index_is_clust(index));
2816 
2817 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2818 
2819 	mtr_start(&mtr);
2820 	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2821 				    BTR_MODIFY_TREE, &cursor, 0,
2822 				    file, line, &mtr);
2823 	rec = btr_cur_get_rec(&cursor);
2824 	offsets = rec_get_offsets(rec, index, offsets,
2825 				  ULINT_UNDEFINED, heap);
2826 
2827 	DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2828 	error = btr_store_big_rec_extern_fields(
2829 		index, btr_cur_get_block(&cursor),
2830 		rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2831 	DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2832 
2833 	if (error == DB_SUCCESS
2834 	    && dict_index_is_online_ddl(index)) {
2835 		row_log_table_insert(rec, index, offsets);
2836 	}
2837 
2838 	mtr_commit(&mtr);
2839 
2840 	return(error);
2841 }
2842 
2843 /***************************************************************//**
2844 Inserts an entry into a clustered index. Tries first optimistic,
2845 then pessimistic descent down the tree. If the entry matches enough
2846 to a delete marked record, performs the insert by updating or delete
2847 unmarking the delete marked record.
2848 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2849 UNIV_INTERN
2850 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext)2851 row_ins_clust_index_entry(
2852 /*======================*/
2853 	dict_index_t*	index,	/*!< in: clustered index */
2854 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2855 	que_thr_t*	thr,	/*!< in: query thread */
2856 	ulint		n_ext)	/*!< in: number of externally stored columns */
2857 {
2858 	dberr_t	err;
2859 	ulint	n_uniq;
2860 
2861 	if (!index->table->foreign_set.empty()) {
2862 		err = row_ins_check_foreign_constraints(
2863 			index->table, index, entry, thr);
2864 		if (err != DB_SUCCESS) {
2865 
2866 			return(err);
2867 		}
2868 	}
2869 
2870 	n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
2871 
2872 	/* Try first optimistic descent to the B-tree */
2873 
2874 	log_free_check();
2875 
2876 	err = row_ins_clust_index_entry_low(
2877 		0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr);
2878 
2879 #ifdef UNIV_DEBUG
2880 	/* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
2881 	Once it is fixed, remove the 'ifdef', 'if' and this comment. */
2882 	if (!thr_get_trx(thr)->ddl) {
2883 		DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
2884 				    "after_row_ins_clust_index_entry_leaf");
2885 	}
2886 #endif /* UNIV_DEBUG */
2887 
2888 	if (err != DB_FAIL) {
2889 		DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2890 		return(err);
2891 	}
2892 
2893 	/* Try then pessimistic descent to the B-tree */
2894 
2895 	log_free_check();
2896 
2897 	return(row_ins_clust_index_entry_low(
2898 		       0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr));
2899 }
2900 
2901 /***************************************************************//**
2902 Inserts an entry into a secondary index. Tries first optimistic,
2903 then pessimistic descent down the tree. If the entry matches enough
2904 to a delete marked record, performs the insert by updating or delete
2905 unmarking the delete marked record.
2906 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2907 UNIV_INTERN
2908 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2909 row_ins_sec_index_entry(
2910 /*====================*/
2911 	dict_index_t*	index,	/*!< in: secondary index */
2912 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2913 	que_thr_t*	thr)	/*!< in: query thread */
2914 {
2915 	dberr_t		err;
2916 	mem_heap_t*	offsets_heap;
2917 	mem_heap_t*	heap;
2918 
2919 	if (!index->table->foreign_set.empty()) {
2920 		err = row_ins_check_foreign_constraints(index->table, index,
2921 							entry, thr);
2922 		if (err != DB_SUCCESS) {
2923 
2924 			return(err);
2925 		}
2926 	}
2927 
2928 	ut_ad(thr_get_trx(thr)->id);
2929 
2930 	offsets_heap = mem_heap_create(1024);
2931 	heap = mem_heap_create(1024);
2932 
2933 	/* Try first optimistic descent to the B-tree */
2934 
2935 	log_free_check();
2936 
2937 	err = row_ins_sec_index_entry_low(
2938 		0, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry, 0, thr);
2939 	if (err == DB_FAIL) {
2940 		mem_heap_empty(heap);
2941 
2942 		/* Try then pessimistic descent to the B-tree */
2943 
2944 		log_free_check();
2945 
2946 		err = row_ins_sec_index_entry_low(
2947 			0, BTR_MODIFY_TREE, index,
2948 			offsets_heap, heap, entry, 0, thr);
2949 	}
2950 
2951 	mem_heap_free(heap);
2952 	mem_heap_free(offsets_heap);
2953 	return(err);
2954 }
2955 
2956 /***************************************************************//**
2957 Inserts an index entry to index. Tries first optimistic, then pessimistic
2958 descent down the tree. If the entry matches enough to a delete marked record,
2959 performs the insert by updating or delete unmarking the delete marked
2960 record.
2961 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2962 static
2963 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2964 row_ins_index_entry(
2965 /*================*/
2966 	dict_index_t*	index,	/*!< in: index */
2967 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2968 	que_thr_t*	thr)	/*!< in: query thread */
2969 {
2970 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
2971 			DBUG_SET("-d,row_ins_index_entry_timeout");
2972 			return(DB_LOCK_WAIT);});
2973 
2974 	if (dict_index_is_clust(index)) {
2975 		return(row_ins_clust_index_entry(index, entry, thr, 0));
2976 	} else {
2977 		return(row_ins_sec_index_entry(index, entry, thr));
2978 	}
2979 }
2980 
2981 /***********************************************************//**
2982 Sets the values of the dtuple fields in entry from the values of appropriate
2983 columns in row. */
2984 static MY_ATTRIBUTE((nonnull))
2985 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)2986 row_ins_index_entry_set_vals(
2987 /*=========================*/
2988 	dict_index_t*	index,	/*!< in: index */
2989 	dtuple_t*	entry,	/*!< in: index entry to make */
2990 	const dtuple_t*	row)	/*!< in: row */
2991 {
2992 	ulint	n_fields;
2993 	ulint	i;
2994 
2995 	n_fields = dtuple_get_n_fields(entry);
2996 
2997 	for (i = 0; i < n_fields; i++) {
2998 		dict_field_t*	ind_field;
2999 		dfield_t*	field;
3000 		const dfield_t*	row_field;
3001 		ulint		len;
3002 
3003 		field = dtuple_get_nth_field(entry, i);
3004 		ind_field = dict_index_get_nth_field(index, i);
3005 		row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3006 		len = dfield_get_len(row_field);
3007 
3008 		/* Check column prefix indexes */
3009 		if (ind_field->prefix_len > 0
3010 		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3011 
3012 			const	dict_col_t*	col
3013 				= dict_field_get_col(ind_field);
3014 
3015 			len = dtype_get_at_most_n_mbchars(
3016 				col->prtype, col->mbminmaxlen,
3017 				ind_field->prefix_len,
3018 				len,
3019 				static_cast<const char*>(
3020 					dfield_get_data(row_field)));
3021 
3022 			ut_ad(!dfield_is_ext(row_field));
3023 		}
3024 
3025 		dfield_set_data(field, dfield_get_data(row_field), len);
3026 		if (dfield_is_ext(row_field)) {
3027 			ut_ad(dict_index_is_clust(index));
3028 			dfield_set_ext(field);
3029 		}
3030 	}
3031 }
3032 
3033 /***********************************************************//**
3034 Inserts a single index entry to the table.
3035 @return DB_SUCCESS if operation successfully completed, else error
3036 code or DB_LOCK_WAIT */
3037 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3038 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3039 row_ins_index_entry_step(
3040 /*=====================*/
3041 	ins_node_t*	node,	/*!< in: row insert node */
3042 	que_thr_t*	thr)	/*!< in: query thread */
3043 {
3044 	dberr_t	err;
3045 
3046 	ut_ad(dtuple_check_typed(node->row));
3047 
3048 	row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3049 
3050 	ut_ad(dtuple_check_typed(node->entry));
3051 
3052 	err = row_ins_index_entry(node->index, node->entry, thr);
3053 
3054 #ifdef UNIV_DEBUG
3055 	/* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
3056 	Once it is fixed, remove the 'ifdef', 'if' and this comment. */
3057 	if (!thr_get_trx(thr)->ddl) {
3058 		DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3059 				    "after_row_ins_index_entry_step");
3060 	}
3061 #endif /* UNIV_DEBUG */
3062 
3063 	return(err);
3064 }
3065 
3066 /***********************************************************//**
3067 Allocates a row id for row and inits the node->index field. */
3068 UNIV_INLINE
3069 void
row_ins_alloc_row_id_step(ins_node_t * node)3070 row_ins_alloc_row_id_step(
3071 /*======================*/
3072 	ins_node_t*	node)	/*!< in: row insert node */
3073 {
3074 	row_id_t	row_id;
3075 
3076 	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3077 
3078 	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3079 
3080 		/* No row id is stored if the clustered index is unique */
3081 
3082 		return;
3083 	}
3084 
3085 	/* Fill in row id value to row */
3086 
3087 	row_id = dict_sys_get_new_row_id();
3088 
3089 	dict_sys_write_row_id(node->row_id_buf, row_id);
3090 }
3091 
3092 /***********************************************************//**
3093 Gets a row to insert from the values list. */
3094 UNIV_INLINE
3095 void
row_ins_get_row_from_values(ins_node_t * node)3096 row_ins_get_row_from_values(
3097 /*========================*/
3098 	ins_node_t*	node)	/*!< in: row insert node */
3099 {
3100 	que_node_t*	list_node;
3101 	dfield_t*	dfield;
3102 	dtuple_t*	row;
3103 	ulint		i;
3104 
3105 	/* The field values are copied in the buffers of the select node and
3106 	it is safe to use them until we fetch from select again: therefore
3107 	we can just copy the pointers */
3108 
3109 	row = node->row;
3110 
3111 	i = 0;
3112 	list_node = node->values_list;
3113 
3114 	while (list_node) {
3115 		eval_exp(list_node);
3116 
3117 		dfield = dtuple_get_nth_field(row, i);
3118 		dfield_copy_data(dfield, que_node_get_val(list_node));
3119 
3120 		i++;
3121 		list_node = que_node_get_next(list_node);
3122 	}
3123 }
3124 
3125 /***********************************************************//**
3126 Gets a row to insert from the select list. */
3127 UNIV_INLINE
3128 void
row_ins_get_row_from_select(ins_node_t * node)3129 row_ins_get_row_from_select(
3130 /*========================*/
3131 	ins_node_t*	node)	/*!< in: row insert node */
3132 {
3133 	que_node_t*	list_node;
3134 	dfield_t*	dfield;
3135 	dtuple_t*	row;
3136 	ulint		i;
3137 
3138 	/* The field values are copied in the buffers of the select node and
3139 	it is safe to use them until we fetch from select again: therefore
3140 	we can just copy the pointers */
3141 
3142 	row = node->row;
3143 
3144 	i = 0;
3145 	list_node = node->select->select_list;
3146 
3147 	while (list_node) {
3148 		dfield = dtuple_get_nth_field(row, i);
3149 		dfield_copy_data(dfield, que_node_get_val(list_node));
3150 
3151 		i++;
3152 		list_node = que_node_get_next(list_node);
3153 	}
3154 }
3155 
3156 /***********************************************************//**
3157 Inserts a row to a table.
3158 @return DB_SUCCESS if operation successfully completed, else error
3159 code or DB_LOCK_WAIT */
3160 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3161 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3162 row_ins(
3163 /*====*/
3164 	ins_node_t*	node,	/*!< in: row insert node */
3165 	que_thr_t*	thr)	/*!< in: query thread */
3166 {
3167 	dberr_t	err;
3168 
3169 	if (node->state == INS_NODE_ALLOC_ROW_ID) {
3170 
3171 		row_ins_alloc_row_id_step(node);
3172 
3173 		node->index = dict_table_get_first_index(node->table);
3174 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
3175 
3176 		if (node->ins_type == INS_SEARCHED) {
3177 
3178 			row_ins_get_row_from_select(node);
3179 
3180 		} else if (node->ins_type == INS_VALUES) {
3181 
3182 			row_ins_get_row_from_values(node);
3183 		}
3184 
3185 		node->state = INS_NODE_INSERT_ENTRIES;
3186 	}
3187 
3188 	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3189 
3190 	while (node->index != NULL) {
3191 		if (node->index->type != DICT_FTS) {
3192 			err = row_ins_index_entry_step(node, thr);
3193 
3194 			if (err != DB_SUCCESS) {
3195 
3196 				return(err);
3197 			}
3198 		}
3199 
3200 		node->index = dict_table_get_next_index(node->index);
3201 		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3202 
3203 		DBUG_EXECUTE_IF(
3204 			"row_ins_skip_sec",
3205 			node->index = NULL; node->entry = NULL; break;);
3206 
3207 		/* Skip corrupted secondary index and its entry */
3208 		while (node->index && dict_index_is_corrupted(node->index)) {
3209 
3210 			node->index = dict_table_get_next_index(node->index);
3211 			node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3212 		}
3213 	}
3214 
3215 	ut_ad(node->entry == NULL);
3216 
3217 	node->state = INS_NODE_ALLOC_ROW_ID;
3218 
3219 	return(DB_SUCCESS);
3220 }
3221 
3222 /***********************************************************//**
3223 Inserts a row to a table. This is a high-level function used in SQL execution
3224 graphs.
3225 @return	query thread to run next or NULL */
3226 UNIV_INTERN
3227 que_thr_t*
row_ins_step(que_thr_t * thr)3228 row_ins_step(
3229 /*=========*/
3230 	que_thr_t*	thr)	/*!< in: query thread */
3231 {
3232 	ins_node_t*	node;
3233 	que_node_t*	parent;
3234 	sel_node_t*	sel_node;
3235 	trx_t*		trx;
3236 	dberr_t		err;
3237 
3238 	ut_ad(thr);
3239 
3240 	trx = thr_get_trx(thr);
3241 
3242 	trx_start_if_not_started_xa(trx);
3243 
3244 	node = static_cast<ins_node_t*>(thr->run_node);
3245 
3246 	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3247 
3248 	parent = que_node_get_parent(node);
3249 	sel_node = node->select;
3250 
3251 	if (thr->prev_node == parent) {
3252 		node->state = INS_NODE_SET_IX_LOCK;
3253 	}
3254 
3255 	/* If this is the first time this node is executed (or when
3256 	execution resumes after wait for the table IX lock), set an
3257 	IX lock on the table and reset the possible select node. MySQL's
3258 	partitioned table code may also call an insert within the same
3259 	SQL statement AFTER it has used this table handle to do a search.
3260 	This happens, for example, when a row update moves it to another
3261 	partition. In that case, we have already set the IX lock on the
3262 	table during the search operation, and there is no need to set
3263 	it again here. But we must write trx->id to node->trx_id_buf. */
3264 
3265 	trx_write_trx_id(node->trx_id_buf, trx->id);
3266 
3267 	if (node->state == INS_NODE_SET_IX_LOCK) {
3268 
3269 		node->state = INS_NODE_ALLOC_ROW_ID;
3270 
3271 		/* It may be that the current session has not yet started
3272 		its transaction, or it has been committed: */
3273 
3274 		if (trx->id == node->trx_id) {
3275 			/* No need to do IX-locking */
3276 
3277 			goto same_trx;
3278 		}
3279 
3280 		err = lock_table(0, node->table, LOCK_IX, thr);
3281 
3282 		DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3283 				err = DB_LOCK_WAIT;);
3284 
3285 		if (err != DB_SUCCESS) {
3286 
3287 			goto error_handling;
3288 		}
3289 
3290 		node->trx_id = trx->id;
3291 same_trx:
3292 		if (node->ins_type == INS_SEARCHED) {
3293 			/* Reset the cursor */
3294 			sel_node->state = SEL_NODE_OPEN;
3295 
3296 			/* Fetch a row to insert */
3297 
3298 			thr->run_node = sel_node;
3299 
3300 			return(thr);
3301 		}
3302 	}
3303 
3304 	if ((node->ins_type == INS_SEARCHED)
3305 	    && (sel_node->state != SEL_NODE_FETCH)) {
3306 
3307 		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3308 
3309 		/* No more rows to insert */
3310 		thr->run_node = parent;
3311 
3312 		return(thr);
3313 	}
3314 
3315 	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3316 
3317 	err = row_ins(node, thr);
3318 
3319 error_handling:
3320 	trx->error_state = err;
3321 
3322 	if (err != DB_SUCCESS) {
3323 		/* err == DB_LOCK_WAIT or SQL error detected */
3324 		return(NULL);
3325 	}
3326 
3327 	/* DO THE TRIGGER ACTIONS HERE */
3328 
3329 	if (node->ins_type == INS_SEARCHED) {
3330 		/* Fetch a row to insert */
3331 
3332 		thr->run_node = sel_node;
3333 	} else {
3334 		thr->run_node = que_node_get_parent(node);
3335 	}
3336 
3337 	return(thr);
3338 }
3339