1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2014, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************//**
20 @file row/row0ins.c
21 Insert into a table
22 
23 Created 4/20/1996 Heikki Tuuri
24 *******************************************************/
25 
26 #include "m_string.h" /* for my_sys.h */
27 #include "my_sys.h" /* DEBUG_SYNC_C */
28 #include "row0ins.h"
29 
30 #ifdef UNIV_NONINL
31 #include "row0ins.ic"
32 #endif
33 
34 #include "ha_prototypes.h"
35 #include "dict0dict.h"
36 #include "dict0boot.h"
37 #include "trx0undo.h"
38 #include "btr0btr.h"
39 #include "btr0cur.h"
40 #include "mach0data.h"
41 #include "que0que.h"
42 #include "row0upd.h"
43 #include "row0sel.h"
44 #include "row0row.h"
45 #include "rem0cmp.h"
46 #include "lock0lock.h"
47 #include "log0log.h"
48 #include "eval0eval.h"
49 #include "data0data.h"
50 #include "usr0sess.h"
51 #include "buf0lru.h"
52 #include "m_string.h"
53 #include "my_sys.h"
54 
55 #define	ROW_INS_PREV	1
56 #define	ROW_INS_NEXT	2
57 
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log for that operation. This is done by
calling log_free_check(). The reason for checking the availability of
the redo log space before the start of the operation is that we MUST NOT
hold any synchronization objects when performing the check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
67 
68 /*********************************************************************//**
69 Creates an insert node struct.
70 @return	own: insert node struct */
71 UNIV_INTERN
72 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)73 ins_node_create(
74 /*============*/
75 	ulint		ins_type,	/*!< in: INS_VALUES, ... */
76 	dict_table_t*	table,		/*!< in: table where to insert */
77 	mem_heap_t*	heap)		/*!< in: mem heap where created */
78 {
79 	ins_node_t*	node;
80 
81 	node = mem_heap_alloc(heap, sizeof(ins_node_t));
82 
83 	node->common.type = QUE_NODE_INSERT;
84 
85 	node->ins_type = ins_type;
86 
87 	node->state = INS_NODE_SET_IX_LOCK;
88 	node->table = table;
89 	node->index = NULL;
90 	node->entry = NULL;
91 
92 	node->select = NULL;
93 
94 	node->trx_id = 0;
95 
96 	node->entry_sys_heap = mem_heap_create(128);
97 
98 	node->magic_n = INS_NODE_MAGIC_N;
99 
100 	return(node);
101 }
102 
103 /***********************************************************//**
104 Creates an entry template for each index of a table. */
105 UNIV_INTERN
106 void
ins_node_create_entry_list(ins_node_t * node)107 ins_node_create_entry_list(
108 /*=======================*/
109 	ins_node_t*	node)	/*!< in: row insert node */
110 {
111 	dict_index_t*	index;
112 	dtuple_t*	entry;
113 
114 	ut_ad(node->entry_sys_heap);
115 
116 	UT_LIST_INIT(node->entry_list);
117 
118 	index = dict_table_get_first_index(node->table);
119 
120 	while (index != NULL) {
121 		entry = row_build_index_entry(node->row, NULL, index,
122 					      node->entry_sys_heap);
123 		UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
124 
125 		/* We will include all indexes (include those corrupted
126 		secondary indexes) in the entry list. Filteration of
127 		these corrupted index will be done in row_ins() */
128 		index = dict_table_get_next_index(index);
129 	}
130 }
131 
132 /*****************************************************************//**
133 Adds system field buffers to a row. */
134 static
135 void
row_ins_alloc_sys_fields(ins_node_t * node)136 row_ins_alloc_sys_fields(
137 /*=====================*/
138 	ins_node_t*	node)	/*!< in: insert node */
139 {
140 	dtuple_t*		row;
141 	dict_table_t*		table;
142 	mem_heap_t*		heap;
143 	const dict_col_t*	col;
144 	dfield_t*		dfield;
145 	byte*			ptr;
146 	uint			len;
147 
148 	row = node->row;
149 	table = node->table;
150 	heap = node->entry_sys_heap;
151 
152 	ut_ad(row && table && heap);
153 	ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
154 
155 	/* allocate buffer to hold the needed system created hidden columns. */
156 	len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
157 	ptr = mem_heap_zalloc(heap, len);
158 
159 	/* 1. Populate row-id */
160 	col = dict_table_get_sys_col(table, DATA_ROW_ID);
161 
162 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
163 
164 	dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
165 
166 	node->row_id_buf = ptr;
167 
168 	ptr += DATA_ROW_ID_LEN;
169 
170 	/* 2. Populate trx id */
171 	col = dict_table_get_sys_col(table, DATA_TRX_ID);
172 
173 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
174 
175 	dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
176 
177 	node->trx_id_buf = ptr;
178 
179 	ptr += DATA_TRX_ID_LEN;
180 
181 	/* 3. Populate roll ptr */
182 
183 	col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
184 
185 	dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186 
187 	dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
188 }
189 
190 /*********************************************************************//**
191 Sets a new row to insert for an INS_DIRECT node. This function is only used
192 if we have constructed the row separately, which is a rare case; this
193 function is quite slow. */
194 UNIV_INTERN
195 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)196 ins_node_set_new_row(
197 /*=================*/
198 	ins_node_t*	node,	/*!< in: insert node */
199 	dtuple_t*	row)	/*!< in: new row (or first row) for the node */
200 {
201 	node->state = INS_NODE_SET_IX_LOCK;
202 	node->index = NULL;
203 	node->entry = NULL;
204 
205 	node->row = row;
206 
207 	mem_heap_empty(node->entry_sys_heap);
208 
209 	/* Create templates for index entries */
210 
211 	ins_node_create_entry_list(node);
212 
213 	/* Allocate from entry_sys_heap buffers for sys fields */
214 
215 	row_ins_alloc_sys_fields(node);
216 
217 	/* As we allocated a new trx id buf, the trx id should be written
218 	there again: */
219 
220 	node->trx_id = 0;
221 }
222 
223 /*******************************************************************//**
224 Does an insert operation by updating a delete-marked existing record
225 in the index. This situation can occur if the delete-marked record is
226 kept in the index for consistent reads.
227 @return	DB_SUCCESS or error code */
228 static
229 ulint
row_ins_sec_index_entry_by_modify(ulint mode,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)230 row_ins_sec_index_entry_by_modify(
231 /*==============================*/
232 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
233 				depending on whether mtr holds just a leaf
234 				latch or also a tree latch */
235 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
236 	const dtuple_t*	entry,	/*!< in: index entry to insert */
237 	que_thr_t*	thr,	/*!< in: query thread */
238 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
239 				latching any further pages */
240 {
241 	big_rec_t*	dummy_big_rec;
242 	mem_heap_t*	heap;
243 	upd_t*		update;
244 	rec_t*		rec;
245 	ulint		err;
246 
247 	rec = btr_cur_get_rec(cursor);
248 
249 	ut_ad(!dict_index_is_clust(cursor->index));
250 	ut_ad(rec_get_deleted_flag(rec,
251 				   dict_table_is_comp(cursor->index->table)));
252 
253 	/* We know that in the alphabetical ordering, entry and rec are
254 	identified. But in their binary form there may be differences if
255 	there are char fields in them. Therefore we have to calculate the
256 	difference. */
257 
258 	heap = mem_heap_create(1024);
259 
260 	update = row_upd_build_sec_rec_difference_binary(
261 		cursor->index, entry, rec, thr_get_trx(thr), heap);
262 	if (mode == BTR_MODIFY_LEAF) {
263 		/* Try an optimistic updating of the record, keeping changes
264 		within the page */
265 
266 		err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
267 						update, 0, thr, mtr);
268 		switch (err) {
269 		case DB_OVERFLOW:
270 		case DB_UNDERFLOW:
271 		case DB_ZIP_OVERFLOW:
272 			err = DB_FAIL;
273 		}
274 	} else {
275 		ut_a(mode == BTR_MODIFY_TREE);
276 		if (buf_LRU_buf_pool_running_out()) {
277 
278 			err = DB_LOCK_TABLE_FULL;
279 
280 			goto func_exit;
281 		}
282 
283 		err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
284 						 &heap, &dummy_big_rec, update,
285 						 0, thr, mtr);
286 		ut_ad(!dummy_big_rec);
287 	}
288 func_exit:
289 	mem_heap_free(heap);
290 
291 	return(err);
292 }
293 
294 /*******************************************************************//**
295 Does an insert operation by delete unmarking and updating a delete marked
296 existing record in the index. This situation can occur if the delete marked
297 record is kept in the index for consistent reads.
298 @return	DB_SUCCESS, DB_FAIL, or error code */
299 static
300 ulint
row_ins_clust_index_entry_by_modify(ulint mode,btr_cur_t * cursor,mem_heap_t ** heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)301 row_ins_clust_index_entry_by_modify(
302 /*================================*/
303 	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
304 				depending on whether mtr holds just a leaf
305 				latch or also a tree latch */
306 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
307 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
308 	big_rec_t**	big_rec,/*!< out: possible big rec vector of fields
309 				which have to be stored externally by the
310 				caller */
311 	const dtuple_t*	entry,	/*!< in: index entry to insert */
312 	que_thr_t*	thr,	/*!< in: query thread */
313 	mtr_t*		mtr)	/*!< in: mtr; must be committed before
314 				latching any further pages */
315 {
316 	rec_t*		rec;
317 	upd_t*		update;
318 	ulint		err;
319 
320 	ut_ad(dict_index_is_clust(cursor->index));
321 
322 	*big_rec = NULL;
323 
324 	rec = btr_cur_get_rec(cursor);
325 
326 	ut_ad(rec_get_deleted_flag(rec,
327 				   dict_table_is_comp(cursor->index->table)));
328 
329 	if (!*heap) {
330 		*heap = mem_heap_create(1024);
331 	}
332 
333 	/* Build an update vector containing all the fields to be modified;
334 	NOTE that this vector may NOT contain system columns trx_id or
335 	roll_ptr */
336 
337 	update = row_upd_build_difference_binary(cursor->index, entry, rec,
338 						 thr_get_trx(thr), *heap);
339 	if (mode == BTR_MODIFY_LEAF) {
340 		/* Try optimistic updating of the record, keeping changes
341 		within the page */
342 
343 		err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
344 						mtr);
345 		switch (err) {
346 		case DB_OVERFLOW:
347 		case DB_UNDERFLOW:
348 		case DB_ZIP_OVERFLOW:
349 			err = DB_FAIL;
350 		}
351 	} else {
352 		ut_a(mode == BTR_MODIFY_TREE);
353 		if (buf_LRU_buf_pool_running_out()) {
354 
355 			return(DB_LOCK_TABLE_FULL);
356 
357 		}
358 		err = btr_cur_pessimistic_update(
359 			BTR_KEEP_POS_FLAG, cursor, heap, big_rec, update,
360 			0, thr, mtr);
361 	}
362 
363 	return(err);
364 }
365 
366 /*********************************************************************//**
367 Returns TRUE if in a cascaded update/delete an ancestor node of node
368 updates (not DELETE, but UPDATE) table.
369 @return	TRUE if an ancestor updates table */
370 static
371 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)372 row_ins_cascade_ancestor_updates_table(
373 /*===================================*/
374 	que_node_t*	node,	/*!< in: node in a query graph */
375 	dict_table_t*	table)	/*!< in: table */
376 {
377 	que_node_t*	parent;
378 	upd_node_t*	upd_node;
379 
380 	parent = que_node_get_parent(node);
381 
382 	while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
383 
384 		upd_node = parent;
385 
386 		if (upd_node->table == table && upd_node->is_delete == FALSE) {
387 
388 			return(TRUE);
389 		}
390 
391 		parent = que_node_get_parent(parent);
392 
393 		ut_a(parent);
394 	}
395 
396 	return(FALSE);
397 }
398 
399 /*********************************************************************//**
400 Returns the number of ancestor UPDATE or DELETE nodes of a
401 cascaded update/delete node.
402 @return	number of ancestors */
403 static
404 ulint
row_ins_cascade_n_ancestors(que_node_t * node)405 row_ins_cascade_n_ancestors(
406 /*========================*/
407 	que_node_t*	node)	/*!< in: node in a query graph */
408 {
409 	que_node_t*	parent;
410 	ulint		n_ancestors = 0;
411 
412 	parent = que_node_get_parent(node);
413 
414 	while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
415 		n_ancestors++;
416 
417 		parent = que_node_get_parent(parent);
418 
419 		ut_a(parent);
420 	}
421 
422 	return(n_ancestors);
423 }
424 
425 /******************************************************************//**
426 Calculates the update vector node->cascade->update for a child table in
427 a cascaded update.
428 @return number of fields in the calculated update vector; the value
429 can also be 0 if no foreign key fields changed; the returned value is
430 ULINT_UNDEFINED if the column type in the child table is too short to
431 fit the new value in the parent table: that means the update fails */
432 static
433 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap)434 row_ins_cascade_calc_update_vec(
435 /*============================*/
436 	upd_node_t*	node,		/*!< in: update node of the parent
437 					table */
438 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
439 					type is != 0 */
440 	mem_heap_t*	heap)		/*!< in: memory heap to use as
441 					temporary storage */
442 {
443 	upd_node_t*	cascade		= node->cascade_node;
444 	dict_table_t*	table		= foreign->foreign_table;
445 	dict_index_t*	index		= foreign->foreign_index;
446 	upd_t*		update;
447 	dict_table_t*	parent_table;
448 	dict_index_t*	parent_index;
449 	upd_t*		parent_update;
450 	ulint		n_fields_updated;
451 	ulint		parent_field_no;
452 	ulint		i;
453 	ulint		j;
454 
455 	ut_a(node);
456 	ut_a(foreign);
457 	ut_a(cascade);
458 	ut_a(table);
459 	ut_a(index);
460 
461 	/* Calculate the appropriate update vector which will set the fields
462 	in the child index record to the same value (possibly padded with
463 	spaces if the column is a fixed length CHAR or FIXBINARY column) as
464 	the referenced index record will get in the update. */
465 
466 	parent_table = node->table;
467 	ut_a(parent_table == foreign->referenced_table);
468 	parent_index = foreign->referenced_index;
469 	parent_update = node->update;
470 
471 	update = cascade->update;
472 
473 	update->info_bits = 0;
474 	update->n_fields = foreign->n_fields;
475 
476 	n_fields_updated = 0;
477 
478 	for (i = 0; i < foreign->n_fields; i++) {
479 
480 		parent_field_no = dict_table_get_nth_col_pos(
481 			parent_table,
482 			dict_index_get_nth_col_no(parent_index, i));
483 
484 		for (j = 0; j < parent_update->n_fields; j++) {
485 			const upd_field_t*	parent_ufield
486 				= &parent_update->fields[j];
487 
488 			if (parent_ufield->field_no == parent_field_no) {
489 
490 				ulint			min_size;
491 				const dict_col_t*	col;
492 				ulint			ufield_len;
493 				upd_field_t*		ufield;
494 
495 				col = dict_index_get_nth_col(index, i);
496 
497 				/* A field in the parent index record is
498 				updated. Let us make the update vector
499 				field for the child table. */
500 
501 				ufield = update->fields + n_fields_updated;
502 
503 				ufield->field_no
504 					= dict_table_get_nth_col_pos(
505 					table, dict_col_get_no(col));
506 
507 				ufield->orig_len = 0;
508 				ufield->exp = NULL;
509 
510 				ufield->new_val = parent_ufield->new_val;
511 				ufield_len = dfield_get_len(&ufield->new_val);
512 
513 				/* Clear the "external storage" flag */
514 				dfield_set_len(&ufield->new_val, ufield_len);
515 
516 				/* Do not allow a NOT NULL column to be
517 				updated as NULL */
518 
519 				if (dfield_is_null(&ufield->new_val)
520 				    && (col->prtype & DATA_NOT_NULL)) {
521 
522 					return(ULINT_UNDEFINED);
523 				}
524 
525 				/* If the new value would not fit in the
526 				column, do not allow the update */
527 
528 				if (!dfield_is_null(&ufield->new_val)
529 				    && dtype_get_at_most_n_mbchars(
530 					col->prtype, col->mbminmaxlen,
531 					col->len,
532 					ufield_len,
533 					dfield_get_data(&ufield->new_val))
534 				    < ufield_len) {
535 
536 					return(ULINT_UNDEFINED);
537 				}
538 
539 				/* If the parent column type has a different
540 				length than the child column type, we may
541 				need to pad with spaces the new value of the
542 				child column */
543 
544 				min_size = dict_col_get_min_size(col);
545 
546 				/* Because UNIV_SQL_NULL (the marker
547 				of SQL NULL values) exceeds all possible
548 				values of min_size, the test below will
549 				not hold for SQL NULL columns. */
550 
551 				if (min_size > ufield_len) {
552 
553 					byte*	pad;
554 					ulint	pad_len;
555 					byte*	padded_data;
556 					ulint	mbminlen;
557 
558 					padded_data = mem_heap_alloc(
559 						heap, min_size);
560 
561 					pad = padded_data + ufield_len;
562 					pad_len = min_size - ufield_len;
563 
564 					memcpy(padded_data,
565 					       dfield_get_data(&ufield
566 							       ->new_val),
567 					       ufield_len);
568 
569 					mbminlen = dict_col_get_mbminlen(col);
570 
571 					ut_ad(!(ufield_len % mbminlen));
572 					ut_ad(!(min_size % mbminlen));
573 
574 					if (mbminlen == 1
575 					    && dtype_get_charset_coll(
576 						    col->prtype)
577 					    == DATA_MYSQL_BINARY_CHARSET_COLL) {
578 						/* Do not pad BINARY columns */
579 						return(ULINT_UNDEFINED);
580 					}
581 
582 					row_mysql_pad_col(mbminlen,
583 							  pad, pad_len);
584 					dfield_set_data(&ufield->new_val,
585 							padded_data, min_size);
586 				}
587 
588 				n_fields_updated++;
589 			}
590 		}
591 	}
592 
593 	update->n_fields = n_fields_updated;
594 
595 	return(n_fields_updated);
596 }
597 
598 /*********************************************************************//**
599 Set detailed error message associated with foreign key errors for
600 the given transaction. */
601 static
602 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)603 row_ins_set_detailed(
604 /*=================*/
605 	trx_t*		trx,		/*!< in: transaction */
606 	dict_foreign_t*	foreign)	/*!< in: foreign key constraint */
607 {
608 	mutex_enter(&srv_misc_tmpfile_mutex);
609 	rewind(srv_misc_tmpfile);
610 
611 	if (os_file_set_eof(srv_misc_tmpfile)) {
612 		ut_print_name(srv_misc_tmpfile, trx, TRUE,
613 			      foreign->foreign_table_name);
614 		dict_print_info_on_foreign_key_in_create_format(
615 			srv_misc_tmpfile, trx, foreign, FALSE);
616 		trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
617 	} else {
618 		trx_set_detailed_error(trx, "temp file operation failed");
619 	}
620 
621 	mutex_exit(&srv_misc_tmpfile_mutex);
622 }
623 
624 /*********************************************************************//**
625 Reports a foreign key error associated with an update or a delete of a
626 parent table index entry. */
627 static
628 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)629 row_ins_foreign_report_err(
630 /*=======================*/
631 	const char*	errstr,		/*!< in: error string from the viewpoint
632 					of the parent table */
633 	que_thr_t*	thr,		/*!< in: query thread whose run_node
634 					is an update node */
635 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
636 	const rec_t*	rec,		/*!< in: a matching index record in the
637 					child table */
638 	const dtuple_t*	entry)		/*!< in: index entry in the parent
639 					table */
640 {
641 	FILE*	ef	= dict_foreign_err_file;
642 	trx_t*	trx	= thr_get_trx(thr);
643 
644 	row_ins_set_detailed(trx, foreign);
645 
646 	mutex_enter(&dict_foreign_err_mutex);
647 	rewind(ef);
648 	ut_print_timestamp(ef);
649 	fputs(" Transaction:\n", ef);
650 	trx_print(ef, trx, 600);
651 
652 	fputs("Foreign key constraint fails for table ", ef);
653 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
654 	fputs(":\n", ef);
655 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
656 							TRUE);
657 	putc('\n', ef);
658 	fputs(errstr, ef);
659 	fputs(" in parent table, in index ", ef);
660 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
661 	if (entry) {
662 		fputs(" tuple:\n", ef);
663 		dtuple_print(ef, entry);
664 	}
665 	fputs("\nBut in child table ", ef);
666 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
667 	fputs(", in index ", ef);
668 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
669 	if (rec) {
670 		fputs(", there is a record:\n", ef);
671 		rec_print(ef, rec, foreign->foreign_index);
672 	} else {
673 		fputs(", the record is not available\n", ef);
674 	}
675 	putc('\n', ef);
676 
677 	mutex_exit(&dict_foreign_err_mutex);
678 }
679 
680 /*********************************************************************//**
681 Reports a foreign key error to dict_foreign_err_file when we are trying
682 to add an index entry to a child table. Note that the adding may be the result
683 of an update, too. */
684 static
685 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)686 row_ins_foreign_report_add_err(
687 /*===========================*/
688 	trx_t*		trx,		/*!< in: transaction */
689 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint */
690 	const rec_t*	rec,		/*!< in: a record in the parent table:
691 					it does not match entry because we
692 					have an error! */
693 	const dtuple_t*	entry)		/*!< in: index entry to insert in the
694 					child table */
695 {
696 	FILE*	ef	= dict_foreign_err_file;
697 
698 	row_ins_set_detailed(trx, foreign);
699 
700 	mutex_enter(&dict_foreign_err_mutex);
701 	rewind(ef);
702 	ut_print_timestamp(ef);
703 	fputs(" Transaction:\n", ef);
704 	trx_print(ef, trx, 600);
705 	fputs("Foreign key constraint fails for table ", ef);
706 	ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
707 	fputs(":\n", ef);
708 	dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
709 							TRUE);
710 	fputs("\nTrying to add in child table, in index ", ef);
711 	ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
712 	if (entry) {
713 		fputs(" tuple:\n", ef);
714 		/* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
715 		It would be better to only display the user columns. */
716 		dtuple_print(ef, entry);
717 	}
718 	fputs("\nBut in parent table ", ef);
719 	ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
720 	fputs(", in index ", ef);
721 	ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
722 	fputs(",\nthe closest match we can find is record:\n", ef);
723 	if (rec && page_rec_is_supremum(rec)) {
724 		/* If the cursor ended on a supremum record, it is better
725 		to report the previous record in the error message, so that
726 		the user gets a more descriptive error message. */
727 		rec = page_rec_get_prev_const(rec);
728 	}
729 
730 	if (rec) {
731 		rec_print(ef, rec, foreign->referenced_index);
732 	}
733 	putc('\n', ef);
734 
735 	mutex_exit(&dict_foreign_err_mutex);
736 }
737 
738 /*********************************************************************//**
739 Invalidate the query cache for the given table. */
740 static
741 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)742 row_ins_invalidate_query_cache(
743 /*===========================*/
744 	que_thr_t*	thr,		/*!< in: query thread whose run_node
745 					is an update node */
746 	const char*	name)		/*!< in: table name prefixed with
747 					database name and a '/' character */
748 {
749 	char*	buf;
750 	char*	ptr;
751 	ulint	len = strlen(name) + 1;
752 
753 	buf = mem_strdupl(name, len);
754 
755 	ptr = strchr(buf, '/');
756 	ut_a(ptr);
757 	*ptr = '\0';
758 
759 	innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
760 	mem_free(buf);
761 }
762 
763 /*********************************************************************//**
764 Perform referential actions or checks when a parent row is deleted or updated
765 and the constraint had an ON DELETE or ON UPDATE condition which was not
766 RESTRICT.
767 @return	DB_SUCCESS, DB_LOCK_WAIT, or error code */
768 static
769 ulint
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)770 row_ins_foreign_check_on_constraint(
771 /*================================*/
772 	que_thr_t*	thr,		/*!< in: query thread whose run_node
773 					is an update node */
774 	dict_foreign_t*	foreign,	/*!< in: foreign key constraint whose
775 					type is != 0 */
776 	btr_pcur_t*	pcur,		/*!< in: cursor placed on a matching
777 					index record in the child table */
778 	dtuple_t*	entry,		/*!< in: index entry in the parent
779 					table */
780 	mtr_t*		mtr)		/*!< in: mtr holding the latch of pcur
781 					page */
782 {
783 	upd_node_t*	node;
784 	upd_node_t*	cascade;
785 	dict_table_t*	table		= foreign->foreign_table;
786 	dict_index_t*	index;
787 	dict_index_t*	clust_index;
788 	dtuple_t*	ref;
789 	mem_heap_t*	upd_vec_heap	= NULL;
790 	const rec_t*	rec;
791 	const rec_t*	clust_rec;
792 	const buf_block_t* clust_block;
793 	upd_t*		update;
794 	ulint		n_to_update;
795 	ulint		err;
796 	ulint		i;
797 	trx_t*		trx;
798 	mem_heap_t*	tmp_heap	= NULL;
799 
800 	ut_a(thr);
801 	ut_a(foreign);
802 	ut_a(pcur);
803 	ut_a(mtr);
804 
805 	trx = thr_get_trx(thr);
806 
807 	/* Since we are going to delete or update a row, we have to invalidate
808 	the MySQL query cache for table. A deadlock of threads is not possible
809 	here because the caller of this function does not hold any latches with
810 	the sync0sync.h rank above the kernel mutex. The query cache mutex has
811 	a rank just above the kernel mutex. */
812 
813 	row_ins_invalidate_query_cache(thr, table->name);
814 
815 	node = thr->run_node;
816 
817 	if (node->is_delete && 0 == (foreign->type
818 				     & (DICT_FOREIGN_ON_DELETE_CASCADE
819 					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {
820 
821 		row_ins_foreign_report_err("Trying to delete",
822 					   thr, foreign,
823 					   btr_pcur_get_rec(pcur), entry);
824 
825 		return(DB_ROW_IS_REFERENCED);
826 	}
827 
828 	if (!node->is_delete && 0 == (foreign->type
829 				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
830 					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
831 
832 		/* This is an UPDATE */
833 
834 		row_ins_foreign_report_err("Trying to update",
835 					   thr, foreign,
836 					   btr_pcur_get_rec(pcur), entry);
837 
838 		return(DB_ROW_IS_REFERENCED);
839 	}
840 
841 	if (node->cascade_node == NULL) {
842 		/* Extend our query graph by creating a child to current
843 		update node. The child is used in the cascade or set null
844 		operation. */
845 
846 		node->cascade_heap = mem_heap_create(128);
847 		node->cascade_node = row_create_update_node_for_mysql(
848 			table, node->cascade_heap);
849 		que_node_set_parent(node->cascade_node, node);
850 	}
851 
852 	/* Initialize cascade_node to do the operation we want. Note that we
853 	use the SAME cascade node to do all foreign key operations of the
854 	SQL DELETE: the table of the cascade node may change if there are
855 	several child tables to the table where the delete is done! */
856 
857 	cascade = node->cascade_node;
858 
859 	cascade->table = table;
860 
861 	cascade->foreign = foreign;
862 
863 	if (node->is_delete
864 	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
865 		cascade->is_delete = TRUE;
866 	} else {
867 		cascade->is_delete = FALSE;
868 
869 		if (foreign->n_fields > cascade->update_n_fields) {
870 			/* We have to make the update vector longer */
871 
872 			cascade->update = upd_create(foreign->n_fields,
873 						     node->cascade_heap);
874 			cascade->update_n_fields = foreign->n_fields;
875 		}
876 	}
877 
878 	/* We do not allow cyclic cascaded updating (DELETE is allowed,
879 	but not UPDATE) of the same table, as this can lead to an infinite
880 	cycle. Check that we are not updating the same table which is
881 	already being modified in this cascade chain. We have to check
882 	this also because the modification of the indexes of a 'parent'
883 	table may still be incomplete, and we must avoid seeing the indexes
884 	of the parent table in an inconsistent state! */
885 
886 	if (!cascade->is_delete
887 	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {
888 
889 		/* We do not know if this would break foreign key
890 		constraints, but play safe and return an error */
891 
892 		err = DB_ROW_IS_REFERENCED;
893 
894 		row_ins_foreign_report_err(
895 			"Trying an update, possibly causing a cyclic"
896 			" cascaded update\n"
897 			"in the child table,", thr, foreign,
898 			btr_pcur_get_rec(pcur), entry);
899 
900 		goto nonstandard_exit_func;
901 	}
902 
903 	if (row_ins_cascade_n_ancestors(cascade) >= 15) {
904 		err = DB_ROW_IS_REFERENCED;
905 
906 		row_ins_foreign_report_err(
907 			"Trying a too deep cascaded delete or update\n",
908 			thr, foreign, btr_pcur_get_rec(pcur), entry);
909 
910 		goto nonstandard_exit_func;
911 	}
912 
913 	index = btr_pcur_get_btr_cur(pcur)->index;
914 
915 	ut_a(index == foreign->foreign_index);
916 
917 	rec = btr_pcur_get_rec(pcur);
918 
919 	if (dict_index_is_clust(index)) {
920 		/* pcur is already positioned in the clustered index of
921 		the child table */
922 
923 		clust_index = index;
924 		clust_rec = rec;
925 		clust_block = btr_pcur_get_block(pcur);
926 	} else {
927 		/* We have to look for the record in the clustered index
928 		in the child table */
929 
930 		clust_index = dict_table_get_first_index(table);
931 
932 		tmp_heap = mem_heap_create(256);
933 
934 		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
935 					tmp_heap);
936 		btr_pcur_open_with_no_init(clust_index, ref,
937 					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
938 					   cascade->pcur, 0, mtr);
939 
940 		clust_rec = btr_pcur_get_rec(cascade->pcur);
941 		clust_block = btr_pcur_get_block(cascade->pcur);
942 
943 		if (!page_rec_is_user_rec(clust_rec)
944 		    || btr_pcur_get_low_match(cascade->pcur)
945 		    < dict_index_get_n_unique(clust_index)) {
946 
947 			fputs("InnoDB: error in cascade of a foreign key op\n"
948 			      "InnoDB: ", stderr);
949 			dict_index_name_print(stderr, trx, index);
950 
951 			fputs("\n"
952 			      "InnoDB: record ", stderr);
953 			rec_print(stderr, rec, index);
954 			fputs("\n"
955 			      "InnoDB: clustered record ", stderr);
956 			rec_print(stderr, clust_rec, clust_index);
957 			fputs("\n"
958 			      "InnoDB: Submit a detailed bug report to"
959 			      " http://bugs.mysql.com\n", stderr);
960 			ut_ad(0);
961 			err = DB_SUCCESS;
962 
963 			goto nonstandard_exit_func;
964 		}
965 	}
966 
967 	/* Set an X-lock on the row to delete or update in the child table */
968 
969 	err = lock_table(0, table, LOCK_IX, thr);
970 
971 	if (err == DB_SUCCESS) {
972 		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
973 		we already have a normal shared lock on the appropriate
974 		gap if the search criterion was not unique */
975 
976 		err = lock_clust_rec_read_check_and_lock_alt(
977 			0, clust_block, clust_rec, clust_index,
978 			LOCK_X, LOCK_REC_NOT_GAP, thr);
979 	}
980 
981 	if (err != DB_SUCCESS) {
982 
983 		goto nonstandard_exit_func;
984 	}
985 
986 	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
987 		/* This can happen if there is a circular reference of
988 		rows such that cascading delete comes to delete a row
989 		already in the process of being delete marked */
990 		err = DB_SUCCESS;
991 
992 		goto nonstandard_exit_func;
993 	}
994 
995 	if (node->is_delete
996 	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
997 	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
998 
999 		/* Build the appropriate update vector which sets
1000 		foreign->n_fields first fields in rec to SQL NULL */
1001 
1002 		update = cascade->update;
1003 
1004 		update->info_bits = 0;
1005 		update->n_fields = foreign->n_fields;
1006 		UNIV_MEM_INVALID(update->fields,
1007 				 update->n_fields * sizeof *update->fields);
1008 
1009 		for (i = 0; i < foreign->n_fields; i++) {
1010 			upd_field_t*	ufield = &update->fields[i];
1011 
1012 			ufield->field_no = dict_table_get_nth_col_pos(
1013 				table,
1014 				dict_index_get_nth_col_no(index, i));
1015 			ufield->orig_len = 0;
1016 			ufield->exp = NULL;
1017 			dfield_set_null(&ufield->new_val);
1018 		}
1019 	}
1020 
1021 	if (!node->is_delete
1022 	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1023 
1024 		/* Build the appropriate update vector which sets changing
1025 		foreign->n_fields first fields in rec to new values */
1026 
1027 		upd_vec_heap = mem_heap_create(256);
1028 
1029 		n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
1030 							      upd_vec_heap);
1031 		if (n_to_update == ULINT_UNDEFINED) {
1032 			err = DB_ROW_IS_REFERENCED;
1033 
1034 			row_ins_foreign_report_err(
1035 				"Trying a cascaded update where the"
1036 				" updated value in the child\n"
1037 				"table would not fit in the length"
1038 				" of the column, or the value would\n"
1039 				"be NULL and the column is"
1040 				" declared as not NULL in the child table,",
1041 				thr, foreign, btr_pcur_get_rec(pcur), entry);
1042 
1043 			goto nonstandard_exit_func;
1044 		}
1045 
1046 		if (cascade->update->n_fields == 0) {
1047 
1048 			/* The update does not change any columns referred
1049 			to in this foreign key constraint: no need to do
1050 			anything */
1051 
1052 			err = DB_SUCCESS;
1053 
1054 			goto nonstandard_exit_func;
1055 		}
1056 	}
1057 
1058 	/* Store pcur position and initialize or store the cascade node
1059 	pcur stored position */
1060 
1061 	btr_pcur_store_position(pcur, mtr);
1062 
1063 	if (index == clust_index) {
1064 		btr_pcur_copy_stored_position(cascade->pcur, pcur);
1065 	} else {
1066 		btr_pcur_store_position(cascade->pcur, mtr);
1067 	}
1068 
1069 	mtr_commit(mtr);
1070 
1071 	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1072 
1073 	cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1074 
1075 	err = row_update_cascade_for_mysql(thr, cascade,
1076 					   foreign->foreign_table);
1077 
1078 	if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
1079 		fprintf(stderr,
1080 			"InnoDB: error: table %s has the counter 0"
1081 			" though there is\n"
1082 			"InnoDB: a FOREIGN KEY check running on it.\n",
1083 			foreign->foreign_table->name);
1084 	}
1085 
1086 	/* Release the data dictionary latch for a while, so that we do not
1087 	starve other threads from doing CREATE TABLE etc. if we have a huge
1088 	cascaded operation running. The counter n_foreign_key_checks_running
1089 	will prevent other users from dropping or ALTERing the table when we
1090 	release the latch. */
1091 
1092 	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1093 
1094 	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1095 
1096 	row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1097 
1098 	mtr_start(mtr);
1099 
1100 	/* Restore pcur position */
1101 
1102 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1103 
1104 	if (tmp_heap) {
1105 		mem_heap_free(tmp_heap);
1106 	}
1107 
1108 	if (upd_vec_heap) {
1109 		mem_heap_free(upd_vec_heap);
1110 	}
1111 
1112 	return(err);
1113 
1114 nonstandard_exit_func:
1115 	if (tmp_heap) {
1116 		mem_heap_free(tmp_heap);
1117 	}
1118 
1119 	if (upd_vec_heap) {
1120 		mem_heap_free(upd_vec_heap);
1121 	}
1122 
1123 	btr_pcur_store_position(pcur, mtr);
1124 
1125 	mtr_commit(mtr);
1126 	mtr_start(mtr);
1127 
1128 	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1129 
1130 	return(err);
1131 }
1132 
1133 /*********************************************************************//**
1134 Sets a shared lock on a record. Used in locking possible duplicate key
1135 records and also in checking foreign key constraints.
1136 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1137 static
1138 enum db_err
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1139 row_ins_set_shared_rec_lock(
1140 /*========================*/
1141 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1142 					LOCK_REC_NOT_GAP type lock */
1143 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1144 	const rec_t*		rec,	/*!< in: record */
1145 	dict_index_t*		index,	/*!< in: index */
1146 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1147 	que_thr_t*		thr)	/*!< in: query thread */
1148 {
1149 	enum db_err	err;
1150 
1151 	ut_ad(rec_offs_validate(rec, index, offsets));
1152 
1153 	if (dict_index_is_clust(index)) {
1154 		err = lock_clust_rec_read_check_and_lock(
1155 			0, block, rec, index, offsets, LOCK_S, type, thr);
1156 	} else {
1157 		err = lock_sec_rec_read_check_and_lock(
1158 			0, block, rec, index, offsets, LOCK_S, type, thr);
1159 	}
1160 
1161 	return(err);
1162 }
1163 
1164 /*********************************************************************//**
1165 Sets a exclusive lock on a record. Used in locking possible duplicate key
1166 records
1167 @return	DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1168 static
1169 enum db_err
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1170 row_ins_set_exclusive_rec_lock(
1171 /*===========================*/
1172 	ulint			type,	/*!< in: LOCK_ORDINARY, LOCK_GAP, or
1173 					LOCK_REC_NOT_GAP type lock */
1174 	const buf_block_t*	block,	/*!< in: buffer block of rec */
1175 	const rec_t*		rec,	/*!< in: record */
1176 	dict_index_t*		index,	/*!< in: index */
1177 	const ulint*		offsets,/*!< in: rec_get_offsets(rec, index) */
1178 	que_thr_t*		thr)	/*!< in: query thread */
1179 {
1180 	enum db_err	err;
1181 
1182 	ut_ad(rec_offs_validate(rec, index, offsets));
1183 
1184 	if (dict_index_is_clust(index)) {
1185 		err = lock_clust_rec_read_check_and_lock(
1186 			0, block, rec, index, offsets, LOCK_X, type, thr);
1187 	} else {
1188 		err = lock_sec_rec_read_check_and_lock(
1189 			0, block, rec, index, offsets, LOCK_X, type, thr);
1190 	}
1191 
1192 	return(err);
1193 }
1194 
1195 /***************************************************************//**
1196 Checks if foreign key constraint fails for an index entry. Sets shared locks
1197 which lock either the success or the failure of the constraint. NOTE that
1198 the caller must have a shared latch on dict_operation_lock.
1199 @return	DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1200 UNIV_INTERN
1201 ulint
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1202 row_ins_check_foreign_constraint(
1203 /*=============================*/
1204 	ibool		check_ref,/*!< in: TRUE if we want to check that
1205 				the referenced table is ok, FALSE if we
1206 				want to check the foreign key table */
1207 	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
1208 				tables mentioned in it must be in the
1209 				dictionary cache if they exist at all */
1210 	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
1211 				table, else the referenced table */
1212 	dtuple_t*	entry,	/*!< in: index entry for index */
1213 	que_thr_t*	thr)	/*!< in: query thread */
1214 {
1215 	upd_node_t*	upd_node;
1216 	dict_table_t*	check_table;
1217 	dict_index_t*	check_index;
1218 	ulint		n_fields_cmp;
1219 	btr_pcur_t	pcur;
1220 	int		cmp;
1221 	ulint		err;
1222 	ulint		i;
1223 	mtr_t		mtr;
1224 	trx_t*		trx		= thr_get_trx(thr);
1225 	mem_heap_t*	heap		= NULL;
1226 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1227 	ulint*		offsets		= offsets_;
1228 	rec_offs_init(offsets_);
1229 
1230 run_again:
1231 #ifdef UNIV_SYNC_DEBUG
1232 	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1233 #endif /* UNIV_SYNC_DEBUG */
1234 
1235 	err = DB_SUCCESS;
1236 
1237 	if (trx->check_foreigns == FALSE) {
1238 		/* The user has suppressed foreign key checks currently for
1239 		this session */
1240 		goto exit_func;
1241 	}
1242 
1243 	/* If any of the foreign key fields in entry is SQL NULL, we
1244 	suppress the foreign key check: this is compatible with Oracle,
1245 	for example */
1246 
1247 	for (i = 0; i < foreign->n_fields; i++) {
1248 		if (UNIV_SQL_NULL == dfield_get_len(
1249 			    dtuple_get_nth_field(entry, i))) {
1250 
1251 			goto exit_func;
1252 		}
1253 	}
1254 
1255 	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1256 		upd_node = thr->run_node;
1257 
1258 		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1259 			/* If a cascaded update is done as defined by a
1260 			foreign key constraint, do not check that
1261 			constraint for the child row. In ON UPDATE CASCADE
1262 			the update of the parent row is only half done when
1263 			we come here: if we would check the constraint here
1264 			for the child row it would fail.
1265 
1266 			A QUESTION remains: if in the child table there are
1267 			several constraints which refer to the same parent
1268 			table, we should merge all updates to the child as
1269 			one update? And the updates can be contradictory!
1270 			Currently we just perform the update associated
1271 			with each foreign key constraint, one after
1272 			another, and the user has problems predicting in
1273 			which order they are performed. */
1274 
1275 			goto exit_func;
1276 		}
1277 	}
1278 
1279 	if (check_ref) {
1280 		check_table = foreign->referenced_table;
1281 		check_index = foreign->referenced_index;
1282 	} else {
1283 		check_table = foreign->foreign_table;
1284 		check_index = foreign->foreign_index;
1285 	}
1286 
1287 	if (check_table == NULL || check_table->ibd_file_missing
1288 	    || check_index == NULL) {
1289 		if (check_ref) {
1290 			FILE*	ef = dict_foreign_err_file;
1291 
1292 			row_ins_set_detailed(trx, foreign);
1293 
1294 			mutex_enter(&dict_foreign_err_mutex);
1295 			rewind(ef);
1296 			ut_print_timestamp(ef);
1297 			fputs(" Transaction:\n", ef);
1298 			trx_print(ef, trx, 600);
1299 			fputs("Foreign key constraint fails for table ", ef);
1300 			ut_print_name(ef, trx, TRUE,
1301 				      foreign->foreign_table_name);
1302 			fputs(":\n", ef);
1303 			dict_print_info_on_foreign_key_in_create_format(
1304 				ef, trx, foreign, TRUE);
1305 			fputs("\nTrying to add to index ", ef);
1306 			ut_print_name(ef, trx, FALSE,
1307 				      foreign->foreign_index->name);
1308 			fputs(" tuple:\n", ef);
1309 			dtuple_print(ef, entry);
1310 			fputs("\nBut the parent table ", ef);
1311 			ut_print_name(ef, trx, TRUE,
1312 				      foreign->referenced_table_name);
1313 			fputs("\nor its .ibd file does"
1314 			      " not currently exist!\n", ef);
1315 			mutex_exit(&dict_foreign_err_mutex);
1316 
1317 			err = DB_NO_REFERENCED_ROW;
1318 		}
1319 
1320 		goto exit_func;
1321 	}
1322 
1323 	if (check_table != table) {
1324 		/* We already have a LOCK_IX on table, but not necessarily
1325 		on check_table */
1326 
1327 		err = lock_table(0, check_table, LOCK_IS, thr);
1328 
1329 		if (err != DB_SUCCESS) {
1330 
1331 			goto do_possible_lock_wait;
1332 		}
1333 	}
1334 
1335 	mtr_start(&mtr);
1336 
1337 	/* Store old value on n_fields_cmp */
1338 
1339 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1340 
1341 	dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1342 
1343 	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1344 		      BTR_SEARCH_LEAF, &pcur, &mtr);
1345 
1346 	/* Scan index records and check if there is a matching record */
1347 
1348 	do {
1349 		const rec_t*		rec = btr_pcur_get_rec(&pcur);
1350 		const buf_block_t*	block = btr_pcur_get_block(&pcur);
1351 
1352 		if (page_rec_is_infimum(rec)) {
1353 
1354 			continue;
1355 		}
1356 
1357 		offsets = rec_get_offsets(rec, check_index,
1358 					  offsets, ULINT_UNDEFINED, &heap);
1359 
1360 		if (page_rec_is_supremum(rec)) {
1361 
1362 			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1363 							  rec, check_index,
1364 							  offsets, thr);
1365 			switch (err) {
1366 			case DB_SUCCESS_LOCKED_REC:
1367 			case DB_SUCCESS:
1368 				continue;
1369 			default:
1370 				goto end_scan;
1371 			}
1372 		}
1373 
1374 		cmp = cmp_dtuple_rec(entry, rec, offsets);
1375 
1376 		if (cmp == 0) {
1377 			if (rec_get_deleted_flag(rec,
1378 						 rec_offs_comp(offsets))) {
1379 				err = row_ins_set_shared_rec_lock(
1380 					LOCK_ORDINARY, block,
1381 					rec, check_index, offsets, thr);
1382 				switch (err) {
1383 				case DB_SUCCESS_LOCKED_REC:
1384 				case DB_SUCCESS:
1385 					break;
1386 				default:
1387 					goto end_scan;
1388 				}
1389 			} else {
1390 				/* Found a matching record. Lock only
1391 				a record because we can allow inserts
1392 				into gaps */
1393 
1394 				err = row_ins_set_shared_rec_lock(
1395 					LOCK_REC_NOT_GAP, block,
1396 					rec, check_index, offsets, thr);
1397 
1398 				switch (err) {
1399 				case DB_SUCCESS_LOCKED_REC:
1400 				case DB_SUCCESS:
1401 					break;
1402 				default:
1403 					goto end_scan;
1404 				}
1405 
1406 				if (check_ref) {
1407 					err = DB_SUCCESS;
1408 
1409 					goto end_scan;
1410 				} else if (foreign->type != 0) {
1411 					/* There is an ON UPDATE or ON DELETE
1412 					condition: check them in a separate
1413 					function */
1414 
1415 					err = row_ins_foreign_check_on_constraint(
1416 						thr, foreign, &pcur, entry,
1417 						&mtr);
1418 					if (err != DB_SUCCESS) {
1419 						/* Since reporting a plain
1420 						"duplicate key" error
1421 						message to the user in
1422 						cases where a long CASCADE
1423 						operation would lead to a
1424 						duplicate key in some
1425 						other table is very
1426 						confusing, map duplicate
1427 						key errors resulting from
1428 						FK constraints to a
1429 						separate error code. */
1430 
1431 						if (err == DB_DUPLICATE_KEY) {
1432 							err = DB_FOREIGN_DUPLICATE_KEY;
1433 						}
1434 
1435 						goto end_scan;
1436 					}
1437 
1438 					/* row_ins_foreign_check_on_constraint
1439 					may have repositioned pcur on a
1440 					different block */
1441 					block = btr_pcur_get_block(&pcur);
1442 				} else {
1443 					row_ins_foreign_report_err(
1444 						"Trying to delete or update",
1445 						thr, foreign, rec, entry);
1446 
1447 					err = DB_ROW_IS_REFERENCED;
1448 					goto end_scan;
1449 				}
1450 			}
1451 		} else {
1452 			ut_a(cmp < 0);
1453 
1454 			err = row_ins_set_shared_rec_lock(
1455 				LOCK_GAP, block,
1456 				rec, check_index, offsets, thr);
1457 
1458 			switch (err) {
1459 			case DB_SUCCESS_LOCKED_REC:
1460 			case DB_SUCCESS:
1461 				if (check_ref) {
1462 					err = DB_NO_REFERENCED_ROW;
1463 					row_ins_foreign_report_add_err(
1464 						trx, foreign, rec, entry);
1465 				} else {
1466 					err = DB_SUCCESS;
1467 				}
1468 			}
1469 
1470 			goto end_scan;
1471 		}
1472 	} while (btr_pcur_move_to_next(&pcur, &mtr));
1473 
1474 	if (check_ref) {
1475 		row_ins_foreign_report_add_err(
1476 			trx, foreign, btr_pcur_get_rec(&pcur), entry);
1477 		err = DB_NO_REFERENCED_ROW;
1478 	} else {
1479 		err = DB_SUCCESS;
1480 	}
1481 
1482 end_scan:
1483 	btr_pcur_close(&pcur);
1484 
1485 	mtr_commit(&mtr);
1486 
1487 	/* Restore old value */
1488 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1489 
1490 do_possible_lock_wait:
1491 	if (err == DB_LOCK_WAIT) {
1492 		trx->error_state = err;
1493 
1494 		que_thr_stop_for_mysql(thr);
1495 
1496 		srv_suspend_mysql_thread(thr);
1497 
1498 		if (trx->error_state == DB_SUCCESS) {
1499 
1500 			goto run_again;
1501 		}
1502 
1503 		err = trx->error_state;
1504 	}
1505 
1506 exit_func:
1507 	if (UNIV_LIKELY_NULL(heap)) {
1508 		mem_heap_free(heap);
1509 	}
1510 	return(err);
1511 }
1512 
1513 /***************************************************************//**
1514 Checks if foreign key constraints fail for an index entry. If index
1515 is not mentioned in any constraint, this function does nothing,
1516 Otherwise does searches to the indexes of referenced tables and
1517 sets shared locks which lock either the success or the failure of
1518 a constraint.
1519 @return	DB_SUCCESS or error code */
1520 static
1521 ulint
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1522 row_ins_check_foreign_constraints(
1523 /*==============================*/
1524 	dict_table_t*	table,	/*!< in: table */
1525 	dict_index_t*	index,	/*!< in: index */
1526 	dtuple_t*	entry,	/*!< in: index entry for index */
1527 	que_thr_t*	thr)	/*!< in: query thread */
1528 {
1529 	dict_foreign_t*	foreign;
1530 	ulint		err;
1531 	trx_t*		trx;
1532 	ibool		got_s_lock	= FALSE;
1533 
1534 	trx = thr_get_trx(thr);
1535 
1536 	foreign = UT_LIST_GET_FIRST(table->foreign_list);
1537 
1538 	while (foreign) {
1539 		if (foreign->foreign_index == index) {
1540 
1541 			if (foreign->referenced_table == NULL) {
1542 				dict_table_get(foreign->referenced_table_name_lookup,
1543 					       FALSE,
1544 					       DICT_ERR_IGNORE_NONE);
1545 			}
1546 
1547 			if (0 == trx->dict_operation_lock_mode) {
1548 				got_s_lock = TRUE;
1549 
1550 				row_mysql_freeze_data_dictionary(trx);
1551 			}
1552 
1553 			if (foreign->referenced_table) {
1554 				mutex_enter(&(dict_sys->mutex));
1555 
1556 				(foreign->referenced_table
1557 				 ->n_foreign_key_checks_running)++;
1558 
1559 				mutex_exit(&(dict_sys->mutex));
1560 			}
1561 
1562 			/* NOTE that if the thread ends up waiting for a lock
1563 			we will release dict_operation_lock temporarily!
1564 			But the counter on the table protects the referenced
1565 			table from being dropped while the check is running. */
1566 
1567 			err = row_ins_check_foreign_constraint(
1568 				TRUE, foreign, table, entry, thr);
1569 
1570 			if (foreign->referenced_table) {
1571 				mutex_enter(&(dict_sys->mutex));
1572 
1573 				ut_a(foreign->referenced_table
1574 				     ->n_foreign_key_checks_running > 0);
1575 				(foreign->referenced_table
1576 				 ->n_foreign_key_checks_running)--;
1577 
1578 				mutex_exit(&(dict_sys->mutex));
1579 			}
1580 
1581 			if (got_s_lock) {
1582 				row_mysql_unfreeze_data_dictionary(trx);
1583 			}
1584 
1585 			if (err != DB_SUCCESS) {
1586 				return(err);
1587 			}
1588 		}
1589 
1590 		foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
1591 	}
1592 
1593 	return(DB_SUCCESS);
1594 }
1595 
1596 /***************************************************************//**
1597 Checks if a unique key violation to rec would occur at the index entry
1598 insert.
1599 @return	TRUE if error */
1600 static
1601 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1602 row_ins_dupl_error_with_rec(
1603 /*========================*/
1604 	const rec_t*	rec,	/*!< in: user record; NOTE that we assume
1605 				that the caller already has a record lock on
1606 				the record! */
1607 	const dtuple_t*	entry,	/*!< in: entry to insert */
1608 	dict_index_t*	index,	/*!< in: index */
1609 	const ulint*	offsets)/*!< in: rec_get_offsets(rec, index) */
1610 {
1611 	ulint	matched_fields;
1612 	ulint	matched_bytes;
1613 	ulint	n_unique;
1614 	ulint	i;
1615 
1616 	ut_ad(rec_offs_validate(rec, index, offsets));
1617 
1618 	n_unique = dict_index_get_n_unique(index);
1619 
1620 	matched_fields = 0;
1621 	matched_bytes = 0;
1622 
1623 	cmp_dtuple_rec_with_match(entry, rec, offsets,
1624 				  &matched_fields, &matched_bytes);
1625 
1626 	if (matched_fields < n_unique) {
1627 
1628 		return(FALSE);
1629 	}
1630 
1631 	/* In a unique secondary index we allow equal key values if they
1632 	contain SQL NULLs */
1633 
1634 	if (!dict_index_is_clust(index)) {
1635 
1636 		for (i = 0; i < n_unique; i++) {
1637 			if (UNIV_SQL_NULL == dfield_get_len(
1638 				    dtuple_get_nth_field(entry, i))) {
1639 
1640 				return(FALSE);
1641 			}
1642 		}
1643 	}
1644 
1645 	return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1646 }
1647 
1648 /***************************************************************//**
1649 Scans a unique non-clustered index at a given index entry to determine
1650 whether a uniqueness violation has occurred for the key value of the entry.
1651 Set shared locks on possible duplicate records.
1652 @return	DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1653 static
1654 ulint
row_ins_scan_sec_index_for_duplicate(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1655 row_ins_scan_sec_index_for_duplicate(
1656 /*=================================*/
1657 	dict_index_t*	index,	/*!< in: non-clustered unique index */
1658 	dtuple_t*	entry,	/*!< in: index entry */
1659 	que_thr_t*	thr)	/*!< in: query thread */
1660 {
1661 	ulint		n_unique;
1662 	ulint		i;
1663 	int		cmp;
1664 	ulint		n_fields_cmp;
1665 	btr_pcur_t	pcur;
1666 	ulint		err		= DB_SUCCESS;
1667 	ulint		allow_duplicates;
1668 	mtr_t		mtr;
1669 	mem_heap_t*	heap		= NULL;
1670 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1671 	ulint*		offsets		= offsets_;
1672 	rec_offs_init(offsets_);
1673 
1674 	n_unique = dict_index_get_n_unique(index);
1675 
1676 	/* If the secondary index is unique, but one of the fields in the
1677 	n_unique first fields is NULL, a unique key violation cannot occur,
1678 	since we define NULL != NULL in this case */
1679 
1680 	for (i = 0; i < n_unique; i++) {
1681 		if (UNIV_SQL_NULL == dfield_get_len(
1682 			    dtuple_get_nth_field(entry, i))) {
1683 
1684 			return(DB_SUCCESS);
1685 		}
1686 	}
1687 
1688 	mtr_start(&mtr);
1689 
1690 	/* Store old value on n_fields_cmp */
1691 
1692 	n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1693 
1694 	dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
1695 
1696 	btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
1697 
1698 	allow_duplicates = thr_get_trx(thr)->duplicates;
1699 
1700 	/* Scan index records and check if there is a duplicate */
1701 
1702 	do {
1703 		const rec_t*		rec	= btr_pcur_get_rec(&pcur);
1704 		const buf_block_t*	block	= btr_pcur_get_block(&pcur);
1705 		const ulint		lock_type = LOCK_ORDINARY;
1706 
1707 		if (page_rec_is_infimum(rec)) {
1708 
1709 			continue;
1710 		}
1711 
1712 		offsets = rec_get_offsets(rec, index, offsets,
1713 					  ULINT_UNDEFINED, &heap);
1714 
1715 		if (allow_duplicates) {
1716 
1717 			/* If the SQL-query will update or replace
1718 			duplicate key we will take X-lock for
1719 			duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1720 			INSERT ON DUPLICATE KEY UPDATE). */
1721 
1722 			err = row_ins_set_exclusive_rec_lock(
1723 				lock_type, block, rec, index, offsets, thr);
1724 		} else {
1725 
1726 			err = row_ins_set_shared_rec_lock(
1727 				lock_type, block, rec, index, offsets, thr);
1728 		}
1729 
1730 		switch (err) {
1731 		case DB_SUCCESS_LOCKED_REC:
1732 			err = DB_SUCCESS;
1733 		case DB_SUCCESS:
1734 			break;
1735 		default:
1736 			goto end_scan;
1737 		}
1738 
1739 		if (page_rec_is_supremum(rec)) {
1740 
1741 			continue;
1742 		}
1743 
1744 		cmp = cmp_dtuple_rec(entry, rec, offsets);
1745 
1746 		if (cmp == 0) {
1747 			if (row_ins_dupl_error_with_rec(rec, entry,
1748 							index, offsets)) {
1749 				err = DB_DUPLICATE_KEY;
1750 
1751 				thr_get_trx(thr)->error_info = index;
1752 
1753 				goto end_scan;
1754 			}
1755 		} else {
1756 			ut_a(cmp < 0);
1757 			goto end_scan;
1758 		}
1759 	} while (btr_pcur_move_to_next(&pcur, &mtr));
1760 
1761 end_scan:
1762 	if (UNIV_LIKELY_NULL(heap)) {
1763 		mem_heap_free(heap);
1764 	}
1765 	mtr_commit(&mtr);
1766 
1767 	/* Restore old value */
1768 	dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1769 
1770 	return(err);
1771 }
1772 
1773 /***************************************************************//**
1774 Checks if a unique key violation error would occur at an index entry
1775 insert. Sets shared locks on possible duplicate records. Works only
1776 for a clustered index!
1777 @return DB_SUCCESS if no error, DB_DUPLICATE_KEY if error,
1778 DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
1779 record */
1780 static
1781 ulint
row_ins_duplicate_error_in_clust(btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)1782 row_ins_duplicate_error_in_clust(
1783 /*=============================*/
1784 	btr_cur_t*	cursor,	/*!< in: B-tree cursor */
1785 	const dtuple_t*	entry,	/*!< in: entry to insert */
1786 	que_thr_t*	thr,	/*!< in: query thread */
1787 	mtr_t*		mtr)	/*!< in: mtr */
1788 {
1789 	ulint	err;
1790 	rec_t*	rec;
1791 	ulint	n_unique;
1792 	trx_t*	trx		= thr_get_trx(thr);
1793 	mem_heap_t*heap		= NULL;
1794 	ulint	offsets_[REC_OFFS_NORMAL_SIZE];
1795 	ulint*	offsets		= offsets_;
1796 	rec_offs_init(offsets_);
1797 
1798 	UT_NOT_USED(mtr);
1799 
1800 	ut_a(dict_index_is_clust(cursor->index));
1801 	ut_ad(dict_index_is_unique(cursor->index));
1802 
1803 	/* NOTE: For unique non-clustered indexes there may be any number
1804 	of delete marked records with the same value for the non-clustered
1805 	index key (remember multiversioning), and which differ only in
1806 	the row refererence part of the index record, containing the
1807 	clustered index key fields. For such a secondary index record,
1808 	to avoid race condition, we must FIRST do the insertion and after
1809 	that check that the uniqueness condition is not breached! */
1810 
1811 	/* NOTE: A problem is that in the B-tree node pointers on an
1812 	upper level may match more to the entry than the actual existing
1813 	user records on the leaf level. So, even if low_match would suggest
1814 	that a duplicate key violation may occur, this may not be the case. */
1815 
1816 	n_unique = dict_index_get_n_unique(cursor->index);
1817 
1818 	if (cursor->low_match >= n_unique) {
1819 
1820 		rec = btr_cur_get_rec(cursor);
1821 
1822 		if (!page_rec_is_infimum(rec)) {
1823 			offsets = rec_get_offsets(rec, cursor->index, offsets,
1824 						  ULINT_UNDEFINED, &heap);
1825 
1826 			/* We set a lock on the possible duplicate: this
1827 			is needed in logical logging of MySQL to make
1828 			sure that in roll-forward we get the same duplicate
1829 			errors as in original execution */
1830 
1831 			if (trx->duplicates) {
1832 
1833 				/* If the SQL-query will update or replace
1834 				duplicate key we will take X-lock for
1835 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1836 				INSERT ON DUPLICATE KEY UPDATE). */
1837 
1838 				err = row_ins_set_exclusive_rec_lock(
1839 					LOCK_REC_NOT_GAP,
1840 					btr_cur_get_block(cursor),
1841 					rec, cursor->index, offsets, thr);
1842 			} else {
1843 
1844 				err = row_ins_set_shared_rec_lock(
1845 					LOCK_REC_NOT_GAP,
1846 					btr_cur_get_block(cursor), rec,
1847 					cursor->index, offsets, thr);
1848 			}
1849 
1850 			switch (err) {
1851 			case DB_SUCCESS_LOCKED_REC:
1852 			case DB_SUCCESS:
1853 				break;
1854 			default:
1855 				goto func_exit;
1856 			}
1857 
1858 			if (row_ins_dupl_error_with_rec(
1859 				    rec, entry, cursor->index, offsets)) {
1860 				trx->error_info = cursor->index;
1861 				err = DB_DUPLICATE_KEY;
1862 				goto func_exit;
1863 			}
1864 		}
1865 	}
1866 
1867 	if (cursor->up_match >= n_unique) {
1868 
1869 		rec = page_rec_get_next(btr_cur_get_rec(cursor));
1870 
1871 		if (!page_rec_is_supremum(rec)) {
1872 			offsets = rec_get_offsets(rec, cursor->index, offsets,
1873 						  ULINT_UNDEFINED, &heap);
1874 
1875 			if (trx->duplicates) {
1876 
1877 				/* If the SQL-query will update or replace
1878 				duplicate key we will take X-lock for
1879 				duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1880 				INSERT ON DUPLICATE KEY UPDATE). */
1881 
1882 				err = row_ins_set_exclusive_rec_lock(
1883 					LOCK_REC_NOT_GAP,
1884 					btr_cur_get_block(cursor),
1885 					rec, cursor->index, offsets, thr);
1886 			} else {
1887 
1888 				err = row_ins_set_shared_rec_lock(
1889 					LOCK_REC_NOT_GAP,
1890 					btr_cur_get_block(cursor),
1891 					rec, cursor->index, offsets, thr);
1892 			}
1893 
1894 			switch (err) {
1895 			case DB_SUCCESS_LOCKED_REC:
1896 			case DB_SUCCESS:
1897 				break;
1898 			default:
1899 				goto func_exit;
1900 			}
1901 
1902 			if (row_ins_dupl_error_with_rec(
1903 				    rec, entry, cursor->index, offsets)) {
1904 				trx->error_info = cursor->index;
1905 				err = DB_DUPLICATE_KEY;
1906 				goto func_exit;
1907 			}
1908 		}
1909 
1910 		ut_a(!dict_index_is_clust(cursor->index));
1911 		/* This should never happen */
1912 	}
1913 
1914 	err = DB_SUCCESS;
1915 func_exit:
1916 	if (UNIV_LIKELY_NULL(heap)) {
1917 		mem_heap_free(heap);
1918 	}
1919 	return(err);
1920 }
1921 
1922 /***************************************************************//**
1923 Checks if an index entry has long enough common prefix with an existing
1924 record so that the intended insert of the entry must be changed to a modify of
1925 the existing record. In the case of a clustered index, the prefix must be
1926 n_unique fields long, and in the case of a secondary index, all fields must be
1927 equal.
1928 @return 0 if no update, ROW_INS_PREV if previous should be updated;
1929 currently we do the search so that only the low_match record can match
1930 enough to the search tuple, not the next record */
1931 UNIV_INLINE
1932 ulint
row_ins_must_modify(btr_cur_t * cursor)1933 row_ins_must_modify(
1934 /*================*/
1935 	btr_cur_t*	cursor)	/*!< in: B-tree cursor */
1936 {
1937 	ulint	enough_match;
1938 	rec_t*	rec;
1939 
1940 	/* NOTE: (compare to the note in row_ins_duplicate_error) Because node
1941 	pointers on upper levels of the B-tree may match more to entry than
1942 	to actual user records on the leaf level, we have to check if the
1943 	candidate record is actually a user record. In a clustered index
1944 	node pointers contain index->n_unique first fields, and in the case
1945 	of a secondary index, all fields of the index. */
1946 
1947 	enough_match = dict_index_get_n_unique_in_tree(cursor->index);
1948 
1949 	if (cursor->low_match >= enough_match) {
1950 
1951 		rec = btr_cur_get_rec(cursor);
1952 
1953 		if (!page_rec_is_infimum(rec)) {
1954 
1955 			return(ROW_INS_PREV);
1956 		}
1957 	}
1958 
1959 	return(0);
1960 }
1961 
/***************************************************************//**
Tries to insert an index entry to an index. If the index is clustered
and a record with the same unique key is found, the other record is
necessarily marked deleted by a committed transaction, or a unique key
violation error occurs. The delete marked record is then updated to an
existing record, and we must write an undo log record on the delete
marked record. If the index is secondary, and a record with exactly the
same fields is found, the other record is necessarily marked deleted.
It is then unmarked. Otherwise, the entry is just inserted to the index.

Outline of the steps taken by this function:
1) position a leaf-level cursor on the entry with PAGE_CUR_LE;
2) for a unique index whose cursor matched at least n_unique fields,
run the duplicate check (clustered or secondary variant);
3) either modify the matching record in place or insert a new one;
4) if the operation produced a big_rec, write the externally stored
columns and then free or convert back the big_rec.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL if pessimistic retry needed,
or error code */
static
ulint
row_ins_index_entry_low(
/*====================*/
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: index */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	que_thr_t*	thr)	/*!< in: query thread */
{
	btr_cur_t	cursor;
	ulint		search_mode;
	ulint		modify = 0; /* remove warning */
	rec_t*		insert_rec;
	rec_t*		rec;
	ulint*		offsets;
	ulint		err;
	ulint		n_unique;
	big_rec_t*	big_rec			= NULL;
	mtr_t		mtr;
	mem_heap_t*	heap			= NULL;

	log_free_check();

	mtr_start(&mtr);

	cursor.thr = thr;

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */

	/* A clustered index is searched with the plain latch mode. For a
	secondary index BTR_INSERT is added, and BTR_IGNORE_SEC_UNIQUE as
	well when the transaction has check_unique_secondary switched off. */

	if (dict_index_is_clust(index)) {
		search_mode = mode;
	} else if (!(thr_get_trx(thr)->check_unique_secondary)) {
		search_mode = mode | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE;
	} else {
		search_mode = mode | BTR_INSERT;
	}

	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    search_mode,
				    &cursor, 0, __FILE__, __LINE__, &mtr);

	if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
		/* The insertion was made to the insert buffer already during
		the search: we are done */

		ut_ad(search_mode & BTR_INSERT);
		err = DB_SUCCESS;

		goto function_exit;
	}

