1 /*****************************************************************************
2
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "row0ins.h"
35
36 #ifdef UNIV_NONINL
37 #include "row0ins.ic"
38 #endif
39
40 #include "ha_prototypes.h"
41 #include "dict0dict.h"
42 #include "dict0boot.h"
43 #include "trx0rec.h"
44 #include "trx0undo.h"
45 #include "btr0btr.h"
46 #include "btr0cur.h"
47 #include "mach0data.h"
48 #include "que0que.h"
49 #include "row0upd.h"
50 #include "row0sel.h"
51 #include "row0row.h"
52 #include "row0log.h"
53 #include "rem0cmp.h"
54 #include "lock0lock.h"
55 #include "log0log.h"
56 #include "eval0eval.h"
57 #include "data0data.h"
58 #include "usr0sess.h"
59 #include "buf0lru.h"
60 #include "fts0fts.h"
61 #include "fts0types.h"
62 #include "m_string.h"
63
64 /*************************************************************************
65 IMPORTANT NOTE: Any operation that generates redo MUST check that there
66 is enough space in the redo log before starting that operation. This is
67 done by calling log_free_check(). The reason for checking the
68 availability of the redo log space before the start of the operation is
69 that we MUST NOT hold any synchronization objects when performing the
70 check.
71 If you make a change in this module make sure that no codepath is
72 introduced where a call to log_free_check() is bypassed. */
73
74 /*********************************************************************//**
75 Creates an insert node struct.
76 @return own: insert node struct */
77 UNIV_INTERN
78 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)79 ins_node_create(
80 /*============*/
81 ulint ins_type, /*!< in: INS_VALUES, ... */
82 dict_table_t* table, /*!< in: table where to insert */
83 mem_heap_t* heap) /*!< in: mem heap where created */
84 {
85 ins_node_t* node;
86
87 node = static_cast<ins_node_t*>(
88 mem_heap_alloc(heap, sizeof(ins_node_t)));
89
90 node->common.type = QUE_NODE_INSERT;
91
92 node->ins_type = ins_type;
93
94 node->state = INS_NODE_SET_IX_LOCK;
95 node->table = table;
96 node->index = NULL;
97 node->entry = NULL;
98
99 node->select = NULL;
100
101 node->trx_id = 0;
102
103 node->entry_sys_heap = mem_heap_create(128);
104
105 node->magic_n = INS_NODE_MAGIC_N;
106
107 return(node);
108 }
109
110 /***********************************************************//**
111 Creates an entry template for each index of a table. */
112 static
113 void
ins_node_create_entry_list(ins_node_t * node)114 ins_node_create_entry_list(
115 /*=======================*/
116 ins_node_t* node) /*!< in: row insert node */
117 {
118 dict_index_t* index;
119 dtuple_t* entry;
120
121 ut_ad(node->entry_sys_heap);
122
123 UT_LIST_INIT(node->entry_list);
124
125 /* We will include all indexes (include those corrupted
126 secondary indexes) in the entry list. Filteration of
127 these corrupted index will be done in row_ins() */
128
129 for (index = dict_table_get_first_index(node->table);
130 index != 0;
131 index = dict_table_get_next_index(index)) {
132
133 entry = row_build_index_entry(
134 node->row, NULL, index, node->entry_sys_heap);
135
136 UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
137 }
138 }
139
140 /*****************************************************************//**
141 Adds system field buffers to a row. */
142 static
143 void
row_ins_alloc_sys_fields(ins_node_t * node)144 row_ins_alloc_sys_fields(
145 /*=====================*/
146 ins_node_t* node) /*!< in: insert node */
147 {
148 dtuple_t* row;
149 dict_table_t* table;
150 mem_heap_t* heap;
151 const dict_col_t* col;
152 dfield_t* dfield;
153 byte* ptr;
154
155 row = node->row;
156 table = node->table;
157 heap = node->entry_sys_heap;
158
159 ut_ad(row && table && heap);
160 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
161
162 /* allocate buffer to hold the needed system created hidden columns. */
163 uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
164 ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
165
166 /* 1. Populate row-id */
167 col = dict_table_get_sys_col(table, DATA_ROW_ID);
168
169 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
170
171 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
172
173 node->row_id_buf = ptr;
174
175 ptr += DATA_ROW_ID_LEN;
176
177 /* 2. Populate trx id */
178 col = dict_table_get_sys_col(table, DATA_TRX_ID);
179
180 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
181
182 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
183
184 node->trx_id_buf = ptr;
185
186 ptr += DATA_TRX_ID_LEN;
187
188 /* 3. Populate roll ptr */
189
190 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
191
192 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
193
194 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
195 }
196
197 /*********************************************************************//**
198 Sets a new row to insert for an INS_DIRECT node. This function is only used
199 if we have constructed the row separately, which is a rare case; this
200 function is quite slow. */
201 UNIV_INTERN
202 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)203 ins_node_set_new_row(
204 /*=================*/
205 ins_node_t* node, /*!< in: insert node */
206 dtuple_t* row) /*!< in: new row (or first row) for the node */
207 {
208 node->state = INS_NODE_SET_IX_LOCK;
209 node->index = NULL;
210 node->entry = NULL;
211
212 node->row = row;
213
214 mem_heap_empty(node->entry_sys_heap);
215
216 /* Create templates for index entries */
217
218 ins_node_create_entry_list(node);
219
220 /* Allocate from entry_sys_heap buffers for sys fields */
221
222 row_ins_alloc_sys_fields(node);
223
224 /* As we allocated a new trx id buf, the trx id should be written
225 there again: */
226
227 node->trx_id = 0;
228 }
229
230 /*******************************************************************//**
231 Does an insert operation by updating a delete-marked existing record
232 in the index. This situation can occur if the delete-marked record is
233 kept in the index for consistent reads.
234 @return DB_SUCCESS or error code */
235 static MY_ATTRIBUTE((nonnull, warn_unused_result))
236 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)237 row_ins_sec_index_entry_by_modify(
238 /*==============================*/
239 ulint flags, /*!< in: undo logging and locking flags */
240 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
241 depending on whether mtr holds just a leaf
242 latch or also a tree latch */
243 btr_cur_t* cursor, /*!< in: B-tree cursor */
244 ulint** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
245 mem_heap_t* offsets_heap,
246 /*!< in/out: memory heap that can be emptied */
247 mem_heap_t* heap, /*!< in/out: memory heap */
248 const dtuple_t* entry, /*!< in: index entry to insert */
249 que_thr_t* thr, /*!< in: query thread */
250 mtr_t* mtr) /*!< in: mtr; must be committed before
251 latching any further pages */
252 {
253 big_rec_t* dummy_big_rec;
254 upd_t* update;
255 rec_t* rec;
256 dberr_t err;
257
258 rec = btr_cur_get_rec(cursor);
259
260 ut_ad(!dict_index_is_clust(cursor->index));
261 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
262 ut_ad(!entry->info_bits);
263
264 /* We know that in the alphabetical ordering, entry and rec are
265 identified. But in their binary form there may be differences if
266 there are char fields in them. Therefore we have to calculate the
267 difference. */
268
269 update = row_upd_build_sec_rec_difference_binary(
270 rec, cursor->index, *offsets, entry, heap);
271
272 /* If operating in fake_change mode then flow will not mark the record
273 deleted but will still assume it and take delete-mark path. Condition
274 below has a different path if record is not marked deleted but we need
275 to still by-pass it given that original flow has taken this path for
276 fake_change mode execution assuming record is delete-marked. */
277 if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))
278 && UNIV_UNLIKELY(!thr_get_trx(thr)->fake_changes)) {
279 /* We should never insert in place of a record that
280 has not been delete-marked. The only exception is when
281 online CREATE INDEX copied the changes that we already
282 made to the clustered index, and completed the
283 secondary index creation before we got here. In this
284 case, the change would already be there. The CREATE
285 INDEX should be waiting for a MySQL meta-data lock
286 upgrade at least until this INSERT or UPDATE
287 returns. After that point, the TEMP_INDEX_PREFIX
288 would be dropped from the index name in
289 commit_inplace_alter_table(). */
290 ut_a(update->n_fields == 0);
291 ut_a(*cursor->index->name == TEMP_INDEX_PREFIX);
292 ut_ad(!dict_index_is_online_ddl(cursor->index));
293 return(DB_SUCCESS);
294 }
295
296 if (mode == BTR_MODIFY_LEAF) {
297 /* Try an optimistic updating of the record, keeping changes
298 within the page */
299
300 /* TODO: pass only *offsets */
301 err = btr_cur_optimistic_update(
302 flags | BTR_KEEP_SYS_FLAG, cursor,
303 offsets, &offsets_heap, update, 0, thr,
304 thr_get_trx(thr)->id, mtr);
305 switch (err) {
306 case DB_OVERFLOW:
307 case DB_UNDERFLOW:
308 case DB_ZIP_OVERFLOW:
309 err = DB_FAIL;
310 default:
311 break;
312 }
313 } else {
314 ut_a(mode == BTR_MODIFY_TREE);
315 if (buf_LRU_buf_pool_running_out()) {
316
317 return(DB_LOCK_TABLE_FULL);
318 }
319
320 err = btr_cur_pessimistic_update(
321 flags | BTR_KEEP_SYS_FLAG, cursor,
322 offsets, &offsets_heap,
323 heap, &dummy_big_rec, update, 0,
324 thr, thr_get_trx(thr)->id, mtr);
325 ut_ad(!dummy_big_rec);
326 }
327
328 return(err);
329 }
330
331 /*******************************************************************//**
332 Does an insert operation by delete unmarking and updating a delete marked
333 existing record in the index. This situation can occur if the delete marked
334 record is kept in the index for consistent reads.
335 @return DB_SUCCESS, DB_FAIL, or error code */
336 static MY_ATTRIBUTE((nonnull, warn_unused_result))
337 dberr_t
row_ins_clust_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)338 row_ins_clust_index_entry_by_modify(
339 /*================================*/
340 ulint flags, /*!< in: undo logging and locking flags */
341 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
342 depending on whether mtr holds just a leaf
343 latch or also a tree latch */
344 btr_cur_t* cursor, /*!< in: B-tree cursor */
345 ulint** offsets,/*!< out: offsets on cursor->page_cur.rec */
346 mem_heap_t** offsets_heap,
347 /*!< in/out: pointer to memory heap that can
348 be emptied, or NULL */
349 mem_heap_t* heap, /*!< in/out: memory heap */
350 big_rec_t** big_rec,/*!< out: possible big rec vector of fields
351 which have to be stored externally by the
352 caller */
353 const dtuple_t* entry, /*!< in: index entry to insert */
354 que_thr_t* thr, /*!< in: query thread */
355 mtr_t* mtr) /*!< in: mtr; must be committed before
356 latching any further pages */
357 {
358 const rec_t* rec;
359 const upd_t* update;
360 dberr_t err;
361
362 ut_ad(dict_index_is_clust(cursor->index));
363
364 *big_rec = NULL;
365
366 rec = btr_cur_get_rec(cursor);
367
368 ut_ad(rec_get_deleted_flag(rec,
369 dict_table_is_comp(cursor->index->table)));
370
371 /* Build an update vector containing all the fields to be modified;
372 NOTE that this vector may NOT contain system columns trx_id or
373 roll_ptr */
374
375 update = row_upd_build_difference_binary(
376 cursor->index, entry, rec, NULL, true,
377 thr_get_trx(thr), heap);
378 if (mode != BTR_MODIFY_TREE) {
379 ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
380
381 /* Try optimistic updating of the record, keeping changes
382 within the page */
383
384 err = btr_cur_optimistic_update(
385 flags, cursor, offsets, offsets_heap, update, 0, thr,
386 thr_get_trx(thr)->id, mtr);
387 switch (err) {
388 case DB_OVERFLOW:
389 case DB_UNDERFLOW:
390 case DB_ZIP_OVERFLOW:
391 err = DB_FAIL;
392 default:
393 break;
394 }
395 } else {
396 if (buf_LRU_buf_pool_running_out()) {
397
398 return(DB_LOCK_TABLE_FULL);
399
400 }
401 err = btr_cur_pessimistic_update(
402 flags | BTR_KEEP_POS_FLAG,
403 cursor, offsets, offsets_heap, heap,
404 big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
405 }
406
407 return(err);
408 }
409
410 /*********************************************************************//**
411 Returns TRUE if in a cascaded update/delete an ancestor node of node
412 updates (not DELETE, but UPDATE) table.
413 @return TRUE if an ancestor updates table */
414 static
415 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)416 row_ins_cascade_ancestor_updates_table(
417 /*===================================*/
418 que_node_t* node, /*!< in: node in a query graph */
419 dict_table_t* table) /*!< in: table */
420 {
421 que_node_t* parent;
422
423 for (parent = que_node_get_parent(node);
424 que_node_get_type(parent) == QUE_NODE_UPDATE;
425 parent = que_node_get_parent(parent)) {
426
427 upd_node_t* upd_node;
428
429 upd_node = static_cast<upd_node_t*>(parent);
430
431 if (upd_node->table == table && upd_node->is_delete == FALSE) {
432
433 return(TRUE);
434 }
435 }
436
437 return(FALSE);
438 }
439
440 /*********************************************************************//**
441 Returns the number of ancestor UPDATE or DELETE nodes of a
442 cascaded update/delete node.
443 @return number of ancestors */
444 static MY_ATTRIBUTE((nonnull, warn_unused_result))
445 ulint
row_ins_cascade_n_ancestors(que_node_t * node)446 row_ins_cascade_n_ancestors(
447 /*========================*/
448 que_node_t* node) /*!< in: node in a query graph */
449 {
450 que_node_t* parent;
451 ulint n_ancestors = 0;
452
453 for (parent = que_node_get_parent(node);
454 que_node_get_type(parent) == QUE_NODE_UPDATE;
455 parent = que_node_get_parent(parent)) {
456
457 n_ancestors++;
458 }
459
460 return(n_ancestors);
461 }
462
463 /******************************************************************//**
464 Calculates the update vector node->cascade->update for a child table in
465 a cascaded update.
466 @return number of fields in the calculated update vector; the value
467 can also be 0 if no foreign key fields changed; the returned value is
468 ULINT_UNDEFINED if the column type in the child table is too short to
469 fit the new value in the parent table: that means the update fails */
470 static MY_ATTRIBUTE((nonnull, warn_unused_result))
471 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)472 row_ins_cascade_calc_update_vec(
473 /*============================*/
474 upd_node_t* node, /*!< in: update node of the parent
475 table */
476 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
477 type is != 0 */
478 mem_heap_t* heap, /*!< in: memory heap to use as
479 temporary storage */
480 trx_t* trx, /*!< in: update transaction */
481 ibool* fts_col_affected)/*!< out: is FTS column affected */
482 {
483 upd_node_t* cascade = node->cascade_node;
484 dict_table_t* table = foreign->foreign_table;
485 dict_index_t* index = foreign->foreign_index;
486 upd_t* update;
487 dict_table_t* parent_table;
488 dict_index_t* parent_index;
489 upd_t* parent_update;
490 ulint n_fields_updated;
491 ulint parent_field_no;
492 ulint i;
493 ulint j;
494 ibool doc_id_updated = FALSE;
495 ulint doc_id_pos = 0;
496 doc_id_t new_doc_id = FTS_NULL_DOC_ID;
497
498 ut_a(node);
499 ut_a(foreign);
500 ut_a(cascade);
501 ut_a(table);
502 ut_a(index);
503
504 /* Calculate the appropriate update vector which will set the fields
505 in the child index record to the same value (possibly padded with
506 spaces if the column is a fixed length CHAR or FIXBINARY column) as
507 the referenced index record will get in the update. */
508
509 parent_table = node->table;
510 ut_a(parent_table == foreign->referenced_table);
511 parent_index = foreign->referenced_index;
512 parent_update = node->update;
513
514 update = cascade->update;
515
516 update->info_bits = 0;
517 update->n_fields = foreign->n_fields;
518
519 n_fields_updated = 0;
520
521 *fts_col_affected = FALSE;
522
523 if (table->fts) {
524 doc_id_pos = dict_table_get_nth_col_pos(
525 table, table->fts->doc_col);
526 }
527
528 for (i = 0; i < foreign->n_fields; i++) {
529
530 parent_field_no = dict_table_get_nth_col_pos(
531 parent_table,
532 dict_index_get_nth_col_no(parent_index, i));
533
534 for (j = 0; j < parent_update->n_fields; j++) {
535 const upd_field_t* parent_ufield
536 = &parent_update->fields[j];
537
538 if (parent_ufield->field_no == parent_field_no) {
539
540 ulint min_size;
541 const dict_col_t* col;
542 ulint ufield_len;
543 upd_field_t* ufield;
544
545 col = dict_index_get_nth_col(index, i);
546
547 /* A field in the parent index record is
548 updated. Let us make the update vector
549 field for the child table. */
550
551 ufield = update->fields + n_fields_updated;
552
553 ufield->field_no
554 = dict_table_get_nth_col_pos(
555 table, dict_col_get_no(col));
556
557 ufield->orig_len = 0;
558 ufield->exp = NULL;
559
560 ufield->new_val = parent_ufield->new_val;
561 ufield_len = dfield_get_len(&ufield->new_val);
562
563 /* Clear the "external storage" flag */
564 dfield_set_len(&ufield->new_val, ufield_len);
565
566 /* Do not allow a NOT NULL column to be
567 updated as NULL */
568
569 if (dfield_is_null(&ufield->new_val)
570 && (col->prtype & DATA_NOT_NULL)) {
571
572 return(ULINT_UNDEFINED);
573 }
574
575 /* If the new value would not fit in the
576 column, do not allow the update */
577
578 if (!dfield_is_null(&ufield->new_val)
579 && dtype_get_at_most_n_mbchars(
580 col->prtype, col->mbminmaxlen,
581 col->len,
582 ufield_len,
583 static_cast<char*>(
584 dfield_get_data(
585 &ufield->new_val)))
586 < ufield_len) {
587
588 return(ULINT_UNDEFINED);
589 }
590
591 /* If the parent column type has a different
592 length than the child column type, we may
593 need to pad with spaces the new value of the
594 child column */
595
596 min_size = dict_col_get_min_size(col);
597
598 /* Because UNIV_SQL_NULL (the marker
599 of SQL NULL values) exceeds all possible
600 values of min_size, the test below will
601 not hold for SQL NULL columns. */
602
603 if (min_size > ufield_len) {
604
605 byte* pad;
606 ulint pad_len;
607 byte* padded_data;
608 ulint mbminlen;
609
610 padded_data = static_cast<byte*>(
611 mem_heap_alloc(
612 heap, min_size));
613
614 pad = padded_data + ufield_len;
615 pad_len = min_size - ufield_len;
616
617 memcpy(padded_data,
618 dfield_get_data(&ufield
619 ->new_val),
620 ufield_len);
621
622 mbminlen = dict_col_get_mbminlen(col);
623
624 ut_ad(!(ufield_len % mbminlen));
625 ut_ad(!(min_size % mbminlen));
626
627 if (mbminlen == 1
628 && dtype_get_charset_coll(
629 col->prtype)
630 == DATA_MYSQL_BINARY_CHARSET_COLL) {
631 /* Do not pad BINARY columns */
632 return(ULINT_UNDEFINED);
633 }
634
635 row_mysql_pad_col(mbminlen,
636 pad, pad_len);
637 dfield_set_data(&ufield->new_val,
638 padded_data, min_size);
639 }
640
641 /* Check whether the current column has
642 FTS index on it */
643 if (table->fts
644 && dict_table_is_fts_column(
645 table->fts->indexes,
646 dict_col_get_no(col))
647 != ULINT_UNDEFINED) {
648 *fts_col_affected = TRUE;
649 }
650
651 /* If Doc ID is updated, check whether the
652 Doc ID is valid */
653 if (table->fts
654 && ufield->field_no == doc_id_pos) {
655 doc_id_t n_doc_id;
656
657 n_doc_id =
658 table->fts->cache->next_doc_id;
659
660 new_doc_id = fts_read_doc_id(
661 static_cast<const byte*>(
662 dfield_get_data(
663 &ufield->new_val)));
664
665 if (new_doc_id <= 0) {
666 fprintf(stderr,
667 "InnoDB: FTS Doc ID "
668 "must be larger than "
669 "0 \n");
670 return(ULINT_UNDEFINED);
671 }
672
673 if (new_doc_id < n_doc_id) {
674 fprintf(stderr,
675 "InnoDB: FTS Doc ID "
676 "must be larger than "
677 IB_ID_FMT" for table",
678 n_doc_id -1);
679
680 ut_print_name(stderr, trx,
681 TRUE,
682 table->name);
683
684 putc('\n', stderr);
685 return(ULINT_UNDEFINED);
686 }
687
688 *fts_col_affected = TRUE;
689 doc_id_updated = TRUE;
690 }
691
692 n_fields_updated++;
693 }
694 }
695 }
696
697 /* Generate a new Doc ID if FTS index columns get updated */
698 if (table->fts && *fts_col_affected) {
699 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
700 doc_id_t doc_id;
701 upd_field_t* ufield;
702
703 ut_ad(!doc_id_updated);
704 ufield = update->fields + n_fields_updated;
705 fts_get_next_doc_id(table, &trx->fts_next_doc_id);
706 doc_id = fts_update_doc_id(table, ufield,
707 &trx->fts_next_doc_id);
708 n_fields_updated++;
709 fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
710 } else {
711 if (doc_id_updated) {
712 ut_ad(new_doc_id);
713 fts_trx_add_op(trx, table, new_doc_id,
714 FTS_INSERT, NULL);
715 } else {
716 fprintf(stderr, "InnoDB: FTS Doc ID must be "
717 "updated along with FTS indexed "
718 "column for table ");
719 ut_print_name(stderr, trx, TRUE, table->name);
720 putc('\n', stderr);
721 return(ULINT_UNDEFINED);
722 }
723 }
724 }
725
726 update->n_fields = n_fields_updated;
727
728 return(n_fields_updated);
729 }
730
731 /*********************************************************************//**
732 Set detailed error message associated with foreign key errors for
733 the given transaction. */
734 static
735 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)736 row_ins_set_detailed(
737 /*=================*/
738 trx_t* trx, /*!< in: transaction */
739 dict_foreign_t* foreign) /*!< in: foreign key constraint */
740 {
741 ut_ad(!srv_read_only_mode);
742
743 mutex_enter(&srv_misc_tmpfile_mutex);
744 rewind(srv_misc_tmpfile);
745
746 if (os_file_set_eof(srv_misc_tmpfile)) {
747 ut_print_name(srv_misc_tmpfile, trx, TRUE,
748 foreign->foreign_table_name);
749 dict_print_info_on_foreign_key_in_create_format(
750 srv_misc_tmpfile, trx, foreign, FALSE);
751 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
752 } else {
753 trx_set_detailed_error(trx, "temp file operation failed");
754 }
755
756 mutex_exit(&srv_misc_tmpfile_mutex);
757 }
758
759 /*********************************************************************//**
760 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
761 and displays information about the given transaction.
762 The caller must release dict_foreign_err_mutex. */
763 static
764 void
row_ins_foreign_trx_print(trx_t * trx)765 row_ins_foreign_trx_print(
766 /*======================*/
767 trx_t* trx) /*!< in: transaction */
768 {
769 ulint n_rec_locks;
770 ulint n_trx_locks;
771 ulint heap_size;
772
773 if (srv_read_only_mode) {
774 return;
775 }
776
777 lock_mutex_enter();
778 n_rec_locks = lock_number_of_rows_locked(&trx->lock);
779 n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
780 heap_size = mem_heap_get_size(trx->lock.lock_heap);
781 lock_mutex_exit();
782
783 mutex_enter(&trx_sys->mutex);
784
785 mutex_enter(&dict_foreign_err_mutex);
786 rewind(dict_foreign_err_file);
787 ut_print_timestamp(dict_foreign_err_file);
788 fputs(" Transaction:\n", dict_foreign_err_file);
789
790 trx_print_low(dict_foreign_err_file, trx, 600,
791 n_rec_locks, n_trx_locks, heap_size);
792
793 mutex_exit(&trx_sys->mutex);
794
795 ut_ad(mutex_own(&dict_foreign_err_mutex));
796 }
797
798 /*********************************************************************//**
799 Reports a foreign key error associated with an update or a delete of a
800 parent table index entry. */
801 static
802 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)803 row_ins_foreign_report_err(
804 /*=======================*/
805 const char* errstr, /*!< in: error string from the viewpoint
806 of the parent table */
807 que_thr_t* thr, /*!< in: query thread whose run_node
808 is an update node */
809 dict_foreign_t* foreign, /*!< in: foreign key constraint */
810 const rec_t* rec, /*!< in: a matching index record in the
811 child table */
812 const dtuple_t* entry) /*!< in: index entry in the parent
813 table */
814 {
815 if (srv_read_only_mode) {
816 return;
817 }
818
819 FILE* ef = dict_foreign_err_file;
820 trx_t* trx = thr_get_trx(thr);
821
822 row_ins_set_detailed(trx, foreign);
823
824 row_ins_foreign_trx_print(trx);
825
826 fputs("Foreign key constraint fails for table ", ef);
827 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
828 fputs(":\n", ef);
829 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
830 TRUE);
831 putc('\n', ef);
832 fputs(errstr, ef);
833 fputs(" in parent table, in index ", ef);
834 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
835 if (entry) {
836 fputs(" tuple:\n", ef);
837 dtuple_print(ef, entry);
838 }
839 fputs("\nBut in child table ", ef);
840 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
841 fputs(", in index ", ef);
842 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
843 if (rec) {
844 fputs(", there is a record:\n", ef);
845 rec_print(ef, rec, foreign->foreign_index);
846 } else {
847 fputs(", the record is not available\n", ef);
848 }
849 putc('\n', ef);
850
851 mutex_exit(&dict_foreign_err_mutex);
852 }
853
854 /*********************************************************************//**
855 Reports a foreign key error to dict_foreign_err_file when we are trying
856 to add an index entry to a child table. Note that the adding may be the result
857 of an update, too. */
858 static
859 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)860 row_ins_foreign_report_add_err(
861 /*===========================*/
862 trx_t* trx, /*!< in: transaction */
863 dict_foreign_t* foreign, /*!< in: foreign key constraint */
864 const rec_t* rec, /*!< in: a record in the parent table:
865 it does not match entry because we
866 have an error! */
867 const dtuple_t* entry) /*!< in: index entry to insert in the
868 child table */
869 {
870 if (srv_read_only_mode) {
871 return;
872 }
873
874 FILE* ef = dict_foreign_err_file;
875
876 row_ins_set_detailed(trx, foreign);
877
878 row_ins_foreign_trx_print(trx);
879
880 fputs("Foreign key constraint fails for table ", ef);
881 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
882 fputs(":\n", ef);
883 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
884 TRUE);
885 fputs("\nTrying to add in child table, in index ", ef);
886 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
887 if (entry) {
888 fputs(" tuple:\n", ef);
889 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
890 It would be better to only display the user columns. */
891 dtuple_print(ef, entry);
892 }
893 fputs("\nBut in parent table ", ef);
894 ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
895 fputs(", in index ", ef);
896 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
897 fputs(",\nthe closest match we can find is record:\n", ef);
898 if (rec && page_rec_is_supremum(rec)) {
899 /* If the cursor ended on a supremum record, it is better
900 to report the previous record in the error message, so that
901 the user gets a more descriptive error message. */
902 rec = page_rec_get_prev_const(rec);
903 }
904
905 if (rec) {
906 rec_print(ef, rec, foreign->referenced_index);
907 }
908 putc('\n', ef);
909
910 mutex_exit(&dict_foreign_err_mutex);
911 }
912
913 /*********************************************************************//**
914 Invalidate the query cache for the given table. */
915 static
916 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)917 row_ins_invalidate_query_cache(
918 /*===========================*/
919 que_thr_t* thr, /*!< in: query thread whose run_node
920 is an update node */
921 const char* name) /*!< in: table name prefixed with
922 database name and a '/' character */
923 {
924 char* buf;
925 char* ptr;
926 ulint len = strlen(name) + 1;
927
928 buf = mem_strdupl(name, len);
929
930 ptr = strchr(buf, '/');
931 ut_a(ptr);
932 *ptr = '\0';
933
934 innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
935 mem_free(buf);
936 }
937
938 /*********************************************************************//**
939 Perform referential actions or checks when a parent row is deleted or updated
940 and the constraint had an ON DELETE or ON UPDATE condition which was not
941 RESTRICT.
942 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_ins_foreign_check_on_constraint(
/*================================*/
	que_thr_t*	thr,	/*!< in: query thread whose run_node
				is an update node */
	dict_foreign_t*	foreign,/*!< in: foreign key constraint whose
				type is != 0 */
	btr_pcur_t*	pcur,	/*!< in: cursor placed on a matching
				index record in the child table */
	dtuple_t*	entry,	/*!< in: index entry in the parent
				table */
	mtr_t*		mtr)	/*!< in: mtr holding the latch of pcur
				page */
{
	upd_node_t*	node;
	upd_node_t*	cascade;
	dict_table_t*	table = foreign->foreign_table;
	dict_index_t*	index;
	dict_index_t*	clust_index;
	dtuple_t*	ref;
	/* Heap handed to row_ins_cascade_calc_update_vec(); only created
	in the ON UPDATE CASCADE branch and freed on both exit paths */
	mem_heap_t*	upd_vec_heap = NULL;
	const rec_t*	rec;
	const rec_t*	clust_rec;
	const buf_block_t* clust_block;
	upd_t*		update;
	ulint		n_to_update;
	dberr_t		err;
	ulint		i;
	trx_t*		trx;
	/* Scratch heap for the row reference and the Doc ID lookup;
	freed on both exit paths */
	mem_heap_t*	tmp_heap = NULL;
	doc_id_t	doc_id = FTS_NULL_DOC_ID;
	ibool		fts_col_affacted = FALSE;

	ut_a(thr);
	ut_a(foreign);
	ut_a(pcur);
	ut_a(mtr);

	trx = thr_get_trx(thr);

	/* Since we are going to delete or update a row, we have to invalidate
	the MySQL query cache for table. A deadlock of threads is not possible
	here because the caller of this function does not hold any latches with
	the sync0sync.h rank above the lock_sys_t::mutex. The query cache mutex
	has a rank just above the lock_sys_t::mutex. */

	row_ins_invalidate_query_cache(thr, table->name);

	node = static_cast<upd_node_t*>(thr->run_node);

	/* The two checks below reject the operation when the constraint
	defines no CASCADE / SET NULL action for it. They return directly:
	unlike every later exit, mtr and pcur are left untouched here. */

	if (node->is_delete && 0 == (foreign->type
				     & (DICT_FOREIGN_ON_DELETE_CASCADE
					| DICT_FOREIGN_ON_DELETE_SET_NULL))) {

		row_ins_foreign_report_err("Trying to delete",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	if (!node->is_delete && 0 == (foreign->type
				      & (DICT_FOREIGN_ON_UPDATE_CASCADE
					 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {

		/* This is an UPDATE */

		row_ins_foreign_report_err("Trying to update",
					   thr, foreign,
					   btr_pcur_get_rec(pcur), entry);

		return(DB_ROW_IS_REFERENCED);
	}

	if (node->cascade_node == NULL) {
		/* Extend our query graph by creating a child to current
		update node. The child is used in the cascade or set null
		operation. */

		node->cascade_heap = mem_heap_create(128);
		node->cascade_node = row_create_update_node_for_mysql(
			table, node->cascade_heap);
		que_node_set_parent(node->cascade_node, node);
	}

	/* Initialize cascade_node to do the operation we want. Note that we
	use the SAME cascade node to do all foreign key operations of the
	SQL DELETE: the table of the cascade node may change if there are
	several child tables to the table where the delete is done! */

	cascade = node->cascade_node;

	cascade->table = table;

	cascade->foreign = foreign;

	if (node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
		cascade->is_delete = TRUE;
	} else {
		/* Either SET NULL (for a delete or an update) or
		ON UPDATE CASCADE: the child row is updated, not deleted */
		cascade->is_delete = FALSE;

		if (foreign->n_fields > cascade->update_n_fields) {
			/* We have to make the update vector longer */

			cascade->update = upd_create(foreign->n_fields,
						     node->cascade_heap);
			cascade->update_n_fields = foreign->n_fields;
		}
	}

	/* We do not allow cyclic cascaded updating (DELETE is allowed,
	but not UPDATE) of the same table, as this can lead to an infinite
	cycle. Check that we are not updating the same table which is
	already being modified in this cascade chain. We have to check
	this also because the modification of the indexes of a 'parent'
	table may still be incomplete, and we must avoid seeing the indexes
	of the parent table in an inconsistent state! */

	if (!cascade->is_delete
	    && row_ins_cascade_ancestor_updates_table(cascade, table)) {

		/* We do not know if this would break foreign key
		constraints, but play safe and return an error */

		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying an update, possibly causing a cyclic"
			" cascaded update\n"
			"in the child table,", thr, foreign,
			btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	/* Refuse cascade chains with 15 or more ancestor nodes */

	if (row_ins_cascade_n_ancestors(cascade) >= 15) {
		err = DB_ROW_IS_REFERENCED;

		row_ins_foreign_report_err(
			"Trying a too deep cascaded delete or update\n",
			thr, foreign, btr_pcur_get_rec(pcur), entry);

		goto nonstandard_exit_func;
	}

	index = btr_pcur_get_btr_cur(pcur)->index;

	ut_a(index == foreign->foreign_index);

	rec = btr_pcur_get_rec(pcur);

	tmp_heap = mem_heap_create(256);

	if (dict_index_is_clust(index)) {
		/* pcur is already positioned in the clustered index of
		the child table */

		clust_index = index;
		clust_rec = rec;
		clust_block = btr_pcur_get_block(pcur);
	} else {
		/* We have to look for the record in the clustered index
		in the child table */

		clust_index = dict_table_get_first_index(table);

		ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
					tmp_heap);
		/* The clustered index search uses the cascade node's own
		cursor but the caller's mini-transaction */
		btr_pcur_open_with_no_init(clust_index, ref,
					   PAGE_CUR_LE, BTR_SEARCH_LEAF,
					   cascade->pcur, 0, mtr);

		clust_rec = btr_pcur_get_rec(cascade->pcur);
		clust_block = btr_pcur_get_block(cascade->pcur);

		if (!page_rec_is_user_rec(clust_rec)
		    || btr_pcur_get_low_match(cascade->pcur)
		    < dict_index_get_n_unique(clust_index)) {

			/* No clustered index record matches the secondary
			index record on all unique fields: log both records */

			fputs("InnoDB: error in cascade of a foreign key op\n"
			      "InnoDB: ", stderr);
			dict_index_name_print(stderr, trx, index);

			fputs("\n"
			      "InnoDB: record ", stderr);
			rec_print(stderr, rec, index);
			fputs("\n"
			      "InnoDB: clustered record ", stderr);
			rec_print(stderr, clust_rec, clust_index);
			fputs("\n"
			      "InnoDB: Submit a detailed bug report to"
			      " http://bugs.mysql.com\n", stderr);
			/* NOTE(review): ut_ad() is presumably a debug-only
			assertion; when it is compiled out, the inconsistency
			is only logged and DB_SUCCESS is returned */
			ut_ad(0);
			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}
	}

	/* Set an X-lock on the row to delete or update in the child table */

	err = lock_table(0, table, LOCK_IX, thr);

	if (err == DB_SUCCESS) {
		/* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
		we already have a normal shared lock on the appropriate
		gap if the search criterion was not unique */

		err = lock_clust_rec_read_check_and_lock_alt(
			0, clust_block, clust_rec, clust_index,
			LOCK_X, LOCK_REC_NOT_GAP, thr);
	}

	if (err != DB_SUCCESS) {

		/* Includes DB_LOCK_WAIT: the error is passed to the caller
		after mtr has been restarted and pcur restored */

		goto nonstandard_exit_func;
	}

	if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
		/* This can happen if there is a circular reference of
		rows such that cascading delete comes to delete a row
		already in the process of being delete marked */
		err = DB_SUCCESS;

		goto nonstandard_exit_func;
	}

	if (table->fts) {
		/* Fetch the Doc ID now; it is used by the FTS_DELETE
		operations registered below */
		doc_id = fts_get_doc_id_from_rec(table, clust_rec, tmp_heap);
	}

	if (node->is_delete
	    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
	    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {

		/* Build the appropriate update vector which sets
		foreign->n_fields first fields in rec to SQL NULL */

		update = cascade->update;

		update->info_bits = 0;
		update->n_fields = foreign->n_fields;
		UNIV_MEM_INVALID(update->fields,
				 update->n_fields * sizeof *update->fields);

		for (i = 0; i < foreign->n_fields; i++) {
			upd_field_t*	ufield = &update->fields[i];

			ufield->field_no = dict_table_get_nth_col_pos(
				table,
				dict_index_get_nth_col_no(index, i));
			ufield->orig_len = 0;
			ufield->exp = NULL;
			dfield_set_null(&ufield->new_val);

			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
			    != ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	} else if (table->fts && cascade->is_delete) {
		/* DICT_FOREIGN_ON_DELETE_CASCADE case */
		for (i = 0; i < foreign->n_fields; i++) {
			if (table->fts && dict_table_is_fts_column(
				table->fts->indexes,
				dict_index_get_nth_col_no(index, i))
			    != ULINT_UNDEFINED) {
				fts_col_affacted = TRUE;
			}
		}

		if (fts_col_affacted) {
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	if (!node->is_delete
	    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {

		/* Build the appropriate update vector which sets changing
		foreign->n_fields first fields in rec to new values */

		upd_vec_heap = mem_heap_create(256);

		n_to_update = row_ins_cascade_calc_update_vec(
			node, foreign, upd_vec_heap, trx, &fts_col_affacted);

		if (n_to_update == ULINT_UNDEFINED) {
			err = DB_ROW_IS_REFERENCED;

			row_ins_foreign_report_err(
				"Trying a cascaded update where the"
				" updated value in the child\n"
				"table would not fit in the length"
				" of the column, or the value would\n"
				"be NULL and the column is"
				" declared as not NULL in the child table,",
				thr, foreign, btr_pcur_get_rec(pcur), entry);

			goto nonstandard_exit_func;
		}

		if (cascade->update->n_fields == 0) {

			/* The update does not change any columns referred
			to in this foreign key constraint: no need to do
			anything */

			err = DB_SUCCESS;

			goto nonstandard_exit_func;
		}

		/* Mark the old Doc ID as deleted */
		if (fts_col_affacted) {
			ut_ad(table->fts);
			fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
		}
	}

	/* Store pcur position and initialize or store the cascade node
	pcur stored position */

	btr_pcur_store_position(pcur, mtr);

	if (index == clust_index) {
		/* cascade->pcur was never opened in this case; give it the
		position just stored for pcur */
		btr_pcur_copy_stored_position(cascade->pcur, pcur);
	} else {
		btr_pcur_store_position(cascade->pcur, mtr);
	}

	/* The mini-transaction is committed before the cascaded operation
	runs; it is restarted and pcur is restored afterwards */

	mtr_commit(mtr);

	ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);

	cascade->state = UPD_NODE_UPDATE_CLUSTERED;

	err = row_update_cascade_for_mysql(thr, cascade,
					   foreign->foreign_table);

	/* Diagnostic only: the message is printed but err is not changed */

	if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
		fprintf(stderr,
			"InnoDB: error: table %s has the counter 0"
			" though there is\n"
			"InnoDB: a FOREIGN KEY check running on it.\n",
			foreign->foreign_table->name);
	}

	/* Release the data dictionary latch for a while, so that we do not
	starve other threads from doing CREATE TABLE etc. if we have a huge
	cascaded operation running. The counter n_foreign_key_checks_running
	will prevent other users from dropping or ALTERing the table when we
	release the latch. */

	row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));

	DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");

	row_mysql_freeze_data_dictionary(thr_get_trx(thr));

	mtr_start(mtr);

	/* Restore pcur position */

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	return(err);

nonstandard_exit_func:
	/* Exit path for every case where the cascaded operation is not
	run: free the scratch heaps, then commit and restart mtr and
	restore pcur, mirroring the state left by the normal exit above */

	if (tmp_heap) {
		mem_heap_free(tmp_heap);
	}

	if (upd_vec_heap) {
		mem_heap_free(upd_vec_heap);
	}

	btr_pcur_store_position(pcur, mtr);

	mtr_commit(mtr);
	mtr_start(mtr);

	btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

	return(err);
}
1345
1346 /*********************************************************************//**
1347 Sets a shared lock on a record. Used in locking possible duplicate key
1348 records and also in checking foreign key constraints.
1349 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1350 static
1351 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1352 row_ins_set_shared_rec_lock(
1353 /*========================*/
1354 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1355 LOCK_REC_NOT_GAP type lock */
1356 const buf_block_t* block, /*!< in: buffer block of rec */
1357 const rec_t* rec, /*!< in: record */
1358 dict_index_t* index, /*!< in: index */
1359 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1360 que_thr_t* thr) /*!< in: query thread */
1361 {
1362 dberr_t err;
1363
1364 ut_ad(rec_offs_validate(rec, index, offsets));
1365
1366 if (dict_index_is_clust(index)) {
1367 err = lock_clust_rec_read_check_and_lock(
1368 0, block, rec, index, offsets, LOCK_S, type, thr);
1369 } else {
1370 err = lock_sec_rec_read_check_and_lock(
1371 0, block, rec, index, offsets, LOCK_S, type, thr);
1372 }
1373
1374 return(err);
1375 }
1376
1377 /*********************************************************************//**
1378 Sets a exclusive lock on a record. Used in locking possible duplicate key
1379 records
1380 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1381 static
1382 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1383 row_ins_set_exclusive_rec_lock(
1384 /*===========================*/
1385 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1386 LOCK_REC_NOT_GAP type lock */
1387 const buf_block_t* block, /*!< in: buffer block of rec */
1388 const rec_t* rec, /*!< in: record */
1389 dict_index_t* index, /*!< in: index */
1390 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1391 que_thr_t* thr) /*!< in: query thread */
1392 {
1393 dberr_t err;
1394
1395 ut_ad(rec_offs_validate(rec, index, offsets));
1396
1397 if (dict_index_is_clust(index)) {
1398 err = lock_clust_rec_read_check_and_lock(
1399 0, block, rec, index, offsets, LOCK_X, type, thr);
1400 } else {
1401 err = lock_sec_rec_read_check_and_lock(
1402 0, block, rec, index, offsets, LOCK_X, type, thr);
1403 }
1404
1405 return(err);
1406 }
1407
1408 /***************************************************************//**
1409 Checks if foreign key constraint fails for an index entry. Sets shared locks
1410 which lock either the success or the failure of the constraint. NOTE that
1411 the caller must have a shared latch on dict_operation_lock.
1412 @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1413 UNIV_INTERN
1414 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1415 row_ins_check_foreign_constraint(
1416 /*=============================*/
1417 ibool check_ref,/*!< in: TRUE if we want to check that
1418 the referenced table is ok, FALSE if we
1419 want to check the foreign key table */
1420 dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
1421 tables mentioned in it must be in the
1422 dictionary cache if they exist at all */
1423 dict_table_t* table, /*!< in: if check_ref is TRUE, then the foreign
1424 table, else the referenced table */
1425 dtuple_t* entry, /*!< in: index entry for index */
1426 que_thr_t* thr) /*!< in: query thread */
1427 {
1428 dberr_t err;
1429 upd_node_t* upd_node;
1430 dict_table_t* check_table;
1431 dict_index_t* check_index;
1432 ulint n_fields_cmp;
1433 btr_pcur_t pcur;
1434 int cmp;
1435 ulint i;
1436 mtr_t mtr;
1437 trx_t* trx = thr_get_trx(thr);
1438 mem_heap_t* heap = NULL;
1439 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1440 ulint* offsets = offsets_;
1441 rec_offs_init(offsets_);
1442
1443 run_again:
1444 #ifdef UNIV_SYNC_DEBUG
1445 ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1446 #endif /* UNIV_SYNC_DEBUG */
1447
1448 err = DB_SUCCESS;
1449
1450 if (trx->check_foreigns == FALSE) {
1451 /* The user has suppressed foreign key checks currently for
1452 this session */
1453 goto exit_func;
1454 }
1455
1456 /* If any of the foreign key fields in entry is SQL NULL, we
1457 suppress the foreign key check: this is compatible with Oracle,
1458 for example */
1459
1460 for (i = 0; i < foreign->n_fields; i++) {
1461 if (UNIV_SQL_NULL == dfield_get_len(
1462 dtuple_get_nth_field(entry, i))) {
1463
1464 goto exit_func;
1465 }
1466 }
1467
1468 if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1469 upd_node = static_cast<upd_node_t*>(thr->run_node);
1470
1471 if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1472 /* If a cascaded update is done as defined by a
1473 foreign key constraint, do not check that
1474 constraint for the child row. In ON UPDATE CASCADE
1475 the update of the parent row is only half done when
1476 we come here: if we would check the constraint here
1477 for the child row it would fail.
1478
1479 A QUESTION remains: if in the child table there are
1480 several constraints which refer to the same parent
1481 table, we should merge all updates to the child as
1482 one update? And the updates can be contradictory!
1483 Currently we just perform the update associated
1484 with each foreign key constraint, one after
1485 another, and the user has problems predicting in
1486 which order they are performed. */
1487
1488 goto exit_func;
1489 }
1490 }
1491
1492 if (check_ref) {
1493 check_table = foreign->referenced_table;
1494 check_index = foreign->referenced_index;
1495 } else {
1496 check_table = foreign->foreign_table;
1497 check_index = foreign->foreign_index;
1498 }
1499
1500 if (check_table == NULL
1501 || check_table->ibd_file_missing
1502 || check_index == NULL) {
1503
1504 if (!srv_read_only_mode && check_ref) {
1505 FILE* ef = dict_foreign_err_file;
1506
1507 row_ins_set_detailed(trx, foreign);
1508
1509 row_ins_foreign_trx_print(trx);
1510
1511 fputs("Foreign key constraint fails for table ", ef);
1512 ut_print_name(ef, trx, TRUE,
1513 foreign->foreign_table_name);
1514 fputs(":\n", ef);
1515 dict_print_info_on_foreign_key_in_create_format(
1516 ef, trx, foreign, TRUE);
1517 fputs("\nTrying to add to index ", ef);
1518 ut_print_name(ef, trx, FALSE,
1519 foreign->foreign_index->name);
1520 fputs(" tuple:\n", ef);
1521 dtuple_print(ef, entry);
1522 fputs("\nBut the parent table ", ef);
1523 ut_print_name(ef, trx, TRUE,
1524 foreign->referenced_table_name);
1525 fputs("\nor its .ibd file does"
1526 " not currently exist!\n", ef);
1527 mutex_exit(&dict_foreign_err_mutex);
1528
1529 err = DB_NO_REFERENCED_ROW;
1530 }
1531
1532 goto exit_func;
1533 }
1534
1535 if (check_table != table) {
1536 /* We already have a LOCK_IX on table, but not necessarily
1537 on check_table */
1538
1539 err = lock_table(0, check_table, LOCK_IS, thr);
1540
1541 if (err != DB_SUCCESS) {
1542
1543 goto do_possible_lock_wait;
1544 }
1545 }
1546
1547 mtr_start(&mtr);
1548
1549 /* Store old value on n_fields_cmp */
1550
1551 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1552
1553 dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1554
1555 btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1556 BTR_SEARCH_LEAF, &pcur, &mtr);
1557
1558 /* Scan index records and check if there is a matching record */
1559
1560 do {
1561 const rec_t* rec = btr_pcur_get_rec(&pcur);
1562 const buf_block_t* block = btr_pcur_get_block(&pcur);
1563
1564 SRV_CORRUPT_TABLE_CHECK(block,
1565 {
1566 err = DB_CORRUPTION;
1567 goto exit_loop;
1568 });
1569
1570 if (page_rec_is_infimum(rec)) {
1571
1572 continue;
1573 }
1574
1575 offsets = rec_get_offsets(rec, check_index,
1576 offsets, ULINT_UNDEFINED, &heap);
1577
1578 if (page_rec_is_supremum(rec)) {
1579
1580 err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1581 rec, check_index,
1582 offsets, thr);
1583 switch (err) {
1584 case DB_SUCCESS_LOCKED_REC:
1585 case DB_SUCCESS:
1586 continue;
1587 default:
1588 goto end_scan;
1589 }
1590 }
1591
1592 cmp = cmp_dtuple_rec(entry, rec, offsets);
1593
1594 if (cmp == 0) {
1595 if (rec_get_deleted_flag(rec,
1596 rec_offs_comp(offsets))) {
1597 err = row_ins_set_shared_rec_lock(
1598 LOCK_ORDINARY, block,
1599 rec, check_index, offsets, thr);
1600 switch (err) {
1601 case DB_SUCCESS_LOCKED_REC:
1602 case DB_SUCCESS:
1603 break;
1604 default:
1605 goto end_scan;
1606 }
1607 } else {
1608 /* Found a matching record. Lock only
1609 a record because we can allow inserts
1610 into gaps */
1611
1612 err = row_ins_set_shared_rec_lock(
1613 LOCK_REC_NOT_GAP, block,
1614 rec, check_index, offsets, thr);
1615
1616 switch (err) {
1617 case DB_SUCCESS_LOCKED_REC:
1618 case DB_SUCCESS:
1619 break;
1620 default:
1621 goto end_scan;
1622 }
1623
1624 if (check_ref) {
1625 err = DB_SUCCESS;
1626
1627 goto end_scan;
1628 } else if (foreign->type != 0) {
1629 /* There is an ON UPDATE or ON DELETE
1630 condition: check them in a separate
1631 function */
1632
1633 err = row_ins_foreign_check_on_constraint(
1634 thr, foreign, &pcur, entry,
1635 &mtr);
1636 if (err != DB_SUCCESS) {
1637 /* Since reporting a plain
1638 "duplicate key" error
1639 message to the user in
1640 cases where a long CASCADE
1641 operation would lead to a
1642 duplicate key in some
1643 other table is very
1644 confusing, map duplicate
1645 key errors resulting from
1646 FK constraints to a
1647 separate error code. */
1648
1649 if (err == DB_DUPLICATE_KEY) {
1650 err = DB_FOREIGN_DUPLICATE_KEY;
1651 }
1652
1653 goto end_scan;
1654 }
1655
1656 /* row_ins_foreign_check_on_constraint
1657 may have repositioned pcur on a
1658 different block */
1659 block = btr_pcur_get_block(&pcur);
1660 } else {
1661 row_ins_foreign_report_err(
1662 "Trying to delete or update",
1663 thr, foreign, rec, entry);
1664
1665 err = DB_ROW_IS_REFERENCED;
1666 goto end_scan;
1667 }
1668 }
1669 } else {
1670 ut_a(cmp < 0);
1671
1672 err = row_ins_set_shared_rec_lock(
1673 LOCK_GAP, block,
1674 rec, check_index, offsets, thr);
1675
1676 switch (err) {
1677 case DB_SUCCESS_LOCKED_REC:
1678 case DB_SUCCESS:
1679 if (check_ref) {
1680 err = DB_NO_REFERENCED_ROW;
1681 row_ins_foreign_report_add_err(
1682 trx, foreign, rec, entry);
1683 } else {
1684 err = DB_SUCCESS;
1685 }
1686 default:
1687 break;
1688 }
1689
1690 goto end_scan;
1691 }
1692 } while (btr_pcur_move_to_next(&pcur, &mtr));
1693
1694 exit_loop:
1695 if (check_ref) {
1696 row_ins_foreign_report_add_err(
1697 trx, foreign, btr_pcur_get_rec(&pcur), entry);
1698 err = DB_NO_REFERENCED_ROW;
1699 } else {
1700 err = DB_SUCCESS;
1701 }
1702
1703 end_scan:
1704 btr_pcur_close(&pcur);
1705
1706 mtr_commit(&mtr);
1707
1708 /* Restore old value */
1709 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1710
1711 do_possible_lock_wait:
1712 if (err == DB_LOCK_WAIT) {
1713 bool verified = false;
1714
1715 trx->error_state = err;
1716
1717 que_thr_stop_for_mysql(thr);
1718
1719 lock_wait_suspend_thread(thr);
1720
1721 if (check_table->to_be_dropped) {
1722 /* The table is being dropped. We shall timeout
1723 this operation */
1724 err = DB_LOCK_WAIT_TIMEOUT;
1725 goto exit_func;
1726 }
1727
1728 /* We had temporarily released dict_operation_lock in
1729 above lock sleep wait, now we have the lock again, and
1730 we will need to re-check whether the foreign key has been
1731 dropped. We only need to verify if the table is referenced
1732 table case (check_ref == 0), since MDL lock will prevent
1733 concurrent DDL and DML on the same table */
1734 if (!check_ref) {
1735 for (dict_foreign_set::iterator it
1736 = table->referenced_set.begin();
1737 it != table->referenced_set.end();
1738 ++it) {
1739 if (*it == foreign) {
1740 verified = true;
1741 break;
1742 }
1743 }
1744 } else {
1745 verified = true;
1746 }
1747
1748 if (!verified) {
1749 err = DB_DICT_CHANGED;
1750 } else if (trx->error_state == DB_SUCCESS) {
1751 goto run_again;
1752 } else {
1753 err = trx->error_state;
1754 }
1755 }
1756
1757 exit_func:
1758 if (UNIV_LIKELY_NULL(heap)) {
1759 mem_heap_free(heap);
1760 }
1761
1762 if (UNIV_UNLIKELY(trx->fake_changes)) {
1763 err = DB_SUCCESS;
1764 }
1765
1766 return(err);
1767 }
1768
1769 /***************************************************************//**
1770 Checks if foreign key constraints fail for an index entry. If index
1771 is not mentioned in any constraint, this function does nothing,
1772 Otherwise does searches to the indexes of referenced tables and
1773 sets shared locks which lock either the success or the failure of
1774 a constraint.
1775 @return DB_SUCCESS or error code */
1776 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1777 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1778 row_ins_check_foreign_constraints(
1779 /*==============================*/
1780 dict_table_t* table, /*!< in: table */
1781 dict_index_t* index, /*!< in: index */
1782 dtuple_t* entry, /*!< in: index entry for index */
1783 que_thr_t* thr) /*!< in: query thread */
1784 {
1785 dict_foreign_t* foreign;
1786 dberr_t err;
1787 trx_t* trx;
1788 ibool got_s_lock = FALSE;
1789
1790 trx = thr_get_trx(thr);
1791
1792 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1793 "foreign_constraint_check_for_ins");
1794
1795 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1796 it != table->foreign_set.end();
1797 ++it) {
1798
1799 foreign = *it;
1800
1801 if (foreign->foreign_index == index) {
1802 dict_table_t* ref_table = NULL;
1803 dict_table_t* foreign_table = foreign->foreign_table;
1804 dict_table_t* referenced_table
1805 = foreign->referenced_table;
1806
1807 if (referenced_table == NULL) {
1808
1809 ref_table = dict_table_open_on_name(
1810 foreign->referenced_table_name_lookup,
1811 FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1812 }
1813
1814 if (0 == trx->dict_operation_lock_mode) {
1815 got_s_lock = TRUE;
1816
1817 row_mysql_freeze_data_dictionary(trx);
1818 }
1819
1820 if (referenced_table) {
1821 os_inc_counter(dict_sys->mutex,
1822 foreign_table
1823 ->n_foreign_key_checks_running);
1824 }
1825
1826 /* NOTE that if the thread ends up waiting for a lock
1827 we will release dict_operation_lock temporarily!
1828 But the counter on the table protects the referenced
1829 table from being dropped while the check is running. */
1830
1831 err = row_ins_check_foreign_constraint(
1832 TRUE, foreign, table, entry, thr);
1833
1834 DBUG_EXECUTE_IF("row_ins_dict_change_err",
1835 err = DB_DICT_CHANGED;);
1836
1837 if (referenced_table) {
1838 os_dec_counter(dict_sys->mutex,
1839 foreign_table
1840 ->n_foreign_key_checks_running);
1841 }
1842
1843 if (got_s_lock) {
1844 row_mysql_unfreeze_data_dictionary(trx);
1845 }
1846
1847 if (ref_table != NULL) {
1848 dict_table_close(ref_table, FALSE, FALSE);
1849 }
1850
1851 if (err != DB_SUCCESS) {
1852
1853 return(err);
1854 }
1855 }
1856 }
1857
1858 return(DB_SUCCESS);
1859 }
1860
1861 /***************************************************************//**
1862 Checks if a unique key violation to rec would occur at the index entry
1863 insert.
1864 @return TRUE if error */
1865 static
1866 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1867 row_ins_dupl_error_with_rec(
1868 /*========================*/
1869 const rec_t* rec, /*!< in: user record; NOTE that we assume
1870 that the caller already has a record lock on
1871 the record! */
1872 const dtuple_t* entry, /*!< in: entry to insert */
1873 dict_index_t* index, /*!< in: index */
1874 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
1875 {
1876 ulint matched_fields;
1877 ulint matched_bytes;
1878 ulint n_unique;
1879 ulint i;
1880
1881 ut_ad(rec_offs_validate(rec, index, offsets));
1882
1883 n_unique = dict_index_get_n_unique(index);
1884
1885 matched_fields = 0;
1886 matched_bytes = 0;
1887
1888 cmp_dtuple_rec_with_match(entry, rec, offsets,
1889 &matched_fields, &matched_bytes);
1890
1891 if (matched_fields < n_unique) {
1892
1893 return(FALSE);
1894 }
1895
1896 /* In a unique secondary index we allow equal key values if they
1897 contain SQL NULLs */
1898
1899 if (!dict_index_is_clust(index)) {
1900
1901 for (i = 0; i < n_unique; i++) {
1902 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1903
1904 return(FALSE);
1905 }
1906 }
1907 }
1908
1909 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1910 }
1911
1912 /***************************************************************//**
1913 Scans a unique non-clustered index at a given index entry to determine
1914 whether a uniqueness violation has occurred for the key value of the entry.
1915 Set shared locks on possible duplicate records.
1916 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1917 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1918 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)1919 row_ins_scan_sec_index_for_duplicate(
1920 /*=================================*/
1921 ulint flags, /*!< in: undo logging and locking flags */
1922 dict_index_t* index, /*!< in: non-clustered unique index */
1923 dtuple_t* entry, /*!< in: index entry */
1924 que_thr_t* thr, /*!< in: query thread */
1925 bool s_latch,/*!< in: whether index->lock is being held */
1926 mtr_t* mtr, /*!< in/out: mini-transaction */
1927 mem_heap_t* offsets_heap)
1928 /*!< in/out: memory heap that can be emptied */
1929 {
1930 ulint n_unique;
1931 int cmp;
1932 ulint n_fields_cmp;
1933 btr_pcur_t pcur;
1934 dberr_t err = DB_SUCCESS;
1935 ulint allow_duplicates;
1936 ulint* offsets = NULL;
1937
1938 #ifdef UNIV_SYNC_DEBUG
1939 ut_ad(s_latch == rw_lock_own(&index->lock, RW_LOCK_SHARED));
1940 #endif /* UNIV_SYNC_DEBUG */
1941
1942 n_unique = dict_index_get_n_unique(index);
1943
1944 /* If the secondary index is unique, but one of the fields in the
1945 n_unique first fields is NULL, a unique key violation cannot occur,
1946 since we define NULL != NULL in this case */
1947
1948 for (ulint i = 0; i < n_unique; i++) {
1949 if (UNIV_SQL_NULL == dfield_get_len(
1950 dtuple_get_nth_field(entry, i))) {
1951
1952 return(DB_SUCCESS);
1953 }
1954 }
1955
1956 /* Store old value on n_fields_cmp */
1957
1958 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1959
1960 dtuple_set_n_fields_cmp(entry, n_unique);
1961
1962 btr_pcur_open(index, entry, PAGE_CUR_GE,
1963 s_latch
1964 ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
1965 : BTR_SEARCH_LEAF,
1966 &pcur, mtr);
1967
1968 allow_duplicates = thr_get_trx(thr)->duplicates;
1969
1970 /* Scan index records and check if there is a duplicate */
1971
1972 do {
1973 const rec_t* rec = btr_pcur_get_rec(&pcur);
1974 const buf_block_t* block = btr_pcur_get_block(&pcur);
1975 const ulint lock_type = LOCK_ORDINARY;
1976
1977 if (page_rec_is_infimum(rec)) {
1978
1979 continue;
1980 }
1981
1982 offsets = rec_get_offsets(rec, index, offsets,
1983 ULINT_UNDEFINED, &offsets_heap);
1984
1985 if (flags & BTR_NO_LOCKING_FLAG) {
1986 /* Set no locks when applying log
1987 in online table rebuild. */
1988 } else if (allow_duplicates) {
1989
1990 /* If the SQL-query will update or replace
1991 duplicate key we will take X-lock for
1992 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1993 INSERT ON DUPLICATE KEY UPDATE). */
1994
1995 err = row_ins_set_exclusive_rec_lock(
1996 lock_type, block, rec, index, offsets, thr);
1997 } else {
1998
1999 err = row_ins_set_shared_rec_lock(
2000 lock_type, block, rec, index, offsets, thr);
2001 }
2002
2003 switch (err) {
2004 case DB_SUCCESS_LOCKED_REC:
2005 err = DB_SUCCESS;
2006 case DB_SUCCESS:
2007 break;
2008 default:
2009 goto end_scan;
2010 }
2011
2012 if (page_rec_is_supremum(rec)) {
2013
2014 continue;
2015 }
2016
2017 cmp = cmp_dtuple_rec(entry, rec, offsets);
2018
2019 if (cmp == 0) {
2020 if (row_ins_dupl_error_with_rec(rec, entry,
2021 index, offsets)) {
2022 err = DB_DUPLICATE_KEY;
2023
2024 thr_get_trx(thr)->error_info = index;
2025
2026 /* If the duplicate is on hidden FTS_DOC_ID,
2027 state so in the error log */
2028 if (DICT_TF2_FLAG_IS_SET(
2029 index->table,
2030 DICT_TF2_FTS_HAS_DOC_ID)
2031 && strcmp(index->name,
2032 FTS_DOC_ID_INDEX_NAME) == 0) {
2033 ib_logf(IB_LOG_LEVEL_ERROR,
2034 "Duplicate FTS_DOC_ID value"
2035 " on table %s",
2036 index->table->name);
2037 }
2038
2039 goto end_scan;
2040 }
2041 } else {
2042 ut_a(cmp < 0);
2043 goto end_scan;
2044 }
2045 } while (btr_pcur_move_to_next(&pcur, mtr));
2046
2047 end_scan:
2048 /* Restore old value */
2049 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2050
2051 return(err);
2052 }
2053
2054 /** Checks for a duplicate when the table is being rebuilt online.
2055 @retval DB_SUCCESS when no duplicate is detected
2056 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2057 a newer version of entry (the entry should not be inserted)
2058 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2059 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2060 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2061 row_ins_duplicate_online(
2062 /*=====================*/
2063 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2064 const dtuple_t* entry, /*!< in: entry that is being inserted */
2065 const rec_t* rec, /*!< in: clustered index record */
2066 ulint* offsets)/*!< in/out: rec_get_offsets(rec) */
2067 {
2068 ulint fields = 0;
2069 ulint bytes = 0;
2070
2071 /* During rebuild, there should not be any delete-marked rows
2072 in the new table. */
2073 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2074 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2075
2076 /* Compare the PRIMARY KEY fields and the
2077 DB_TRX_ID, DB_ROLL_PTR. */
2078 cmp_dtuple_rec_with_match_low(
2079 entry, rec, offsets, n_uniq + 2, &fields, &bytes);
2080
2081 if (fields < n_uniq) {
2082 /* Not a duplicate. */
2083 return(DB_SUCCESS);
2084 }
2085
2086 if (fields == n_uniq + 2) {
2087 /* rec is an exact match of entry. */
2088 ut_ad(bytes == 0);
2089 return(DB_SUCCESS_LOCKED_REC);
2090 }
2091
2092 return(DB_DUPLICATE_KEY);
2093 }
2094
2095 /** Checks for a duplicate when the table is being rebuilt online.
2096 @retval DB_SUCCESS when no duplicate is detected
2097 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2098 a newer version of entry (the entry should not be inserted)
2099 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2100 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2101 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2102 row_ins_duplicate_error_in_clust_online(
2103 /*====================================*/
2104 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2105 const dtuple_t* entry, /*!< in: entry that is being inserted */
2106 const btr_cur_t*cursor, /*!< in: cursor on insert position */
2107 ulint** offsets,/*!< in/out: rec_get_offsets(rec) */
2108 mem_heap_t** heap) /*!< in/out: heap for offsets */
2109 {
2110 dberr_t err = DB_SUCCESS;
2111 const rec_t* rec = btr_cur_get_rec(cursor);
2112
2113 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2114 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2115 ULINT_UNDEFINED, heap);
2116 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2117 if (err != DB_SUCCESS) {
2118 return(err);
2119 }
2120 }
2121
2122 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2123
2124 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2125 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2126 ULINT_UNDEFINED, heap);
2127 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2128 }
2129
2130 return(err);
2131 }
2132
2133 /***************************************************************//**
2134 Checks if a unique key violation error would occur at an index entry
2135 insert. Sets shared locks on possible duplicate records. Works only
2136 for a clustered index!
2137 @retval DB_SUCCESS if no error
2138 @retval DB_DUPLICATE_KEY if error,
2139 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2140 record
2141 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2142 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2143 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2144 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2145 row_ins_duplicate_error_in_clust(
2146 /*=============================*/
2147 ulint flags, /*!< in: undo logging and locking flags */
2148 btr_cur_t* cursor, /*!< in: B-tree cursor */
2149 const dtuple_t* entry, /*!< in: entry to insert */
2150 que_thr_t* thr, /*!< in: query thread */
2151 mtr_t* mtr) /*!< in: mtr */
2152 {
2153 dberr_t err;
2154 rec_t* rec;
2155 ulint n_unique;
2156 trx_t* trx = thr_get_trx(thr);
2157 mem_heap_t*heap = NULL;
2158 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2159 ulint* offsets = offsets_;
2160 rec_offs_init(offsets_);
2161
2162 UT_NOT_USED(mtr);
2163
2164 ut_ad(dict_index_is_clust(cursor->index));
2165
2166 /* NOTE: For unique non-clustered indexes there may be any number
2167 of delete marked records with the same value for the non-clustered
2168 index key (remember multiversioning), and which differ only in
2169 the row refererence part of the index record, containing the
2170 clustered index key fields. For such a secondary index record,
2171 to avoid race condition, we must FIRST do the insertion and after
2172 that check that the uniqueness condition is not breached! */
2173
2174 /* NOTE: A problem is that in the B-tree node pointers on an
2175 upper level may match more to the entry than the actual existing
2176 user records on the leaf level. So, even if low_match would suggest
2177 that a duplicate key violation may occur, this may not be the case. */
2178
2179 n_unique = dict_index_get_n_unique(cursor->index);
2180
2181 if (cursor->low_match >= n_unique) {
2182
2183 rec = btr_cur_get_rec(cursor);
2184
2185 if (!page_rec_is_infimum(rec)) {
2186 offsets = rec_get_offsets(rec, cursor->index, offsets,
2187 ULINT_UNDEFINED, &heap);
2188
2189 /* We set a lock on the possible duplicate: this
2190 is needed in logical logging of MySQL to make
2191 sure that in roll-forward we get the same duplicate
2192 errors as in original execution */
2193
2194 if (trx->duplicates) {
2195
2196 /* If the SQL-query will update or replace
2197 duplicate key we will take X-lock for
2198 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2199 INSERT ON DUPLICATE KEY UPDATE). */
2200
2201 err = row_ins_set_exclusive_rec_lock(
2202 LOCK_REC_NOT_GAP,
2203 btr_cur_get_block(cursor),
2204 rec, cursor->index, offsets, thr);
2205 } else {
2206
2207 err = row_ins_set_shared_rec_lock(
2208 LOCK_REC_NOT_GAP,
2209 btr_cur_get_block(cursor), rec,
2210 cursor->index, offsets, thr);
2211 }
2212
2213 switch (err) {
2214 case DB_SUCCESS_LOCKED_REC:
2215 case DB_SUCCESS:
2216 break;
2217 default:
2218 goto func_exit;
2219 }
2220
2221 if (row_ins_dupl_error_with_rec(
2222 rec, entry, cursor->index, offsets)) {
2223 duplicate:
2224 trx->error_info = cursor->index;
2225 err = DB_DUPLICATE_KEY;
2226 goto func_exit;
2227 }
2228 }
2229 }
2230
2231 if (cursor->up_match >= n_unique) {
2232
2233 rec = page_rec_get_next(btr_cur_get_rec(cursor));
2234
2235 if (!page_rec_is_supremum(rec)) {
2236 offsets = rec_get_offsets(rec, cursor->index, offsets,
2237 ULINT_UNDEFINED, &heap);
2238
2239 if (trx->duplicates) {
2240
2241 /* If the SQL-query will update or replace
2242 duplicate key we will take X-lock for
2243 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2244 INSERT ON DUPLICATE KEY UPDATE). */
2245
2246 err = row_ins_set_exclusive_rec_lock(
2247 LOCK_REC_NOT_GAP,
2248 btr_cur_get_block(cursor),
2249 rec, cursor->index, offsets, thr);
2250 } else {
2251
2252 err = row_ins_set_shared_rec_lock(
2253 LOCK_REC_NOT_GAP,
2254 btr_cur_get_block(cursor),
2255 rec, cursor->index, offsets, thr);
2256 }
2257
2258 switch (err) {
2259 case DB_SUCCESS_LOCKED_REC:
2260 case DB_SUCCESS:
2261 break;
2262 default:
2263 goto func_exit;
2264 }
2265
2266 if (row_ins_dupl_error_with_rec(
2267 rec, entry, cursor->index, offsets)) {
2268 goto duplicate;
2269 }
2270 }
2271
2272 /* This should never happen */
2273 ut_error;
2274 }
2275
2276 err = DB_SUCCESS;
2277 func_exit:
2278 if (UNIV_LIKELY_NULL(heap)) {
2279 mem_heap_free(heap);
2280 }
2281 return(err);
2282 }
2283
2284 /***************************************************************//**
2285 Checks if an index entry has long enough common prefix with an
2286 existing record so that the intended insert of the entry must be
2287 changed to a modify of the existing record. In the case of a clustered
2288 index, the prefix must be n_unique fields long. In the case of a
2289 secondary index, all fields must be equal. InnoDB never updates
2290 secondary index records in place, other than clearing or setting the
2291 delete-mark flag. We could be able to update the non-unique fields
2292 of a unique secondary index record by checking the cursor->up_match,
2293 but we do not do so, because it could have some locking implications.
2294 @return TRUE if the existing record should be updated; FALSE if not */
2295 UNIV_INLINE
2296 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2297 row_ins_must_modify_rec(
2298 /*====================*/
2299 const btr_cur_t* cursor) /*!< in: B-tree cursor */
2300 {
2301 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2302 Because node pointers on upper levels of the B-tree may match more
2303 to entry than to actual user records on the leaf level, we
2304 have to check if the candidate record is actually a user record.
2305 A clustered index node pointer contains index->n_unique first fields,
2306 and a secondary index node pointer contains all index fields. */
2307
2308 return(cursor->low_match
2309 >= dict_index_get_n_unique_in_tree(cursor->index)
2310 && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2311 }
2312
2313 /***************************************************************//**
2314 Tries to insert an entry into a clustered index, ignoring foreign key
2315 constraints. If a record with the same unique key is found, the other
2316 record is necessarily marked deleted by a committed transaction, or a
2317 unique key violation error occurs. The delete marked record is then
2318 updated to an existing record, and we must write an undo log record on
2319 the delete marked record.
2320 @retval DB_SUCCESS on success
2321 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2322 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2323 @return error code */
2324 UNIV_INTERN
2325 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2326 row_ins_clust_index_entry_low(
2327 /*==========================*/
2328 ulint flags, /*!< in: undo logging and locking flags */
2329 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2330 depending on whether we wish optimistic or
2331 pessimistic descent down the index tree */
2332 dict_index_t* index, /*!< in: clustered index */
2333 ulint n_uniq, /*!< in: 0 or index->n_uniq */
2334 dtuple_t* entry, /*!< in/out: index entry to insert */
2335 ulint n_ext, /*!< in: number of externally stored columns */
2336 que_thr_t* thr) /*!< in: query thread */
2337 {
2338 btr_cur_t cursor;
2339 ulint* offsets = NULL;
2340 dberr_t err;
2341 big_rec_t* big_rec = NULL;
2342 mtr_t mtr;
2343 mem_heap_t* offsets_heap = NULL;
2344 ulint search_mode;
2345
2346 ut_ad(dict_index_is_clust(index));
2347 ut_ad(!dict_index_is_unique(index)
2348 || n_uniq == dict_index_get_n_unique(index));
2349 ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2350
2351 /* If running with fake_changes mode on then switch from modify to
2352 search so that code takes only s-latch and not x-latch.
2353 For dry-run (fake-changes) s-latch is acceptable. Taking x-latch will
2354 make it more restrictive and will block real changes/workflow. */
2355 if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2356 search_mode = (mode & BTR_MODIFY_TREE)
2357 ? BTR_SEARCH_TREE : BTR_SEARCH_LEAF;
2358 } else {
2359 search_mode = mode;
2360 }
2361
2362 mtr_start(&mtr);
2363
2364 if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2365
2366 /* We really don't need to OR mode but will leave it for
2367 code consistency. */
2368 mode |= BTR_ALREADY_S_LATCHED;
2369 search_mode |= BTR_ALREADY_S_LATCHED;
2370
2371 mtr_s_lock(dict_index_get_lock(index), &mtr);
2372 }
2373
2374 cursor.thr = thr;
2375
2376 /* Note that we use PAGE_CUR_LE as the search mode, because then
2377 the function will return in both low_match and up_match of the
2378 cursor sensible values */
2379
2380 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, search_mode,
2381 &cursor, 0, __FILE__, __LINE__, &mtr);
2382
2383 #ifdef UNIV_DEBUG
2384 {
2385 page_t* page = btr_cur_get_page(&cursor);
2386 rec_t* first_rec = page_rec_get_next(
2387 page_get_infimum_rec(page));
2388
2389 ut_ad(page_rec_is_supremum(first_rec)
2390 || rec_get_n_fields(first_rec, index)
2391 == dtuple_get_n_fields(entry));
2392 }
2393 #endif
2394
2395 if (n_uniq && (cursor.up_match >= n_uniq
2396 || cursor.low_match >= n_uniq)) {
2397
2398 if (flags
2399 == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2400 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2401 /* Set no locks when applying log
2402 in online table rebuild. Only check for duplicates. */
2403 err = row_ins_duplicate_error_in_clust_online(
2404 n_uniq, entry, &cursor,
2405 &offsets, &offsets_heap);
2406
2407 switch (err) {
2408 case DB_SUCCESS:
2409 break;
2410 default:
2411 ut_ad(0);
2412 /* fall through */
2413 case DB_SUCCESS_LOCKED_REC:
2414 case DB_DUPLICATE_KEY:
2415 thr_get_trx(thr)->error_info = cursor.index;
2416 }
2417 } else {
2418 /* Note that the following may return also
2419 DB_LOCK_WAIT */
2420
2421 err = row_ins_duplicate_error_in_clust(
2422 flags, &cursor, entry, thr, &mtr);
2423 }
2424
2425 if (err != DB_SUCCESS) {
2426 err_exit:
2427 mtr_commit(&mtr);
2428 goto func_exit;
2429 }
2430 }
2431
2432 if (row_ins_must_modify_rec(&cursor)) {
2433 /* There is already an index entry with a long enough common
2434 prefix, we must convert the insert into a modify of an
2435 existing record */
2436 mem_heap_t* entry_heap = mem_heap_create(1024);
2437
2438 err = row_ins_clust_index_entry_by_modify(
2439 flags, mode, &cursor, &offsets, &offsets_heap,
2440 entry_heap, &big_rec, entry, thr, &mtr);
2441
2442 rec_t* rec = btr_cur_get_rec(&cursor);
2443
2444 if (big_rec && UNIV_LIKELY(!thr_get_trx(thr)->fake_changes)) {
2445 ut_a(err == DB_SUCCESS);
2446 /* Write out the externally stored
2447 columns while still x-latching
2448 index->lock and block->lock. Allocate
2449 pages for big_rec in the mtr that
2450 modified the B-tree, but be sure to skip
2451 any pages that were freed in mtr. We will
2452 write out the big_rec pages before
2453 committing the B-tree mini-transaction. If
2454 the system crashes so that crash recovery
2455 will not replay the mtr_commit(&mtr), the
2456 big_rec pages will be left orphaned until
2457 the pages are allocated for something else.
2458
2459 TODO: If the allocation extends the
2460 tablespace, it will not be redo
2461 logged, in either mini-transaction.
2462 Tablespace extension should be
2463 redo-logged in the big_rec
2464 mini-transaction, so that recovery
2465 will not fail when the big_rec was
2466 written to the extended portion of the
2467 file, in case the file was somehow
2468 truncated in the crash. */
2469
2470 DEBUG_SYNC_C_IF_THD(
2471 thr_get_trx(thr)->mysql_thd,
2472 "before_row_ins_upd_extern");
2473 err = btr_store_big_rec_extern_fields(
2474 index, btr_cur_get_block(&cursor),
2475 rec, offsets, big_rec, &mtr,
2476 BTR_STORE_INSERT_UPDATE);
2477 DEBUG_SYNC_C_IF_THD(
2478 thr_get_trx(thr)->mysql_thd,
2479 "after_row_ins_upd_extern");
2480 /* If writing big_rec fails (for
2481 example, because of DB_OUT_OF_FILE_SPACE),
2482 the record will be corrupted. Even if
2483 we did not update any externally
2484 stored columns, our update could cause
2485 the record to grow so that a
2486 non-updated column was selected for
2487 external storage. This non-update
2488 would not have been written to the
2489 undo log, and thus the record cannot
2490 be rolled back.
2491
2492 However, because we have not executed
2493 mtr_commit(mtr) yet, the update will
2494 not be replayed in crash recovery, and
2495 the following assertion failure will
2496 effectively "roll back" the operation. */
2497 ut_a(err == DB_SUCCESS);
2498 dtuple_big_rec_free(big_rec);
2499 } else if (big_rec != NULL
2500 && UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2501 dtuple_big_rec_free(big_rec);
2502 }
2503
2504 if (err == DB_SUCCESS
2505 && dict_index_is_online_ddl(index)
2506 && UNIV_LIKELY(!thr_get_trx(thr)->fake_changes)) {
2507 row_log_table_insert(rec, index, offsets);
2508 }
2509
2510 mtr_commit(&mtr);
2511 mem_heap_free(entry_heap);
2512 } else {
2513 rec_t* insert_rec;
2514
2515 if (mode != BTR_MODIFY_TREE) {
2516 ut_ad(((mode & ~BTR_ALREADY_S_LATCHED)
2517 == BTR_MODIFY_LEAF)
2518 || thr_get_trx(thr)->fake_changes);
2519 err = btr_cur_optimistic_insert(
2520 flags, &cursor, &offsets, &offsets_heap,
2521 entry, &insert_rec, &big_rec,
2522 n_ext, thr, &mtr);
2523 } else {
2524 if (buf_LRU_buf_pool_running_out()) {
2525
2526 err = DB_LOCK_TABLE_FULL;
2527 goto err_exit;
2528 }
2529
2530 err = btr_cur_optimistic_insert(
2531 flags, &cursor,
2532 &offsets, &offsets_heap,
2533 entry, &insert_rec, &big_rec,
2534 n_ext, thr, &mtr);
2535
2536 if (err == DB_FAIL) {
2537 err = btr_cur_pessimistic_insert(
2538 flags, &cursor,
2539 &offsets, &offsets_heap,
2540 entry, &insert_rec, &big_rec,
2541 n_ext, thr, &mtr);
2542 }
2543 }
2544
2545 if (UNIV_LIKELY_NULL(big_rec)) {
2546 mtr_commit(&mtr);
2547
2548 if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2549
2550 dtuple_convert_back_big_rec(
2551 index, entry, big_rec);
2552 goto func_exit;
2553 }
2554
2555 /* Online table rebuild could read (and
2556 ignore) the incomplete record at this point.
2557 If online rebuild is in progress, the
2558 row_ins_index_entry_big_rec() will write log. */
2559
2560 DBUG_EXECUTE_IF(
2561 "row_ins_extern_checkpoint",
2562 log_make_checkpoint_at(
2563 LSN_MAX, TRUE););
2564 err = row_ins_index_entry_big_rec(
2565 entry, big_rec, offsets, &offsets_heap, index,
2566 thr_get_trx(thr)->mysql_thd,
2567 __FILE__, __LINE__);
2568 dtuple_convert_back_big_rec(index, entry, big_rec);
2569 } else {
2570 if (err == DB_SUCCESS
2571 && dict_index_is_online_ddl(index)
2572 && !UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2573 row_log_table_insert(
2574 insert_rec, index, offsets);
2575 }
2576
2577 mtr_commit(&mtr);
2578 }
2579 }
2580
2581 func_exit:
2582 if (offsets_heap) {
2583 mem_heap_free(offsets_heap);
2584 }
2585
2586 return(err);
2587 }
2588
2589 /***************************************************************//**
2590 Starts a mini-transaction and checks if the index will be dropped.
2591 @return true if the index is to be dropped */
2592 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2593 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2594 row_ins_sec_mtr_start_and_check_if_aborted(
2595 /*=======================================*/
2596 mtr_t* mtr, /*!< out: mini-transaction */
2597 dict_index_t* index, /*!< in/out: secondary index */
2598 bool check, /*!< in: whether to check */
2599 ulint search_mode)
2600 /*!< in: flags */
2601 {
2602 ut_ad(!dict_index_is_clust(index));
2603
2604 mtr_start(mtr);
2605
2606 if (!check) {
2607 return(false);
2608 }
2609
2610 if (search_mode & BTR_ALREADY_S_LATCHED) {
2611 mtr_s_lock(dict_index_get_lock(index), mtr);
2612 } else {
2613 mtr_x_lock(dict_index_get_lock(index), mtr);
2614 }
2615
2616 switch (index->online_status) {
2617 case ONLINE_INDEX_ABORTED:
2618 case ONLINE_INDEX_ABORTED_DROPPED:
2619 ut_ad(*index->name == TEMP_INDEX_PREFIX);
2620 return(true);
2621 case ONLINE_INDEX_COMPLETE:
2622 return(false);
2623 case ONLINE_INDEX_CREATION:
2624 break;
2625 }
2626
2627 ut_error;
2628 return(true);
2629 }
2630
2631 /***************************************************************//**
2632 Tries to insert an entry into a secondary index. If a record with exactly the
2633 same fields is found, the other record is necessarily marked deleted.
2634 It is then unmarked. Otherwise, the entry is just inserted to the index.
2635 @retval DB_SUCCESS on success
2636 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2637 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2638 @return error code */
2639 UNIV_INTERN
2640 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr)2641 row_ins_sec_index_entry_low(
2642 /*========================*/
2643 ulint flags, /*!< in: undo logging and locking flags */
2644 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2645 depending on whether we wish optimistic or
2646 pessimistic descent down the index tree */
2647 dict_index_t* index, /*!< in: secondary index */
2648 mem_heap_t* offsets_heap,
2649 /*!< in/out: memory heap that can be emptied */
2650 mem_heap_t* heap, /*!< in/out: memory heap */
2651 dtuple_t* entry, /*!< in/out: index entry to insert */
2652 trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
2653 row_log_table_apply(), or 0 */
2654 que_thr_t* thr) /*!< in: query thread */
2655 {
2656 btr_cur_t cursor;
2657 ulint search_mode;
2658 dberr_t err = DB_SUCCESS;
2659 ulint n_unique;
2660 mtr_t mtr;
2661 ulint* offsets = NULL;
2662
2663 ut_ad(!dict_index_is_clust(index));
2664 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2665
2666 cursor.thr = thr;
2667 ut_ad(thr_get_trx(thr)->id);
2668 mtr_start(&mtr);
2669
2670 /* If running with fake_changes mode on then avoid using insert buffer
2671 and also switch from modify to search so that code takes only s-latch
2672 and not x-latch. For dry-run (fake-changes) s-latch is acceptable.
2673 Taking x-latch will make it more restrictive and will block real
2674 changes/workflow. */
2675 if (UNIV_UNLIKELY(thr_get_trx(thr)->fake_changes)) {
2676 search_mode = (mode & BTR_MODIFY_TREE)
2677 ? BTR_SEARCH_TREE : BTR_SEARCH_LEAF;
2678 } else {
2679 search_mode = mode | BTR_INSERT;
2680 }
2681
2682 /* Ensure that we acquire index->lock when inserting into an
2683 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2684 could still be subject to rollback_inplace_alter_table().
2685 This prevents a concurrent change of index->online_status.
2686 The memory object cannot be freed as long as we have an open
2687 reference to the table, or index->table->n_ref_count > 0. */
2688 const bool check = *index->name == TEMP_INDEX_PREFIX;
2689
2690 if (check) {
2691
2692 DEBUG_SYNC_C("row_ins_sec_index_enter");
2693
2694 /* mode = MODIFY_LEAF is synonymous to search_mode = SEARCH_LEAF
2695 search_mode = SEARCH_TREE suggest operation in fake_change mode
2696 so continue to s-latch in this mode too. */
2697
2698 if (mode == BTR_MODIFY_LEAF || search_mode == BTR_SEARCH_TREE) {
2699
2700 ut_ad((search_mode == BTR_SEARCH_TREE
2701 && thr_get_trx(thr)->fake_changes)
2702 || mode == BTR_MODIFY_LEAF);
2703
2704 search_mode |= BTR_ALREADY_S_LATCHED;
2705 mtr_s_lock(dict_index_get_lock(index), &mtr);
2706
2707 } else {
2708 mtr_x_lock(dict_index_get_lock(index), &mtr);
2709 }
2710
2711 if (row_log_online_op_try(
2712 index, entry, thr_get_trx(thr)->id)) {
2713 goto func_exit;
2714 }
2715 }
2716
2717 if (!thr_get_trx(thr)->check_unique_secondary) {
2718 search_mode |= BTR_IGNORE_SEC_UNIQUE;
2719 }
2720
2721 /* Note that we use PAGE_CUR_LE as the search mode, because then
2722 the function will return in both low_match and up_match of the
2723 cursor sensible values */
2724 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2725 search_mode,
2726 &cursor, 0, __FILE__, __LINE__, &mtr);
2727
2728 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2729 /* The insert was buffered during the search: we are done */
2730 goto func_exit;
2731 }
2732
2733 #ifdef UNIV_DEBUG
2734 {
2735 page_t* page = btr_cur_get_page(&cursor);
2736 rec_t* first_rec = page_rec_get_next(
2737 page_get_infimum_rec(page));
2738
2739 ut_ad(page_rec_is_supremum(first_rec)
2740 || rec_get_n_fields(first_rec, index)
2741 == dtuple_get_n_fields(entry));
2742 }
2743 #endif
2744
2745 n_unique = dict_index_get_n_unique(index);
2746
2747 if (dict_index_is_unique(index)
2748 && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
2749 mtr_commit(&mtr);
2750
2751 DEBUG_SYNC_C("row_ins_sec_index_unique");
2752
2753 if (row_ins_sec_mtr_start_and_check_if_aborted(
2754 &mtr, index, check, search_mode)) {
2755 goto func_exit;
2756 }
2757
2758 err = row_ins_scan_sec_index_for_duplicate(
2759 flags, index, entry, thr, check, &mtr, offsets_heap);
2760
2761 mtr_commit(&mtr);
2762
2763 switch (err) {
2764 case DB_SUCCESS:
2765 break;
2766 case DB_DUPLICATE_KEY:
2767 if (*index->name == TEMP_INDEX_PREFIX) {
2768 ut_ad(!thr_get_trx(thr)
2769 ->dict_operation_lock_mode);
2770 mutex_enter(&dict_sys->mutex);
2771 dict_set_corrupted_index_cache_only(
2772 index, index->table);
2773 mutex_exit(&dict_sys->mutex);
2774 /* Do not return any error to the
2775 caller. The duplicate will be reported
2776 by ALTER TABLE or CREATE UNIQUE INDEX.
2777 Unfortunately we cannot report the
2778 duplicate key value to the DDL thread,
2779 because the altered_table object is
2780 private to its call stack. */
2781 err = DB_SUCCESS;
2782 }
2783 /* fall through */
2784 default:
2785 return(err);
2786 }
2787
2788 if (row_ins_sec_mtr_start_and_check_if_aborted(
2789 &mtr, index, check, search_mode)) {
2790 goto func_exit;
2791 }
2792
2793 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
2794
2795 /* We did not find a duplicate and we have now
2796 locked with s-locks the necessary records to
2797 prevent any insertion of a duplicate by another
2798 transaction. Let us now reposition the cursor and
2799 continue the insertion. */
2800
2801 btr_cur_search_to_nth_level(
2802 index, 0, entry, PAGE_CUR_LE,
2803 search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE),
2804 &cursor, 0, __FILE__, __LINE__, &mtr);
2805 }
2806
2807 if (row_ins_must_modify_rec(&cursor)) {
2808 /* There is already an index entry with a long enough common
2809 prefix, we must convert the insert into a modify of an
2810 existing record */
2811 offsets = rec_get_offsets(
2812 btr_cur_get_rec(&cursor), index, offsets,
2813 ULINT_UNDEFINED, &offsets_heap);
2814
2815 err = row_ins_sec_index_entry_by_modify(
2816 flags, mode, &cursor, &offsets,
2817 offsets_heap, heap, entry, thr, &mtr);
2818 } else {
2819 rec_t* insert_rec;
2820 big_rec_t* big_rec;
2821
2822 if (mode == BTR_MODIFY_LEAF) {
2823 err = btr_cur_optimistic_insert(
2824 flags, &cursor, &offsets, &offsets_heap,
2825 entry, &insert_rec,
2826 &big_rec, 0, thr, &mtr);
2827 } else {
2828 ut_ad(mode == BTR_MODIFY_TREE);
2829 if (buf_LRU_buf_pool_running_out()) {
2830
2831 err = DB_LOCK_TABLE_FULL;
2832 goto func_exit;
2833 }
2834
2835 err = btr_cur_optimistic_insert(
2836 flags, &cursor,
2837 &offsets, &offsets_heap,
2838 entry, &insert_rec,
2839 &big_rec, 0, thr, &mtr);
2840 if (err == DB_FAIL) {
2841 err = btr_cur_pessimistic_insert(
2842 flags, &cursor,
2843 &offsets, &offsets_heap,
2844 entry, &insert_rec,
2845 &big_rec, 0, thr, &mtr);
2846 }
2847 }
2848
2849 if (err == DB_SUCCESS && trx_id) {
2850 page_update_max_trx_id(
2851 btr_cur_get_block(&cursor),
2852 btr_cur_get_page_zip(&cursor),
2853 trx_id, &mtr);
2854 }
2855
2856 ut_ad(!big_rec);
2857 }
2858
2859 func_exit:
2860 mtr_commit(&mtr);
2861 return(err);
2862 }
2863
2864 /***************************************************************//**
2865 Tries to insert the externally stored fields (off-page columns)
2866 of a clustered index entry.
2867 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
2868 UNIV_INTERN
2869 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)2870 row_ins_index_entry_big_rec_func(
2871 /*=============================*/
2872 const dtuple_t* entry, /*!< in/out: index entry to insert */
2873 const big_rec_t* big_rec,/*!< in: externally stored fields */
2874 ulint* offsets,/*!< in/out: rec offsets */
2875 mem_heap_t** heap, /*!< in/out: memory heap */
2876 dict_index_t* index, /*!< in: index */
2877 const char* file, /*!< in: file name of caller */
2878 #ifndef DBUG_OFF
2879 const void* thd, /*!< in: connection, or NULL */
2880 #endif /* DBUG_OFF */
2881 ulint line) /*!< in: line number of caller */
2882 {
2883 mtr_t mtr;
2884 btr_cur_t cursor;
2885 rec_t* rec;
2886 dberr_t error;
2887
2888 ut_ad(dict_index_is_clust(index));
2889
2890 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2891
2892 mtr_start(&mtr);
2893 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2894 BTR_MODIFY_TREE, &cursor, 0,
2895 file, line, &mtr);
2896 rec = btr_cur_get_rec(&cursor);
2897 offsets = rec_get_offsets(rec, index, offsets,
2898 ULINT_UNDEFINED, heap);
2899
2900 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2901 error = btr_store_big_rec_extern_fields(
2902 index, btr_cur_get_block(&cursor),
2903 rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2904 DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2905
2906 if (error == DB_SUCCESS
2907 && dict_index_is_online_ddl(index)) {
2908 row_log_table_insert(rec, index, offsets);
2909 }
2910
2911 mtr_commit(&mtr);
2912
2913 return(error);
2914 }
2915
2916 /***************************************************************//**
2917 Inserts an entry into a clustered index. Tries first optimistic,
2918 then pessimistic descent down the tree. If the entry matches enough
2919 to a delete marked record, performs the insert by updating or delete
2920 unmarking the delete marked record.
2921 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2922 UNIV_INTERN
2923 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext)2924 row_ins_clust_index_entry(
2925 /*======================*/
2926 dict_index_t* index, /*!< in: clustered index */
2927 dtuple_t* entry, /*!< in/out: index entry to insert */
2928 que_thr_t* thr, /*!< in: query thread */
2929 ulint n_ext) /*!< in: number of externally stored columns */
2930 {
2931 dberr_t err;
2932 ulint n_uniq;
2933
2934 if (!index->table->foreign_set.empty()) {
2935 err = row_ins_check_foreign_constraints(
2936 index->table, index, entry, thr);
2937 if (err != DB_SUCCESS) {
2938
2939 return(err);
2940 }
2941 }
2942
2943 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
2944
2945 /* Try first optimistic descent to the B-tree */
2946
2947 log_free_check();
2948
2949 err = row_ins_clust_index_entry_low(
2950 0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr);
2951
2952 #ifdef UNIV_DEBUG
2953 /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
2954 Once it is fixed, remove the 'ifdef', 'if' and this comment. */
2955 if (!thr_get_trx(thr)->ddl) {
2956 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
2957 "after_row_ins_clust_index_entry_leaf");
2958 }
2959 #endif /* UNIV_DEBUG */
2960
2961 if (err != DB_FAIL) {
2962 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2963 return(err);
2964 }
2965
2966 /* Try then pessimistic descent to the B-tree */
2967
2968 log_free_check();
2969
2970 return(row_ins_clust_index_entry_low(
2971 0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr));
2972 }
2973
2974 /***************************************************************//**
2975 Inserts an entry into a secondary index. Tries first optimistic,
2976 then pessimistic descent down the tree. If the entry matches enough
2977 to a delete marked record, performs the insert by updating or delete
2978 unmarking the delete marked record.
2979 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2980 UNIV_INTERN
2981 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2982 row_ins_sec_index_entry(
2983 /*====================*/
2984 dict_index_t* index, /*!< in: secondary index */
2985 dtuple_t* entry, /*!< in/out: index entry to insert */
2986 que_thr_t* thr) /*!< in: query thread */
2987 {
2988 dberr_t err;
2989 mem_heap_t* offsets_heap;
2990 mem_heap_t* heap;
2991
2992 if (!index->table->foreign_set.empty()) {
2993 err = row_ins_check_foreign_constraints(index->table, index,
2994 entry, thr);
2995 if (err != DB_SUCCESS) {
2996
2997 return(err);
2998 }
2999 }
3000
3001 ut_ad(thr_get_trx(thr)->id);
3002
3003 offsets_heap = mem_heap_create(1024);
3004 heap = mem_heap_create(1024);
3005
3006 /* Try first optimistic descent to the B-tree */
3007
3008 log_free_check();
3009
3010 err = row_ins_sec_index_entry_low(
3011 0, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry, 0, thr);
3012 if (err == DB_FAIL) {
3013 mem_heap_empty(heap);
3014
3015 /* Try then pessimistic descent to the B-tree */
3016
3017 log_free_check();
3018
3019 err = row_ins_sec_index_entry_low(
3020 0, BTR_MODIFY_TREE, index,
3021 offsets_heap, heap, entry, 0, thr);
3022 }
3023
3024 mem_heap_free(heap);
3025 mem_heap_free(offsets_heap);
3026 return(err);
3027 }
3028
3029 /***************************************************************//**
3030 Inserts an index entry to index. Tries first optimistic, then pessimistic
3031 descent down the tree. If the entry matches enough to a delete marked record,
3032 performs the insert by updating or delete unmarking the delete marked
3033 record.
3034 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3035 static
3036 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3037 row_ins_index_entry(
3038 /*================*/
3039 dict_index_t* index, /*!< in: index */
3040 dtuple_t* entry, /*!< in/out: index entry to insert */
3041 que_thr_t* thr) /*!< in: query thread */
3042 {
3043 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3044 DBUG_SET("-d,row_ins_index_entry_timeout");
3045 return(DB_LOCK_WAIT);});
3046
3047 if (dict_index_is_clust(index)) {
3048 return(row_ins_clust_index_entry(index, entry, thr, 0));
3049 } else {
3050 return(row_ins_sec_index_entry(index, entry, thr));
3051 }
3052 }
3053
3054 /***********************************************************//**
3055 Sets the values of the dtuple fields in entry from the values of appropriate
3056 columns in row. */
3057 static MY_ATTRIBUTE((nonnull))
3058 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3059 row_ins_index_entry_set_vals(
3060 /*=========================*/
3061 dict_index_t* index, /*!< in: index */
3062 dtuple_t* entry, /*!< in: index entry to make */
3063 const dtuple_t* row) /*!< in: row */
3064 {
3065 ulint n_fields;
3066 ulint i;
3067
3068 n_fields = dtuple_get_n_fields(entry);
3069
3070 for (i = 0; i < n_fields; i++) {
3071 dict_field_t* ind_field;
3072 dfield_t* field;
3073 const dfield_t* row_field;
3074 ulint len;
3075
3076 field = dtuple_get_nth_field(entry, i);
3077 ind_field = dict_index_get_nth_field(index, i);
3078 row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3079 len = dfield_get_len(row_field);
3080
3081 /* Check column prefix indexes */
3082 if (ind_field->prefix_len > 0
3083 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3084
3085 const dict_col_t* col
3086 = dict_field_get_col(ind_field);
3087
3088 len = dtype_get_at_most_n_mbchars(
3089 col->prtype, col->mbminmaxlen,
3090 ind_field->prefix_len,
3091 len,
3092 static_cast<const char*>(
3093 dfield_get_data(row_field)));
3094
3095 ut_ad(!dfield_is_ext(row_field));
3096 }
3097
3098 dfield_set_data(field, dfield_get_data(row_field), len);
3099 if (dfield_is_ext(row_field)) {
3100 ut_ad(dict_index_is_clust(index));
3101 dfield_set_ext(field);
3102 }
3103 }
3104 }
3105
3106 /***********************************************************//**
3107 Inserts a single index entry to the table.
3108 @return DB_SUCCESS if operation successfully completed, else error
3109 code or DB_LOCK_WAIT */
3110 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3111 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3112 row_ins_index_entry_step(
3113 /*=====================*/
3114 ins_node_t* node, /*!< in: row insert node */
3115 que_thr_t* thr) /*!< in: query thread */
3116 {
3117 dberr_t err;
3118
3119 ut_ad(dtuple_check_typed(node->row));
3120
3121 row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3122
3123 ut_ad(dtuple_check_typed(node->entry));
3124
3125 err = row_ins_index_entry(node->index, node->entry, thr);
3126
3127 #ifdef UNIV_DEBUG
3128 /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
3129 Once it is fixed, remove the 'ifdef', 'if' and this comment. */
3130 if (!thr_get_trx(thr)->ddl) {
3131 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3132 "after_row_ins_index_entry_step");
3133 }
3134 #endif /* UNIV_DEBUG */
3135
3136 return(err);
3137 }
3138
3139 /***********************************************************//**
3140 Allocates a row id for row and inits the node->index field. */
3141 UNIV_INLINE
3142 void
row_ins_alloc_row_id_step(ins_node_t * node)3143 row_ins_alloc_row_id_step(
3144 /*======================*/
3145 ins_node_t* node) /*!< in: row insert node */
3146 {
3147 row_id_t row_id;
3148
3149 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3150
3151 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3152
3153 /* No row id is stored if the clustered index is unique */
3154
3155 return;
3156 }
3157
3158 /* Fill in row id value to row */
3159
3160 row_id = dict_sys_get_new_row_id();
3161
3162 dict_sys_write_row_id(node->row_id_buf, row_id);
3163 }
3164
3165 /***********************************************************//**
3166 Gets a row to insert from the values list. */
3167 UNIV_INLINE
3168 void
row_ins_get_row_from_values(ins_node_t * node)3169 row_ins_get_row_from_values(
3170 /*========================*/
3171 ins_node_t* node) /*!< in: row insert node */
3172 {
3173 que_node_t* list_node;
3174 dfield_t* dfield;
3175 dtuple_t* row;
3176 ulint i;
3177
3178 /* The field values are copied in the buffers of the select node and
3179 it is safe to use them until we fetch from select again: therefore
3180 we can just copy the pointers */
3181
3182 row = node->row;
3183
3184 i = 0;
3185 list_node = node->values_list;
3186
3187 while (list_node) {
3188 eval_exp(list_node);
3189
3190 dfield = dtuple_get_nth_field(row, i);
3191 dfield_copy_data(dfield, que_node_get_val(list_node));
3192
3193 i++;
3194 list_node = que_node_get_next(list_node);
3195 }
3196 }
3197
3198 /***********************************************************//**
3199 Gets a row to insert from the select list. */
3200 UNIV_INLINE
3201 void
row_ins_get_row_from_select(ins_node_t * node)3202 row_ins_get_row_from_select(
3203 /*========================*/
3204 ins_node_t* node) /*!< in: row insert node */
3205 {
3206 que_node_t* list_node;
3207 dfield_t* dfield;
3208 dtuple_t* row;
3209 ulint i;
3210
3211 /* The field values are copied in the buffers of the select node and
3212 it is safe to use them until we fetch from select again: therefore
3213 we can just copy the pointers */
3214
3215 row = node->row;
3216
3217 i = 0;
3218 list_node = node->select->select_list;
3219
3220 while (list_node) {
3221 dfield = dtuple_get_nth_field(row, i);
3222 dfield_copy_data(dfield, que_node_get_val(list_node));
3223
3224 i++;
3225 list_node = que_node_get_next(list_node);
3226 }
3227 }
3228
3229 /***********************************************************//**
3230 Inserts a row to a table.
3231 @return DB_SUCCESS if operation successfully completed, else error
3232 code or DB_LOCK_WAIT */
3233 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3234 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3235 row_ins(
3236 /*====*/
3237 ins_node_t* node, /*!< in: row insert node */
3238 que_thr_t* thr) /*!< in: query thread */
3239 {
3240 dberr_t err;
3241
3242 if (node->state == INS_NODE_ALLOC_ROW_ID) {
3243
3244 row_ins_alloc_row_id_step(node);
3245
3246 node->index = dict_table_get_first_index(node->table);
3247 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3248
3249 if (node->ins_type == INS_SEARCHED) {
3250
3251 row_ins_get_row_from_select(node);
3252
3253 } else if (node->ins_type == INS_VALUES) {
3254
3255 row_ins_get_row_from_values(node);
3256 }
3257
3258 node->state = INS_NODE_INSERT_ENTRIES;
3259 }
3260
3261 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3262
3263 while (node->index != NULL) {
3264 if (node->index->type != DICT_FTS) {
3265 err = row_ins_index_entry_step(node, thr);
3266
3267 if (err != DB_SUCCESS) {
3268
3269 return(err);
3270 }
3271 }
3272
3273 node->index = dict_table_get_next_index(node->index);
3274 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3275
3276 DBUG_EXECUTE_IF(
3277 "row_ins_skip_sec",
3278 node->index = NULL; node->entry = NULL; break;);
3279
3280 /* Skip corrupted secondary index and its entry */
3281 while (node->index && dict_index_is_corrupted(node->index)) {
3282
3283 node->index = dict_table_get_next_index(node->index);
3284 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3285 }
3286 }
3287
3288 ut_ad(node->entry == NULL);
3289
3290 node->state = INS_NODE_ALLOC_ROW_ID;
3291
3292 return(DB_SUCCESS);
3293 }
3294
3295 /***********************************************************//**
3296 Inserts a row to a table. This is a high-level function used in SQL execution
3297 graphs.
3298 @return query thread to run next or NULL */
3299 UNIV_INTERN
3300 que_thr_t*
row_ins_step(que_thr_t * thr)3301 row_ins_step(
3302 /*=========*/
3303 que_thr_t* thr) /*!< in: query thread */
3304 {
3305 ins_node_t* node;
3306 que_node_t* parent;
3307 sel_node_t* sel_node;
3308 trx_t* trx;
3309 dberr_t err;
3310
3311 ut_ad(thr);
3312
3313 trx = thr_get_trx(thr);
3314
3315 trx_start_if_not_started_xa(trx);
3316
3317 node = static_cast<ins_node_t*>(thr->run_node);
3318
3319 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3320
3321 parent = que_node_get_parent(node);
3322 sel_node = node->select;
3323
3324 if (thr->prev_node == parent) {
3325 node->state = INS_NODE_SET_IX_LOCK;
3326 }
3327
3328 /* If this is the first time this node is executed (or when
3329 execution resumes after wait for the table IX lock), set an
3330 IX lock on the table and reset the possible select node. MySQL's
3331 partitioned table code may also call an insert within the same
3332 SQL statement AFTER it has used this table handle to do a search.
3333 This happens, for example, when a row update moves it to another
3334 partition. In that case, we have already set the IX lock on the
3335 table during the search operation, and there is no need to set
3336 it again here. But we must write trx->id to node->trx_id_buf. */
3337
3338 trx_write_trx_id(node->trx_id_buf, trx->id);
3339
3340 if (node->state == INS_NODE_SET_IX_LOCK) {
3341
3342 node->state = INS_NODE_ALLOC_ROW_ID;
3343
3344 /* It may be that the current session has not yet started
3345 its transaction, or it has been committed: */
3346
3347 if (trx->id == node->trx_id) {
3348 /* No need to do IX-locking */
3349
3350 goto same_trx;
3351 }
3352
3353 err = lock_table(0, node->table, LOCK_IX, thr);
3354
3355 DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3356 err = DB_LOCK_WAIT;);
3357
3358 if (err != DB_SUCCESS) {
3359
3360 goto error_handling;
3361 }
3362
3363 node->trx_id = trx->id;
3364 same_trx:
3365 if (node->ins_type == INS_SEARCHED) {
3366 /* Reset the cursor */
3367 sel_node->state = SEL_NODE_OPEN;
3368
3369 /* Fetch a row to insert */
3370
3371 thr->run_node = sel_node;
3372
3373 return(thr);
3374 }
3375 }
3376
3377 if ((node->ins_type == INS_SEARCHED)
3378 && (sel_node->state != SEL_NODE_FETCH)) {
3379
3380 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3381
3382 /* No more rows to insert */
3383 thr->run_node = parent;
3384
3385 return(thr);
3386 }
3387
3388 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3389
3390 err = row_ins(node, thr);
3391
3392 error_handling:
3393 trx->error_state = err;
3394
3395 if (err != DB_SUCCESS) {
3396 /* err == DB_LOCK_WAIT or SQL error detected */
3397 return(NULL);
3398 }
3399
3400 /* DO THE TRIGGER ACTIONS HERE */
3401
3402 if (node->ins_type == INS_SEARCHED) {
3403 /* Fetch a row to insert */
3404
3405 thr->run_node = sel_node;
3406 } else {
3407 thr->run_node = que_node_get_parent(node);
3408 }
3409
3410 return(thr);
3411 }
3412