#ifdef UNIV_DEBUG
	/* Debug-only sanity check: unless the page is empty, its first
	user record must have as many fields as the entry to be inserted. */
	{
		page_t*	page = btr_cur_get_page(&cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_get_n_fields(first_rec, index)
		      == dtuple_get_n_fields(entry));
	}
#endif

	n_unique = dict_index_get_n_unique(index);

	/* A duplicate is possible only in a unique index, and only if one
	of the records next to the cursor position matches the entry in at
	least n_unique fields. */

	if (dict_index_is_unique(index) && (cursor.up_match >= n_unique
					    || cursor.low_match >= n_unique)) {

		if (dict_index_is_clust(index)) {
			/* Note that the following may return also
			DB_LOCK_WAIT */

			err = row_ins_duplicate_error_in_clust(
				&cursor, entry, thr, &mtr);
			if (err != DB_SUCCESS) {

				goto function_exit;
			}
		} else {
			/* The mini-transaction is committed before the
			secondary index scan and a new one is started
			right after it; this is why the cursor has to be
			repositioned below. */

			mtr_commit(&mtr);
			err = row_ins_scan_sec_index_for_duplicate(
				index, entry, thr);
			mtr_start(&mtr);

			if (err != DB_SUCCESS) {
				goto function_exit;
			}

			/* We did not find a duplicate and we have now
			locked with s-locks the necessary records to
			prevent any insertion of a duplicate by another
			transaction. Let us now reposition the cursor and
			continue the insertion. */

			btr_cur_search_to_nth_level(index, 0, entry,
						    PAGE_CUR_LE,
						    mode | BTR_INSERT,
						    &cursor, 0,
						    __FILE__, __LINE__, &mtr);
		}
	}

	modify = row_ins_must_modify(&cursor);

	if (modify != 0) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */

		/* row_ins_must_modify() as defined in this file returns
		only 0 or ROW_INS_PREV, so the ROW_INS_NEXT branch below is
		not expected to be taken at present. */

		if (modify == ROW_INS_NEXT) {
			rec = page_rec_get_next(btr_cur_get_rec(&cursor));

			btr_cur_position(index, rec,
					 btr_cur_get_block(&cursor),&cursor);
		}

		if (dict_index_is_clust(index)) {
			err = row_ins_clust_index_entry_by_modify(
				mode, &cursor, &heap, &big_rec, entry,
				thr, &mtr);

			if (big_rec) {
				ut_a(err == DB_SUCCESS);
				/* Write out the externally stored
				columns while still x-latching
				index->lock and block->lock. Allocate
				pages for big_rec in the mtr that
				modified the B-tree, but be sure to skip
				any pages that were freed in mtr. We will
				write out the big_rec pages before
				committing the B-tree mini-transaction. If
				the system crashes so that crash recovery
				will not replay the mtr_commit(&mtr), the
				big_rec pages will be left orphaned until
				the pages are allocated for something else.

				TODO: If the allocation extends the
				tablespace, it will not be redo
				logged, in either mini-transaction.
				Tablespace extension should be
				redo-logged in the big_rec
				mini-transaction, so that recovery
				will not fail when the big_rec was
				written to the extended portion of the
				file, in case the file was somehow
				truncated in the crash. */

				rec = btr_cur_get_rec(&cursor);
				offsets = rec_get_offsets(
					rec, index, NULL,
					ULINT_UNDEFINED, &heap);

				DEBUG_SYNC_C("before_row_ins_upd_extern");
				err = btr_store_big_rec_extern_fields(
					index, btr_cur_get_block(&cursor),
					rec, offsets, big_rec, &mtr,
					BTR_STORE_INSERT_UPDATE);
				DEBUG_SYNC_C("after_row_ins_upd_extern");
				/* If writing big_rec fails (for
				example, because of DB_OUT_OF_FILE_SPACE),
				the record will be corrupted. Even if
				we did not update any externally
				stored columns, our update could cause
				the record to grow so that a
				non-updated column was selected for
				external storage. This non-update
				would not have been written to the
				undo log, and thus the record cannot
				be rolled back.

				However, because we have not executed
				mtr_commit(mtr) yet, the update will
				not be replayed in crash recovery, and
				the following assertion failure will
				effectively "roll back" the operation. */
				ut_a(err == DB_SUCCESS);

				/* The externally stored columns were
				written within the still open mtr. Jump into
				the big_rec handling below, past its own
				commit/search/store sequence, so that only
				the big_rec cleanup and the mtr commit are
				executed. */
				goto stored_big_rec;
			}
		} else {
			ut_ad(!n_ext);
			err = row_ins_sec_index_entry_by_modify(
				mode, &cursor, entry, thr, &mtr);
		}
	} else {
		/* No matching record: do a real insert. In BTR_MODIFY_LEAF
		mode only the optimistic insert is tried, and its result
		(possibly DB_FAIL) is returned to the caller. */

		if (mode == BTR_MODIFY_LEAF) {
			err = btr_cur_optimistic_insert(
				0, &cursor, entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);
		} else {
			ut_a(mode == BTR_MODIFY_TREE);
			if (buf_LRU_buf_pool_running_out()) {

				err = DB_LOCK_TABLE_FULL;

				goto function_exit;
			}

			/* Also in BTR_MODIFY_TREE mode the optimistic
			insert is tried first; the pessimistic insert is
			used only if the optimistic one returns DB_FAIL. */

			err = btr_cur_optimistic_insert(
				0, &cursor, entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					0, &cursor, entry, &insert_rec,
					&big_rec, n_ext, thr, &mtr);
			}
		}
	}

function_exit:
	/* Commit the mini-transaction of the search and the insert/modify.
	Every path reaching this label has assigned err. */
	mtr_commit(&mtr);

	if (UNIV_LIKELY_NULL(big_rec)) {
		/* The insert left columns to be stored externally: write
		them in a separate mini-transaction after positioning the
		cursor on the entry again. */

		DBUG_EXECUTE_IF(
			"row_ins_extern_checkpoint",
			log_make_checkpoint_at(IB_ULONGLONG_MAX, TRUE););

		mtr_start(&mtr);

		DEBUG_SYNC_C("before_row_ins_extern_latch");
		btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
					    BTR_MODIFY_TREE, &cursor, 0,
					    __FILE__, __LINE__, &mtr);
		rec = btr_cur_get_rec(&cursor);
		offsets = rec_get_offsets(rec, index, NULL,
					  ULINT_UNDEFINED, &heap);

		DEBUG_SYNC_C("before_row_ins_extern");
		err = btr_store_big_rec_extern_fields(
			index, btr_cur_get_block(&cursor),
			rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
		DEBUG_SYNC_C("after_row_ins_extern");

stored_big_rec:
		/* This label is also the target of the goto in the
		clustered index modify branch above. When the insert was
		done by modify, big_rec is just freed; otherwise the fields
		that were moved to big_rec are converted back to entry. */
		if (modify) {
			dtuple_big_rec_free(big_rec);
		} else {
			dtuple_convert_back_big_rec(index, entry, big_rec);
		}

		mtr_commit(&mtr);
	}

	/* heap is non-NULL only if one of the calls above that were given
	&heap allocated it. */
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(err);
}
2226 
2227 /***************************************************************//**
2228 Inserts an index entry to index. Tries first optimistic, then pessimistic
2229 descent down the tree. If the entry matches enough to a delete marked record,
2230 performs the insert by updating or delete unmarking the delete marked
2231 record.
2232 @return	DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2233 UNIV_INTERN
2234 ulint
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,ulint n_ext,ibool foreign,que_thr_t * thr)2235 row_ins_index_entry(
2236 /*================*/
2237 	dict_index_t*	index,	/*!< in: index */
2238 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
2239 	ulint		n_ext,	/*!< in: number of externally stored columns */
2240 	ibool		foreign,/*!< in: TRUE=check foreign key constraints
2241 				(foreign=FALSE only during CREATE INDEX) */
2242 	que_thr_t*	thr)	/*!< in: query thread */
2243 {
2244 	ulint	err;
2245 
2246 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
2247 			DBUG_SET("-d,row_ins_index_entry_timeout");
2248 			return(DB_LOCK_WAIT);});
2249 
2250 	if (foreign && UT_LIST_GET_FIRST(index->table->foreign_list)) {
2251 		err = row_ins_check_foreign_constraints(index->table, index,
2252 							entry, thr);
2253 		if (err != DB_SUCCESS) {
2254 
2255 			return(err);
2256 		}
2257 	}
2258 
2259 	/* Try first optimistic descent to the B-tree */
2260 
2261 	err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
2262 				      n_ext, thr);
2263 	if (err != DB_FAIL) {
2264 		if (index == dict_table_get_first_index(index->table)
2265 		    && thr_get_trx(thr)->mysql_thd != 0) {
2266 			DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2267 		}
2268 		return(err);
2269 	}
2270 
2271 	/* Try then pessimistic descent to the B-tree */
2272 
2273 	err = row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
2274 				      n_ext, thr);
2275 	return(err);
2276 }
2277 
2278 /***********************************************************//**
2279 Sets the values of the dtuple fields in entry from the values of appropriate
2280 columns in row. */
2281 static
2282 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)2283 row_ins_index_entry_set_vals(
2284 /*=========================*/
2285 	dict_index_t*	index,	/*!< in: index */
2286 	dtuple_t*	entry,	/*!< in: index entry to make */
2287 	const dtuple_t*	row)	/*!< in: row */
2288 {
2289 	ulint	n_fields;
2290 	ulint	i;
2291 
2292 	ut_ad(entry && row);
2293 
2294 	n_fields = dtuple_get_n_fields(entry);
2295 
2296 	for (i = 0; i < n_fields; i++) {
2297 		dict_field_t*	ind_field;
2298 		dfield_t*	field;
2299 		const dfield_t*	row_field;
2300 		ulint		len;
2301 
2302 		field = dtuple_get_nth_field(entry, i);
2303 		ind_field = dict_index_get_nth_field(index, i);
2304 		row_field = dtuple_get_nth_field(row, ind_field->col->ind);
2305 		len = dfield_get_len(row_field);
2306 
2307 		/* Check column prefix indexes */
2308 		if (ind_field->prefix_len > 0
2309 		    && dfield_get_len(row_field) != UNIV_SQL_NULL) {
2310 
2311 			const	dict_col_t*	col
2312 				= dict_field_get_col(ind_field);
2313 
2314 			len = dtype_get_at_most_n_mbchars(
2315 				col->prtype, col->mbminmaxlen,
2316 				ind_field->prefix_len,
2317 				len, dfield_get_data(row_field));
2318 
2319 			ut_ad(!dfield_is_ext(row_field));
2320 		}
2321 
2322 		dfield_set_data(field, dfield_get_data(row_field), len);
2323 		if (dfield_is_ext(row_field)) {
2324 			ut_ad(dict_index_is_clust(index));
2325 			dfield_set_ext(field);
2326 		}
2327 	}
2328 }
2329 
2330 /***********************************************************//**
2331 Inserts a single index entry to the table.
2332 @return DB_SUCCESS if operation successfully completed, else error
2333 code or DB_LOCK_WAIT */
2334 static
2335 ulint
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)2336 row_ins_index_entry_step(
2337 /*=====================*/
2338 	ins_node_t*	node,	/*!< in: row insert node */
2339 	que_thr_t*	thr)	/*!< in: query thread */
2340 {
2341 	ulint	err;
2342 
2343 	ut_ad(dtuple_check_typed(node->row));
2344 
2345 	row_ins_index_entry_set_vals(node->index, node->entry, node->row);
2346 
2347 	ut_ad(dtuple_check_typed(node->entry));
2348 
2349 	err = row_ins_index_entry(node->index, node->entry, 0, TRUE, thr);
2350 
2351 	return(err);
2352 }
2353 
2354 /***********************************************************//**
2355 Allocates a row id for row and inits the node->index field. */
2356 UNIV_INLINE
2357 void
row_ins_alloc_row_id_step(ins_node_t * node)2358 row_ins_alloc_row_id_step(
2359 /*======================*/
2360 	ins_node_t*	node)	/*!< in: row insert node */
2361 {
2362 	row_id_t	row_id;
2363 
2364 	ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
2365 
2366 	if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
2367 
2368 		/* No row id is stored if the clustered index is unique */
2369 
2370 		return;
2371 	}
2372 
2373 	/* Fill in row id value to row */
2374 
2375 	row_id = dict_sys_get_new_row_id();
2376 
2377 	dict_sys_write_row_id(node->row_id_buf, row_id);
2378 }
2379 
2380 /***********************************************************//**
2381 Gets a row to insert from the values list. */
2382 UNIV_INLINE
2383 void
row_ins_get_row_from_values(ins_node_t * node)2384 row_ins_get_row_from_values(
2385 /*========================*/
2386 	ins_node_t*	node)	/*!< in: row insert node */
2387 {
2388 	que_node_t*	list_node;
2389 	dfield_t*	dfield;
2390 	dtuple_t*	row;
2391 	ulint		i;
2392 
2393 	/* The field values are copied in the buffers of the select node and
2394 	it is safe to use them until we fetch from select again: therefore
2395 	we can just copy the pointers */
2396 
2397 	row = node->row;
2398 
2399 	i = 0;
2400 	list_node = node->values_list;
2401 
2402 	while (list_node) {
2403 		eval_exp(list_node);
2404 
2405 		dfield = dtuple_get_nth_field(row, i);
2406 		dfield_copy_data(dfield, que_node_get_val(list_node));
2407 
2408 		i++;
2409 		list_node = que_node_get_next(list_node);
2410 	}
2411 }
2412 
2413 /***********************************************************//**
2414 Gets a row to insert from the select list. */
2415 UNIV_INLINE
2416 void
row_ins_get_row_from_select(ins_node_t * node)2417 row_ins_get_row_from_select(
2418 /*========================*/
2419 	ins_node_t*	node)	/*!< in: row insert node */
2420 {
2421 	que_node_t*	list_node;
2422 	dfield_t*	dfield;
2423 	dtuple_t*	row;
2424 	ulint		i;
2425 
2426 	/* The field values are copied in the buffers of the select node and
2427 	it is safe to use them until we fetch from select again: therefore
2428 	we can just copy the pointers */
2429 
2430 	row = node->row;
2431 
2432 	i = 0;
2433 	list_node = node->select->select_list;
2434 
2435 	while (list_node) {
2436 		dfield = dtuple_get_nth_field(row, i);
2437 		dfield_copy_data(dfield, que_node_get_val(list_node));
2438 
2439 		i++;
2440 		list_node = que_node_get_next(list_node);
2441 	}
2442 }
2443 
2444 /***********************************************************//**
2445 Inserts a row to a table.
2446 @return DB_SUCCESS if operation successfully completed, else error
2447 code or DB_LOCK_WAIT */
2448 static
2449 ulint
row_ins(ins_node_t * node,que_thr_t * thr)2450 row_ins(
2451 /*====*/
2452 	ins_node_t*	node,	/*!< in: row insert node */
2453 	que_thr_t*	thr)	/*!< in: query thread */
2454 {
2455 	ulint	err;
2456 
2457 	ut_ad(node && thr);
2458 
2459 	if (node->state == INS_NODE_ALLOC_ROW_ID) {
2460 
2461 		row_ins_alloc_row_id_step(node);
2462 
2463 		node->index = dict_table_get_first_index(node->table);
2464 		node->entry = UT_LIST_GET_FIRST(node->entry_list);
2465 
2466 		if (node->ins_type == INS_SEARCHED) {
2467 
2468 			row_ins_get_row_from_select(node);
2469 
2470 		} else if (node->ins_type == INS_VALUES) {
2471 
2472 			row_ins_get_row_from_values(node);
2473 		}
2474 
2475 		node->state = INS_NODE_INSERT_ENTRIES;
2476 	}
2477 
2478 	ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
2479 
2480 	while (node->index != NULL) {
2481 		err = row_ins_index_entry_step(node, thr);
2482 
2483 		if (err != DB_SUCCESS) {
2484 
2485 			return(err);
2486 		}
2487 
2488 		node->index = dict_table_get_next_index(node->index);
2489 		node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
2490 
2491 		/* Skip corrupted secondar index and its entry */
2492 		while (node->index && dict_index_is_corrupted(node->index)) {
2493 
2494 			node->index = dict_table_get_next_index(node->index);
2495 			node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
2496 		}
2497 	}
2498 
2499 	ut_ad(node->entry == NULL);
2500 
2501 	node->state = INS_NODE_ALLOC_ROW_ID;
2502 
2503 	return(DB_SUCCESS);
2504 }
2505 
2506 /***********************************************************//**
2507 Inserts a row to a table. This is a high-level function used in SQL execution
2508 graphs.
2509 @return	query thread to run next or NULL */
2510 UNIV_INTERN
2511 que_thr_t*
row_ins_step(que_thr_t * thr)2512 row_ins_step(
2513 /*=========*/
2514 	que_thr_t*	thr)	/*!< in: query thread */
2515 {
2516 	ins_node_t*	node;
2517 	que_node_t*	parent;
2518 	sel_node_t*	sel_node;
2519 	trx_t*		trx;
2520 	ulint		err;
2521 
2522 	ut_ad(thr);
2523 
2524 	trx = thr_get_trx(thr);
2525 
2526 	trx_start_if_not_started(trx);
2527 
2528 	node = thr->run_node;
2529 
2530 	ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
2531 
2532 	parent = que_node_get_parent(node);
2533 	sel_node = node->select;
2534 
2535 	if (thr->prev_node == parent) {
2536 		node->state = INS_NODE_SET_IX_LOCK;
2537 	}
2538 
2539 	/* If this is the first time this node is executed (or when
2540 	execution resumes after wait for the table IX lock), set an
2541 	IX lock on the table and reset the possible select node. MySQL's
2542 	partitioned table code may also call an insert within the same
2543 	SQL statement AFTER it has used this table handle to do a search.
2544 	This happens, for example, when a row update moves it to another
2545 	partition. In that case, we have already set the IX lock on the
2546 	table during the search operation, and there is no need to set
2547 	it again here. But we must write trx->id to node->trx_id_buf. */
2548 
2549 	trx_write_trx_id(node->trx_id_buf, trx->id);
2550 
2551 	if (node->state == INS_NODE_SET_IX_LOCK) {
2552 
2553 		/* It may be that the current session has not yet started
2554 		its transaction, or it has been committed: */
2555 
2556 		if (trx->id == node->trx_id) {
2557 			/* No need to do IX-locking */
2558 
2559 			goto same_trx;
2560 		}
2561 
2562 		err = lock_table(0, node->table, LOCK_IX, thr);
2563 
2564 		if (err != DB_SUCCESS) {
2565 
2566 			goto error_handling;
2567 		}
2568 
2569 		node->trx_id = trx->id;
2570 same_trx:
2571 		node->state = INS_NODE_ALLOC_ROW_ID;
2572 
2573 		if (node->ins_type == INS_SEARCHED) {
2574 			/* Reset the cursor */
2575 			sel_node->state = SEL_NODE_OPEN;
2576 
2577 			/* Fetch a row to insert */
2578 
2579 			thr->run_node = sel_node;
2580 
2581 			return(thr);
2582 		}
2583 	}
2584 
2585 	if ((node->ins_type == INS_SEARCHED)
2586 	    && (sel_node->state != SEL_NODE_FETCH)) {
2587 
2588 		ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
2589 
2590 		/* No more rows to insert */
2591 		thr->run_node = parent;
2592 
2593 		return(thr);
2594 	}
2595 
2596 	/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
2597 
2598 	err = row_ins(node, thr);
2599 
2600 error_handling:
2601 	trx->error_state = err;
2602 
2603 	if (err != DB_SUCCESS) {
2604 		/* err == DB_LOCK_WAIT or SQL error detected */
2605 		return(NULL);
2606 	}
2607 
2608 	/* DO THE TRIGGER ACTIONS HERE */
2609 
2610 	if (node->ins_type == INS_SEARCHED) {
2611 		/* Fetch a row to insert */
2612 
2613 		thr->run_node = sel_node;
2614 	} else {
2615 		thr->run_node = que_node_get_parent(node);
2616 	}
2617 
2618 	return(thr);
2619 }
2620