1 /*****************************************************************************
2
3 Copyright (c) 1996, 2020, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file row/row0ins.cc
28 Insert into a table
29
30 Created 4/20/1996 Heikki Tuuri
31 *******************************************************/
32
33 #include <sys/types.h>
34
35 #include "btr0btr.h"
36 #include "btr0cur.h"
37 #include "buf0lru.h"
38 #include "current_thd.h"
39 #include "data0data.h"
40 #include "dict0boot.h"
41 #include "dict0dd.h"
42 #include "dict0dict.h"
43 #include "eval0eval.h"
44 #include "fts0fts.h"
45 #include "fts0types.h"
46 #include "gis0geo.h"
47 #include "ha_prototypes.h"
48 #include "lob0lob.h"
49 #include "lock0lock.h"
50 #include "log0log.h"
51 #include "m_string.h"
52 #include "mach0data.h"
53 #include "que0que.h"
54 #include "rem0cmp.h"
55 #include "row0ins.h"
56 #include "row0log.h"
57 #include "row0row.h"
58 #include "row0sel.h"
59 #include "row0upd.h"
60 #include "trx0rec.h"
61 #include "trx0undo.h"
62 #include "usr0sess.h"
63
64 #include "my_dbug.h"
65
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST not hold any synchronization objects when performing the
check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
75
76 /** Creates an insert node struct.
77 @return own: insert node struct */
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)78 ins_node_t *ins_node_create(
79 ulint ins_type, /*!< in: INS_VALUES, ... */
80 dict_table_t *table, /*!< in: table where to insert */
81 mem_heap_t *heap) /*!< in: mem heap where created */
82 {
83 ins_node_t *node;
84
85 node = static_cast<ins_node_t *>(mem_heap_alloc(heap, sizeof(ins_node_t)));
86
87 node->common.type = QUE_NODE_INSERT;
88
89 node->ins_type = ins_type;
90
91 node->state = INS_NODE_SET_IX_LOCK;
92 node->table = table;
93 node->index = nullptr;
94 node->entry = nullptr;
95
96 node->select = nullptr;
97
98 node->trx_id = 0;
99
100 node->entry_sys_heap = mem_heap_create(128);
101
102 node->magic_n = INS_NODE_MAGIC_N;
103
104 node->ins_multi_val_pos = 0;
105
106 return (node);
107 }
108
109 /** Creates an entry template for each index of a table. */
ins_node_create_entry_list(ins_node_t * node)110 static void ins_node_create_entry_list(
111 ins_node_t *node) /*!< in: row insert node */
112 {
113 dict_index_t *index;
114 dtuple_t *entry;
115
116 ut_ad(node->entry_sys_heap);
117
118 UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
119
120 /* We will include all indexes (include those corrupted
121 secondary indexes) in the entry list. Filteration of
122 these corrupted index will be done in row_ins() */
123
124 for (index = node->table->first_index(); index != nullptr;
125 index = index->next()) {
126 entry = row_build_index_entry_low(
127 node->row, nullptr, index, node->entry_sys_heap, ROW_BUILD_FOR_INSERT);
128
129 UT_LIST_ADD_LAST(node->entry_list, entry);
130 }
131 }
132
133 /** Adds system field buffers to a row. */
row_ins_alloc_sys_fields(ins_node_t * node)134 static void row_ins_alloc_sys_fields(ins_node_t *node) /*!< in: insert node */
135 {
136 dtuple_t *row;
137 dict_table_t *table;
138 mem_heap_t *heap;
139 const dict_col_t *col;
140 dfield_t *dfield;
141 byte *ptr;
142
143 row = node->row;
144 table = node->table;
145 heap = node->entry_sys_heap;
146
147 ut_ad(row && table && heap);
148 ut_ad(dtuple_get_n_fields(row) == table->get_n_cols());
149
150 /* allocate buffer to hold the needed system created hidden columns. */
151 uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
152 if (!table->is_intrinsic()) {
153 len += DATA_ROLL_PTR_LEN;
154 }
155 ptr = static_cast<byte *>(mem_heap_zalloc(heap, len));
156
157 /* 1. Populate row-id */
158 col = table->get_sys_col(DATA_ROW_ID);
159
160 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
161
162 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
163
164 node->row_id_buf = ptr;
165
166 ptr += DATA_ROW_ID_LEN;
167
168 /* 2. Populate trx id */
169 col = table->get_sys_col(DATA_TRX_ID);
170
171 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
172
173 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
174
175 node->trx_id_buf = ptr;
176
177 ptr += DATA_TRX_ID_LEN;
178
179 if (!table->is_intrinsic()) {
180 col = table->get_sys_col(DATA_ROLL_PTR);
181
182 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
183
184 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
185 }
186 }
187
188 /** Sets a new row to insert for an INS_DIRECT node. This function is only used
189 if we have constructed the row separately, which is a rare case; this
190 function is quite slow. */
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)191 void ins_node_set_new_row(
192 ins_node_t *node, /*!< in: insert node */
193 dtuple_t *row) /*!< in: new row (or first row) for the node */
194 {
195 node->state = INS_NODE_SET_IX_LOCK;
196 node->index = nullptr;
197 node->entry = nullptr;
198
199 node->row = row;
200
201 mem_heap_empty(node->entry_sys_heap);
202
203 /* Create templates for index entries */
204
205 ins_node_create_entry_list(node);
206
207 /* Allocate from entry_sys_heap buffers for sys fields */
208
209 row_ins_alloc_sys_fields(node);
210
211 /* As we allocated a new trx id buf, the trx id should be written
212 there again: */
213
214 node->trx_id = 0;
215 }
216
/** Does an insert operation by updating a delete-marked existing record
 in the index. This situation can occur if the delete-marked record is
 kept in the index for consistent reads.
 @return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_sec_index_entry_by_modify(
    ulint flags,           /*!< in: undo logging and locking flags */
    ulint mode,            /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
                           depending on whether mtr holds just a leaf
                           latch or also a tree latch */
    btr_cur_t *cursor,     /*!< in: B-tree cursor */
    ulint **offsets,       /*!< in/out: offsets on cursor->page_cur.rec */
    mem_heap_t *offsets_heap,
    /*!< in/out: memory heap that can be emptied */
    mem_heap_t *heap,      /*!< in/out: memory heap */
    const dtuple_t *entry, /*!< in: index entry to insert */
    que_thr_t *thr,        /*!< in: query thread */
    mtr_t *mtr)            /*!< in: mtr; must be committed before
                           latching any further pages */
{
  big_rec_t *dummy_big_rec;
  upd_t *update;
  rec_t *rec;
  dberr_t err;

  rec = btr_cur_get_rec(cursor);

  /* Only secondary indexes are handled here; clustered-index inserts
  by modify go through row_ins_clust_index_entry_by_modify(). */
  ut_ad(!cursor->index->is_clustered());
  ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
  ut_ad(!entry->info_bits);

  /* We know that in the alphabetical ordering, entry and rec are
  identified. But in their binary form there may be differences if
  there are char fields in them. Therefore we have to calculate the
  difference. */

  update = row_upd_build_sec_rec_difference_binary(rec, cursor->index, *offsets,
                                                   entry, heap);

  if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
    /* We should never insert in place of a record that
    has not been delete-marked. The only exception is when
    online CREATE INDEX copied the changes that we already
    made to the clustered index, and completed the
    secondary index creation before we got here. In this
    case, the change would already be there. The CREATE
    INDEX should be waiting for a MySQL meta-data lock
    upgrade at least until this INSERT or UPDATE
    returns. After that point, set_committed(true)
    would be invoked in commit_inplace_alter_table(). */
    ut_a(update->n_fields == 0);
    ut_a(!cursor->index->is_committed());
    ut_ad(!dict_index_is_online_ddl(cursor->index));
    return (DB_SUCCESS);
  }

  if (mode == BTR_MODIFY_LEAF) {
    /* Try an optimistic updating of the record, keeping changes
    within the page */

    /* TODO: pass only *offsets */
    err = btr_cur_optimistic_update(flags | BTR_KEEP_SYS_FLAG, cursor, offsets,
                                    &offsets_heap, update, 0, thr,
                                    thr_get_trx(thr)->id, mtr);
    /* Collapse the "does not fit in page" family of errors into
    DB_FAIL so that the caller retries with BTR_MODIFY_TREE. */
    switch (err) {
      case DB_OVERFLOW:
      case DB_UNDERFLOW:
      case DB_ZIP_OVERFLOW:
        err = DB_FAIL;
      default:
        break;
    }
  } else {
    ut_a(mode == BTR_MODIFY_TREE);
    /* Refuse to start a pessimistic (tree-modifying) operation when
    the buffer pool is nearly exhausted. */
    if (buf_LRU_buf_pool_running_out()) {
      return (DB_LOCK_TABLE_FULL);
    }

    trx_t *trx = thr_get_trx(thr);
    err = btr_cur_pessimistic_update(
        flags | BTR_KEEP_SYS_FLAG, cursor, offsets, &offsets_heap, heap,
        &dummy_big_rec, update, 0, thr, trx->id, trx->undo_no, mtr);
    /* Secondary index records never have externally stored columns,
    so no big record should ever be produced here. */
    ut_ad(!dummy_big_rec);
  }

  return (err);
}
304
305 /** Does an insert operation by delete unmarking and updating a delete marked
306 existing record in the index. This situation can occur if the delete marked
307 record is kept in the index for consistent reads.
308 @return DB_SUCCESS, DB_FAIL, or error code */
309 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)310 row_ins_clust_index_entry_by_modify(
311 btr_pcur_t *pcur, /*!< in/out: a persistent cursor pointing
312 to the clust_rec that is being modified. */
313 ulint flags, /*!< in: undo logging and locking flags */
314 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
315 depending on whether mtr holds just a leaf
316 latch or also a tree latch */
317 ulint **offsets, /*!< out: offsets on cursor->page_cur.rec */
318 mem_heap_t **offsets_heap,
319 /*!< in/out: pointer to memory heap that can
320 be emptied, or NULL */
321 mem_heap_t *heap, /*!< in/out: memory heap */
322 const dtuple_t *entry, /*!< in: index entry to insert */
323 que_thr_t *thr, /*!< in: query thread */
324 mtr_t *mtr) /*!< in: mtr; must be committed before
325 latching any further pages */
326 {
327 const rec_t *rec;
328 upd_t *update;
329 dberr_t err = DB_SUCCESS;
330 btr_cur_t *cursor = btr_pcur_get_btr_cur(pcur);
331 TABLE *mysql_table = nullptr;
332 ut_ad(cursor->index->is_clustered());
333
334 rec = btr_cur_get_rec(cursor);
335
336 ut_ad(rec_get_deleted_flag(rec, dict_table_is_comp(cursor->index->table)));
337
338 /* Build an update vector containing all the fields to be modified;
339 NOTE that this vector may NOT contain system columns trx_id or
340 roll_ptr */
341 if (thr->prebuilt != nullptr) {
342 mysql_table = thr->prebuilt->m_mysql_table;
343 ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
344 }
345
346 update = row_upd_build_difference_binary(cursor->index, entry, rec, nullptr,
347 true, thr_get_trx(thr), heap,
348 mysql_table, &err);
349 if (err != DB_SUCCESS) {
350 return (err);
351 }
352 if (mode != BTR_MODIFY_TREE) {
353 ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
354
355 /* Try optimistic updating of the record, keeping changes
356 within the page */
357
358 err = btr_cur_optimistic_update(flags, cursor, offsets, offsets_heap,
359 update, 0, thr, thr_get_trx(thr)->id, mtr);
360 switch (err) {
361 case DB_OVERFLOW:
362 case DB_UNDERFLOW:
363 case DB_ZIP_OVERFLOW:
364 err = DB_FAIL;
365 default:
366 break;
367 }
368 } else {
369 if (buf_LRU_buf_pool_running_out()) {
370 return (DB_LOCK_TABLE_FULL);
371 }
372
373 big_rec_t *big_rec = nullptr;
374 trx_t *trx = thr_get_trx(thr);
375 trx_id_t trx_id = thr_get_trx(thr)->id;
376
377 DEBUG_SYNC_C("before_row_ins_upd_pessimistic");
378
379 err = btr_cur_pessimistic_update(flags | BTR_KEEP_POS_FLAG, cursor, offsets,
380 offsets_heap, heap, &big_rec, update, 0,
381 thr, trx_id, trx->undo_no, mtr);
382
383 if (big_rec) {
384 ut_a(err == DB_SUCCESS);
385
386 DEBUG_SYNC_C("before_row_ins_upd_extern");
387 err = lob::btr_store_big_rec_extern_fields(
388 trx, pcur, update, *offsets, big_rec, mtr, lob::OPCODE_INSERT_UPDATE);
389 DEBUG_SYNC_C("after_row_ins_upd_extern");
390 dtuple_big_rec_free(big_rec);
391 }
392 }
393
394 return (err);
395 }
396
397 /** Returns TRUE if in a cascaded update/delete an ancestor node of node
398 updates (not DELETE, but UPDATE) table.
399 @return true if an ancestor updates table */
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)400 static ibool row_ins_cascade_ancestor_updates_table(
401 que_node_t *node, /*!< in: node in a query graph */
402 dict_table_t *table) /*!< in: table */
403 {
404 que_node_t *parent;
405
406 for (parent = que_node_get_parent(node);
407 que_node_get_type(parent) == QUE_NODE_UPDATE;
408 parent = que_node_get_parent(parent)) {
409 upd_node_t *upd_node;
410
411 upd_node = static_cast<upd_node_t *>(parent);
412
413 if (upd_node->table == table && upd_node->is_delete == FALSE) {
414 return (TRUE);
415 }
416 }
417
418 return (FALSE);
419 }
420
421 /** Returns the number of ancestor UPDATE or DELETE nodes of a
422 cascaded update/delete node.
423 @return number of ancestors */
row_ins_cascade_n_ancestors(que_node_t * node)424 static MY_ATTRIBUTE((warn_unused_result)) ulint row_ins_cascade_n_ancestors(
425 que_node_t *node) /*!< in: node in a query graph */
426 {
427 que_node_t *parent;
428 ulint n_ancestors = 0;
429
430 for (parent = que_node_get_parent(node);
431 que_node_get_type(parent) == QUE_NODE_UPDATE;
432 parent = que_node_get_parent(parent)) {
433 n_ancestors++;
434 }
435
436 return (n_ancestors);
437 }
438
/** Calculates the update vector node->cascade->update for a child table in
 a cascaded update.
 @return number of fields in the calculated update vector; the value
 can also be 0 if no foreign key fields changed; the returned value is
 ULINT_UNDEFINED if the column type in the child table is too short to
 fit the new value in the parent table: that means the update fails */
static MY_ATTRIBUTE((warn_unused_result)) ulint row_ins_cascade_calc_update_vec(
    upd_node_t *node,        /*!< in: update node of the parent
                             table */
    dict_foreign_t *foreign, /*!< in: foreign key constraint whose
                             type is != 0 */
    mem_heap_t *heap,        /*!< in: memory heap to use as
                             temporary storage */
    trx_t *trx,              /*!< in: update transaction */
    ibool *fts_col_affected)
    /*!< out: is FTS column affected */
{
  upd_node_t *cascade = node->cascade_node;
  dict_table_t *table = foreign->foreign_table;
  dict_index_t *index = foreign->foreign_index;
  upd_t *update;
  dict_table_t *parent_table;
  dict_index_t *parent_index;
  upd_t *parent_update;
  ulint n_fields_updated;
  ulint parent_field_no;
  ulint i;
  ulint j;
  ibool doc_id_updated = FALSE;
  ulint doc_id_pos = 0;
  doc_id_t new_doc_id = FTS_NULL_DOC_ID;

  ut_a(node);
  ut_a(foreign);
  ut_a(cascade);
  ut_a(table);
  ut_a(index);

  /* Calculate the appropriate update vector which will set the fields
  in the child index record to the same value (possibly padded with
  spaces if the column is a fixed length CHAR or FIXBINARY column) as
  the referenced index record will get in the update. */

  parent_table = node->table;
  ut_a(parent_table == foreign->referenced_table);
  parent_index = foreign->referenced_index;
  parent_update = node->update;

  update = cascade->update;

  update->info_bits = 0;

  n_fields_updated = 0;

  *fts_col_affected = FALSE;

  /* Remember where the hidden FTS_DOC_ID column sits in the child
  table, so that an update of it can be detected below. */
  if (table->fts) {
    doc_id_pos = dict_table_get_nth_col_pos(table, table->fts->doc_col);
  }

  /* For each foreign key column, see whether the parent update vector
  changes the corresponding referenced column. */
  for (i = 0; i < foreign->n_fields; i++) {
    parent_field_no =
        dict_table_get_nth_col_pos(parent_table, parent_index->get_col_no(i));

    for (j = 0; j < parent_update->n_fields; j++) {
      const upd_field_t *parent_ufield = &parent_update->fields[j];

      if (parent_ufield->field_no == parent_field_no) {
        ulint min_size;
        const dict_col_t *col;
        ulint ufield_len;
        upd_field_t *ufield;

        col = index->get_col(i);

        /* A field in the parent index record is
        updated. Let us make the update vector
        field for the child table. */

        ufield = update->fields + n_fields_updated;

        ufield->field_no =
            dict_table_get_nth_col_pos(table, dict_col_get_no(col));

        ufield->orig_len = 0;
        ufield->exp = nullptr;

        ufield->new_val = parent_ufield->new_val;
        ufield_len = dfield_get_len(&ufield->new_val);

        /* Clear the "external storage" flag */
        dfield_set_len(&ufield->new_val, ufield_len);

        /* Do not allow a NOT NULL column to be
        updated as NULL */

        if (dfield_is_null(&ufield->new_val) && (col->prtype & DATA_NOT_NULL)) {
          return (ULINT_UNDEFINED);
        }

        /* If the new value would not fit in the
        column, do not allow the update */

        if (!dfield_is_null(&ufield->new_val) &&
            dtype_get_at_most_n_mbchars(
                col->prtype, col->mbminmaxlen, col->len, ufield_len,
                static_cast<char *>(dfield_get_data(&ufield->new_val))) <
                ufield_len) {
          return (ULINT_UNDEFINED);
        }

        /* If the parent column type has a different
        length than the child column type, we may
        need to pad with spaces the new value of the
        child column */

        min_size = col->get_min_size();

        /* Because UNIV_SQL_NULL (the marker
        of SQL NULL values) exceeds all possible
        values of min_size, the test below will
        not hold for SQL NULL columns. */

        if (min_size > ufield_len) {
          byte *pad;
          ulint pad_len;
          byte *padded_data;
          ulint mbminlen;

          padded_data = static_cast<byte *>(mem_heap_alloc(heap, min_size));

          pad = padded_data + ufield_len;
          pad_len = min_size - ufield_len;

          memcpy(padded_data, dfield_get_data(&ufield->new_val), ufield_len);

          mbminlen = col->get_mbminlen();

          /* Both lengths must be whole multiples of the minimum
          character size of the column's charset. */
          ut_ad(!(ufield_len % mbminlen));
          ut_ad(!(min_size % mbminlen));

          if (mbminlen == 1 && dtype_get_charset_coll(col->prtype) ==
                                   DATA_MYSQL_BINARY_CHARSET_COLL) {
            /* Do not pad BINARY columns */
            return (ULINT_UNDEFINED);
          }

          row_mysql_pad_col(mbminlen, pad, pad_len);
          dfield_set_data(&ufield->new_val, padded_data, min_size);
        }

        /* Check whether the current column has
        FTS index on it */
        if (table->fts &&
            dict_table_is_fts_column(table->fts->indexes, dict_col_get_no(col),
                                     col->is_virtual()) != ULINT_UNDEFINED) {
          *fts_col_affected = TRUE;
        }

        /* If Doc ID is updated, check whether the
        Doc ID is valid */
        if (table->fts && ufield->field_no == doc_id_pos) {
          doc_id_t n_doc_id;

          n_doc_id = table->fts->cache->next_doc_id;

          new_doc_id = fts_read_doc_id(
              static_cast<const byte *>(dfield_get_data(&ufield->new_val)));

          /* A Doc ID must be strictly positive ... */
          if (new_doc_id <= 0) {
            ib::error(ER_IB_MSG_954) << "FTS Doc ID"
                                        " must be larger than"
                                        " 0";
            return (ULINT_UNDEFINED);
          }

          /* ... and must not move backwards with respect to the
          next Doc ID the FTS cache will hand out. */
          if (new_doc_id < n_doc_id) {
            ib::error(ER_IB_MSG_955)
                << "FTS Doc ID"
                   " must be larger than "
                << n_doc_id - 1 << " for table " << table->name;

            return (ULINT_UNDEFINED);
          }

          *fts_col_affected = TRUE;
          doc_id_updated = TRUE;
        }

        n_fields_updated++;
      }
    }
  }

  /* Generate a new Doc ID if FTS index columns get updated */
  if (table->fts && *fts_col_affected) {
    if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
      /* Hidden FTS_DOC_ID column: generate the next Doc ID
      ourselves and append it to the update vector. */
      doc_id_t doc_id;
      doc_id_t *next_doc_id;
      upd_field_t *ufield;

      next_doc_id =
          static_cast<doc_id_t *>(mem_heap_alloc(heap, sizeof(doc_id_t)));

      ut_ad(!doc_id_updated);
      ufield = update->fields + n_fields_updated;
      fts_get_next_doc_id(table, next_doc_id);
      doc_id = fts_update_doc_id(table, ufield, next_doc_id);
      n_fields_updated++;
      fts_trx_add_op(trx, table, doc_id, FTS_INSERT, nullptr);
    } else {
      /* User-supplied FTS_DOC_ID column: the user must have
      updated it together with the FTS indexed columns. */
      if (doc_id_updated) {
        ut_ad(new_doc_id);
        fts_trx_add_op(trx, table, new_doc_id, FTS_INSERT, nullptr);
      } else {
        ib::error(ER_IB_MSG_956) << "FTS Doc ID must be updated"
                                    " along with FTS indexed column for"
                                    " table "
                                 << table->name;
        return (ULINT_UNDEFINED);
      }
    }
  }

  update->n_fields = n_fields_updated;

  return (n_fields_updated);
}
667
/** Set detailed error message associated with foreign key errors for
 the given transaction. The message is composed in the shared
 srv_misc_tmpfile scratch file under srv_misc_tmpfile_mutex and then
 copied into the transaction's detailed error buffer. */
static void row_ins_set_detailed(
    trx_t *trx,              /*!< in: transaction */
    dict_foreign_t *foreign) /*!< in: foreign key constraint */
{
  ut_ad(!srv_read_only_mode);

  /* Serialize access to the shared scratch file */
  mutex_enter(&srv_misc_tmpfile_mutex);
  rewind(srv_misc_tmpfile);

  if (os_file_set_eof(srv_misc_tmpfile)) {
    /* Write the table name and the constraint in SHOW CREATE
    TABLE format, then transfer the text to the trx */
    ut_print_name(srv_misc_tmpfile, trx, foreign->foreign_table_name);
    dict_print_info_on_foreign_key_in_create_format(srv_misc_tmpfile, trx,
                                                    foreign, FALSE);
    trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
  } else {
    /* Truncating the scratch file failed; fall back to a
    generic message */
    trx_set_detailed_error(trx, "temp file operation failed");
  }

  mutex_exit(&srv_misc_tmpfile_mutex);
}
690
/** Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
 and displays information about the given transaction.
 The caller must release dict_foreign_err_mutex. */
static void row_ins_foreign_trx_print(trx_t *trx) /*!< in: transaction */
{
  ulint n_rec_locks;
  ulint n_trx_locks;
  ulint heap_size;

  /* No-op when the server cannot write files */
  if (srv_read_only_mode) {
    return;
  }

  {
    /** lock_number_of_rows_locked() requires global exclusive latch, and so
    does accessing trx_locks with trx->mutex */
    locksys::Global_exclusive_latch_guard guard{};
    n_rec_locks = lock_number_of_rows_locked(&trx->lock);
    n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
    heap_size = mem_heap_get_size(trx->lock.lock_heap);
  }

  /* trx_print_low() expects the trx_sys mutex to be held */
  trx_sys_mutex_enter();

  mutex_enter(&dict_foreign_err_mutex);
  rewind(dict_foreign_err_file);
  ut_print_timestamp(dict_foreign_err_file);
  fputs(" Transaction:\n", dict_foreign_err_file);

  trx_print_low(dict_foreign_err_file, trx, 600, n_rec_locks, n_trx_locks,
                heap_size);

  trx_sys_mutex_exit();

  /* Intentionally still holding dict_foreign_err_mutex: the caller
  appends to dict_foreign_err_file and releases the mutex */
  ut_ad(mutex_own(&dict_foreign_err_mutex));
}
727
/** Reports a foreign key error associated with an update or a delete of a
 parent table index entry. Writes the report to dict_foreign_err_file and
 sets the transaction's detailed error message. */
static void row_ins_foreign_report_err(
    const char *errstr,      /*!< in: error string from the viewpoint
                             of the parent table */
    que_thr_t *thr,          /*!< in: query thread whose run_node
                             is an update node */
    dict_foreign_t *foreign, /*!< in: foreign key constraint */
    const rec_t *rec,        /*!< in: a matching index record in the
                             child table */
    const dtuple_t *entry)   /*!< in: index entry in the parent
                             table */
{
  /* Nothing can be written in read-only mode */
  if (srv_read_only_mode) {
    return;
  }

  FILE *ef = dict_foreign_err_file;
  trx_t *trx = thr_get_trx(thr);

  row_ins_set_detailed(trx, foreign);

  /* Acquires dict_foreign_err_mutex; released at the end of this
  function */
  row_ins_foreign_trx_print(trx);

  fputs("Foreign key constraint fails for table ", ef);
  ut_print_name(ef, trx, foreign->foreign_table_name);
  fputs(":\n", ef);
  dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
  putc('\n', ef);
  fputs(errstr, ef);
  fprintf(ef, " in parent table, in index %s",
          foreign->referenced_index->name());
  if (entry) {
    fputs(" tuple:\n", ef);
    dtuple_print(ef, entry);
  }
  fputs("\nBut in child table ", ef);
  ut_print_name(ef, trx, foreign->foreign_table_name);
  fprintf(ef, ", in index %s", foreign->foreign_index->name());
  if (rec) {
    fputs(", there is a record:\n", ef);
    rec_print(ef, rec, foreign->foreign_index);
  } else {
    fputs(", the record is not available\n", ef);
  }
  putc('\n', ef);

  /* Taken by row_ins_foreign_trx_print() above */
  mutex_exit(&dict_foreign_err_mutex);
}
777
/** Reports a foreign key error to dict_foreign_err_file when we are trying
 to add an index entry to a child table. Note that the adding may be the result
 of an update, too. */
static void row_ins_foreign_report_add_err(
    trx_t *trx,              /*!< in: transaction */
    dict_foreign_t *foreign, /*!< in: foreign key constraint */
    const rec_t *rec,        /*!< in: a record in the parent table:
                             it does not match entry because we
                             have an error! */
    const dtuple_t *entry)   /*!< in: index entry to insert in the
                             child table */
{
  /* Nothing can be written in read-only mode */
  if (srv_read_only_mode) {
    return;
  }

  FILE *ef = dict_foreign_err_file;

  row_ins_set_detailed(trx, foreign);

  /* Acquires dict_foreign_err_mutex; released at the end of this
  function */
  row_ins_foreign_trx_print(trx);

  fputs("Foreign key constraint fails for table ", ef);
  ut_print_name(ef, trx, foreign->foreign_table_name);
  fputs(":\n", ef);
  dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
  fprintf(ef, "\nTrying to add in child table, in index %s",
          foreign->foreign_index->name());
  if (entry) {
    fputs(" tuple:\n", ef);
    /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
    It would be better to only display the user columns. */
    dtuple_print(ef, entry);
  }
  fputs("\nBut in parent table ", ef);
  ut_print_name(ef, trx, foreign->referenced_table_name);
  fprintf(ef,
          ", in index %s,\n"
          "the closest match we can find is record:\n",
          foreign->referenced_index->name());
  if (rec && page_rec_is_supremum(rec)) {
    /* If the cursor ended on a supremum record, it is better
    to report the previous record in the error message, so that
    the user gets a more descriptive error message. */
    rec = page_rec_get_prev_const(rec);
  }

  if (rec) {
    rec_print(ef, rec, foreign->referenced_index);
  }
  putc('\n', ef);

  /* Taken by row_ins_foreign_trx_print() above */
  mutex_exit(&dict_foreign_err_mutex);
}
832
/** Fill virtual column information in cascade node for the child table.
@param[in]	trx	current transaction (NOTE(review): appears unused in
                        this function body — confirm before removing)
@param[out]	cascade	child update node
@param[in]	rec	clustered rec of child table
@param[in]	index	clustered index of child table
@param[in]	node	parent update node
@param[in]	foreign	foreign key information.
@param[out]	err	error code. */
static void row_ins_foreign_fill_virtual(trx_t *trx, upd_node_t *cascade,
                                         const rec_t *rec, dict_index_t *index,
                                         upd_node_t *node,
                                         dict_foreign_t *foreign,
                                         dberr_t *err) {
  row_ext_t *ext;
  THD *thd = current_thd;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  rec_offs_init(offsets_);
  const ulint *offsets =
      rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &cascade->heap);
  mem_heap_t *v_heap = nullptr;
  upd_t *update = cascade->update;
  ulint n_v_fld = index->table->n_v_def;
  ulint n_diff;
  upd_field_t *upd_field;
  dict_vcol_set *v_cols = foreign->v_cols;

  /* Rebuild the old row image of the child record; the virtual
  column values are recomputed from it below */
  update->old_vrow =
      row_build(ROW_COPY_POINTERS, index, rec, offsets, index->table, nullptr,
                nullptr, &ext, cascade->heap);

  n_diff = update->n_fields;

  /* Reserve room for one update field per defined virtual column;
  trimmed back to the actual count (n_diff) at the end */
  update->n_fields += n_v_fld;

  if (index->table->vc_templ == nullptr) {
    /** This can occur when there is a cascading
    delete or update after restart. */
    innobase_init_vc_templ(index->table);
  }

  for (ulint i = 0; i < n_v_fld; i++) {
    dict_v_col_t *col = dict_table_get_nth_v_col(index->table, i);
    auto it = v_cols->find(col);

    /* Only virtual columns involved in this foreign key are
    of interest */
    if (it == v_cols->end()) {
      continue;
    }

    /* Compute the current (old) value of the virtual column */
    dfield_t *vfield = innobase_get_computed_value(
        update->old_vrow, col, index, &v_heap, update->heap, nullptr, thd,
        nullptr, nullptr, nullptr, nullptr);

    if (vfield == nullptr) {
      *err = DB_COMPUTE_VALUE_FAILED;
      goto func_exit;
    }

    upd_field = upd_get_nth_field(update, n_diff);

    upd_field->old_v_val = static_cast<dfield_t *>(
        mem_heap_alloc(cascade->heap, sizeof *upd_field->old_v_val));

    dfield_copy(upd_field->old_v_val, vfield);

    upd_field_set_v_field_no(upd_field, i, index);

    /* ON ... SET NULL: the new value of the virtual column
    becomes NULL */
    if (node->is_delete ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
                        : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
      dfield_set_null(&upd_field->new_val);
    }

    /* ON UPDATE CASCADE: recompute the virtual column value with
    the parent's update vector applied */
    if (!node->is_delete && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
      dfield_t *new_vfield = innobase_get_computed_value(
          update->old_vrow, col, index, &v_heap, update->heap, nullptr, thd,
          nullptr, nullptr, node->update, foreign);

      if (new_vfield == nullptr) {
        *err = DB_COMPUTE_VALUE_FAILED;
        goto func_exit;
      }

      dfield_copy(&(upd_field->new_val), new_vfield);
    }

    n_diff++;
  }

  /* Shrink the vector to the number of fields actually filled in */
  update->n_fields = n_diff;
  *err = DB_SUCCESS;

func_exit:
  if (v_heap) {
    mem_heap_free(v_heap);
  }
}
928
/** Perform referential actions or checks when a parent row is deleted or
updated and the constraint had an ON DELETE or ON UPDATE condition which was
not RESTRICT.
@return DB_SUCCESS, DB_LOCK_WAIT, or error code
Disable inlining because of a bug in gcc8 which may lead to stack exhaustion.
*/
static NO_INLINE MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_foreign_check_on_constraint(
    que_thr_t *thr,          /*!< in: query thread whose run_node
                             is an update node */
    dict_foreign_t *foreign, /*!< in: foreign key constraint whose
                             type is != 0 */
    btr_pcur_t *pcur,        /*!< in: cursor placed on a matching
                             index record in the child table */
    dtuple_t *entry,         /*!< in: index entry in the parent
                             table */
    mtr_t *mtr)              /*!< in: mtr holding the latch of pcur
                             page */
{
  upd_node_t *node;
  upd_node_t *cascade;
  dict_table_t *table = foreign->foreign_table;
  dict_index_t *index;
  dict_index_t *clust_index;
  dtuple_t *ref;
  const rec_t *rec;
  const rec_t *clust_rec;
  const buf_block_t *clust_block;
  upd_t *update;
  ulint n_to_update;
  dberr_t err;
  ulint i;
  trx_t *trx;
  mem_heap_t *tmp_heap = nullptr;
  doc_id_t doc_id = FTS_NULL_DOC_ID;
  /* Set to TRUE when a cascaded change touches a column that belongs to
  a fulltext index; in that case the old Doc ID must be marked deleted
  in the FTS auxiliary tables. (Note: "affacted" is a historical typo
  kept for consistency with the rest of this function.) */
  ibool fts_col_affacted = FALSE;

  DBUG_TRACE;
  ut_a(thr);
  ut_a(foreign);
  ut_a(pcur);
  ut_a(mtr);

  trx = thr_get_trx(thr);

  node = static_cast<upd_node_t *>(thr->run_node);

  /* A DELETE of the parent row with neither ON DELETE CASCADE nor
  ON DELETE SET NULL means the child row blocks the delete. */
  if (node->is_delete &&
      0 == (foreign->type & (DICT_FOREIGN_ON_DELETE_CASCADE |
                             DICT_FOREIGN_ON_DELETE_SET_NULL))) {
    row_ins_foreign_report_err("Trying to delete", thr, foreign,
                               btr_pcur_get_rec(pcur), entry);

    return DB_ROW_IS_REFERENCED;
  }

  if (!node->is_delete &&
      0 == (foreign->type & (DICT_FOREIGN_ON_UPDATE_CASCADE |
                             DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
    /* This is an UPDATE */

    row_ins_foreign_report_err("Trying to update", thr, foreign,
                               btr_pcur_get_rec(pcur), entry);

    return DB_ROW_IS_REFERENCED;
  }

  /* Lazily create the cascade node and its heap on first use; they are
  cached in the parent update node and reused for subsequent child rows. */
  if (node->cascade_node == nullptr) {
    node->cascade_heap = mem_heap_create(128);
    node->cascade_node =
        row_create_update_node_for_mysql(table, node->cascade_heap);
    que_node_set_parent(node->cascade_node, node);
  }
  cascade = node->cascade_node;
  cascade->table = table;
  cascade->foreign = foreign;

  if (node->is_delete && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
    cascade->is_delete = TRUE;
  } else {
    cascade->is_delete = FALSE;

    if (foreign->n_fields > cascade->update_n_fields) {
      /* We have to make the update vector longer */

      cascade->update = upd_create(foreign->n_fields, node->cascade_heap);
      cascade->update_n_fields = foreign->n_fields;
    }
  }

  /* We do not allow cyclic cascaded updating (DELETE is allowed,
  but not UPDATE) of the same table, as this can lead to an infinite
  cycle. Check that we are not updating the same table which is
  already being modified in this cascade chain. We have to check
  this also because the modification of the indexes of a 'parent'
  table may still be incomplete, and we must avoid seeing the indexes
  of the parent table in an inconsistent state! */

  if (!cascade->is_delete &&
      row_ins_cascade_ancestor_updates_table(cascade, table)) {
    /* We do not know if this would break foreign key
    constraints, but play safe and return an error */

    err = DB_ROW_IS_REFERENCED;

    row_ins_foreign_report_err(
        "Trying an update, possibly causing a cyclic"
        " cascaded update\n"
        "in the child table,",
        thr, foreign, btr_pcur_get_rec(pcur), entry);

    goto nonstandard_exit_func;
  }

  /* Bound the depth of the cascade chain to avoid unbounded recursion
  through chained foreign keys. */
  if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
    err = DB_FOREIGN_EXCEED_MAX_CASCADE;

    row_ins_foreign_report_err("Trying a too deep cascaded delete or update\n",
                               thr, foreign, btr_pcur_get_rec(pcur), entry);

    goto nonstandard_exit_func;
  }

  index = btr_pcur_get_btr_cur(pcur)->index;

  ut_a(index == foreign->foreign_index);

  rec = btr_pcur_get_rec(pcur);

  tmp_heap = mem_heap_create(256);

  if (index->is_clustered()) {
    /* pcur is already positioned in the clustered index of
    the child table */

    clust_index = index;
    clust_rec = rec;
    clust_block = btr_pcur_get_block(pcur);
  } else {
    /* We have to look for the record in the clustered index
    in the child table */

    clust_index = table->first_index();

    ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec, tmp_heap);
    btr_pcur_open_with_no_init(clust_index, ref, PAGE_CUR_LE, BTR_SEARCH_LEAF,
                               cascade->pcur, 0, mtr);

    clust_rec = btr_pcur_get_rec(cascade->pcur);
    clust_block = btr_pcur_get_block(cascade->pcur);

    /* The clustered index record must exist and match fully on the
    unique fields; anything else indicates index corruption. */
    if (!page_rec_is_user_rec(clust_rec) ||
        btr_pcur_get_low_match(cascade->pcur) <
            dict_index_get_n_unique(clust_index)) {
      ib::error(ER_IB_MSG_957)
          << "In cascade of a foreign key op index " << index->name
          << " of table " << index->table->name;

      fputs("InnoDB: record ", stderr);
      rec_print(stderr, rec, index);
      fputs(
          "\n"
          "InnoDB: clustered record ",
          stderr);
      rec_print(stderr, clust_rec, clust_index);
      fputs(
          "\n"
          "InnoDB: Submit a detailed bug report to"
          " http://bugs.mysql.com\n",
          stderr);
      ut_ad(0);
      err = DB_SUCCESS;

      goto nonstandard_exit_func;
    }
  }

  /* Set an X-lock on the row to delete or update in the child table */

  err = lock_table(0, table, LOCK_IX, thr);

  if (err == DB_SUCCESS) {
    /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
    we already have a normal shared lock on the appropriate
    gap if the search criterion was not unique */

    err = lock_clust_rec_read_check_and_lock_alt(
        clust_block, clust_rec, clust_index, LOCK_X, LOCK_REC_NOT_GAP, thr);
  }

  if (err != DB_SUCCESS) {
    goto nonstandard_exit_func;
  }

  if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
    /* This can happen if there is a circular reference of
    rows such that cascading delete comes to delete a row
    already in the process of being delete marked */
    err = DB_SUCCESS;

    goto nonstandard_exit_func;
  }

  /* Remember the Doc ID of the child row in case the cascade affects
  fulltext-indexed columns below. */
  if (table->fts) {
    doc_id = fts_get_doc_id_from_rec(table, clust_rec, clust_index, tmp_heap);
  }
  /* A cascaded delete touching virtual columns needs the virtual column
  template initialized on the child table. */
  if (cascade->is_delete && foreign->v_cols != nullptr &&
      foreign->v_cols->size() > 0 && table->vc_templ == nullptr) {
    innobase_init_vc_templ(table);
  }

  if (node->is_delete ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
                      : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
    /* Build the appropriate update vector which sets
    foreign->n_fields first fields in rec to SQL NULL */

    update = cascade->update;

    update->info_bits = 0;
    update->n_fields = foreign->n_fields;
    UNIV_MEM_INVALID(update->fields, update->n_fields * sizeof *update->fields);

    for (i = 0; i < foreign->n_fields; i++) {
      upd_field_t *ufield = &update->fields[i];
      ulint col_no = index->get_col_no(i);

      ufield->field_no = dict_table_get_nth_col_pos(table, col_no);
      dict_col_t *col = table->get_col(col_no);
      col->copy_type(dfield_get_type(&ufield->new_val));

      ufield->orig_len = 0;
      ufield->exp = nullptr;
      dfield_set_null(&ufield->new_val);

      /* Track whether any nulled column participates in a
      fulltext index. */
      if (table->fts &&
          dict_table_is_fts_column(table->fts->indexes, index->get_col_no(i),
                                   index->get_col(i)->is_virtual()) !=
              ULINT_UNDEFINED) {
        fts_col_affacted = TRUE;
      }
    }

    if (fts_col_affacted) {
      fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
    }

    if (foreign->v_cols != nullptr && foreign->v_cols->size() > 0) {
      row_ins_foreign_fill_virtual(trx, cascade, clust_rec, clust_index, node,
                                   foreign, &err);

      if (err != DB_SUCCESS) {
        goto nonstandard_exit_func;
      }
    }

  } else if (table->fts && cascade->is_delete) {
    /* DICT_FOREIGN_ON_DELETE_CASCADE case */
    for (i = 0; i < foreign->n_fields; i++) {
      if (table->fts &&
          dict_table_is_fts_column(table->fts->indexes, index->get_col_no(i),
                                   index->get_col(i)->is_virtual()) !=
              ULINT_UNDEFINED) {
        fts_col_affacted = TRUE;
      }
    }

    if (fts_col_affacted) {
      fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
    }
  }

  if (!node->is_delete && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
    /* Build the appropriate update vector which sets changing
    foreign->n_fields first fields in rec to new values */

    n_to_update = row_ins_cascade_calc_update_vec(node, foreign, tmp_heap, trx,
                                                  &fts_col_affacted);

    if (foreign->v_cols != nullptr && foreign->v_cols->size() > 0) {
      row_ins_foreign_fill_virtual(trx, cascade, clust_rec, clust_index, node,
                                   foreign, &err);

      if (err != DB_SUCCESS) {
        goto nonstandard_exit_func;
      }
    }

    /* ULINT_UNDEFINED signals that the cascaded value would not fit the
    child column, or would set NULL on a NOT NULL column. */
    if (n_to_update == ULINT_UNDEFINED) {
      err = DB_ROW_IS_REFERENCED;

      row_ins_foreign_report_err(
          "Trying a cascaded update where the"
          " updated value in the child\n"
          "table would not fit in the length"
          " of the column, or the value would\n"
          "be NULL and the column is"
          " declared as not NULL in the child table,",
          thr, foreign, btr_pcur_get_rec(pcur), entry);

      goto nonstandard_exit_func;
    }

    if (cascade->update->n_fields == 0) {
      /* The update does not change any columns referred
      to in this foreign key constraint: no need to do
      anything */

      err = DB_SUCCESS;

      goto nonstandard_exit_func;
    }

    /* Mark the old Doc ID as deleted */
    if (fts_col_affacted) {
      ut_ad(table->fts);
      fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
    }
  }

  /* Store pcur position and initialize or store the cascade node
  pcur stored position */

  btr_pcur_store_position(pcur, mtr);

  if (index == clust_index) {
    btr_pcur_copy_stored_position(cascade->pcur, pcur);
  } else {
    btr_pcur_store_position(cascade->pcur, mtr);
  }

  /* The mtr is committed before running the cascaded operation and
  restarted afterwards; both cursors were persisted above so they can
  be restored. */
  mtr_commit(mtr);

  ut_a(cascade->pcur->m_rel_pos == BTR_PCUR_ON);

  cascade->state = UPD_NODE_UPDATE_CLUSTERED;

  err = row_update_cascade_for_mysql(thr, cascade, foreign->foreign_table);

  /* Release the data dictionary latch for a while, so that we do not
  starve other threads from doing CREATE TABLE etc. if we have a huge
  cascaded operation running. The counter n_foreign_key_checks_running
  will prevent other users from dropping or ALTERing the table when we
  release the latch. */

  DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");

  mtr_start(mtr);

  /* Restore pcur position */

  btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

  if (tmp_heap) {
    mem_heap_free(tmp_heap);
  }

  return err;

nonstandard_exit_func:
  /* Error/early-out path: free the temporary heap and cycle the mtr
  (commit + restart) so that pcur is returned to the caller freshly
  restored, matching the normal exit path's cursor state. */

  if (tmp_heap) {
    mem_heap_free(tmp_heap);
  }

  btr_pcur_store_position(pcur, mtr);

  mtr_commit(mtr);
  mtr_start(mtr);

  btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);

  return err;
}
1302
1303 /** Sets a lock on a record. Used in locking possible duplicate key
1304 records and also in checking foreign key constraints.
1305 @param[in] mode requested lock type: LOCK_S or LOCK_X mode
1306 @param[in] type LOCK_ORDINARY, LOCK_GAP, or LOCK_REC_NOT_GAP type lock
1307 @param[in] block buffer block of rec
1308 @param[in] rec record
1309 @param[in] index index
1310 @param[in] offsets rec_get_offsets(rec, index)
1311 @param[in] thr query thread
1312 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
row_ins_set_rec_lock(lock_mode mode,ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1313 static dberr_t row_ins_set_rec_lock(lock_mode mode, ulint type,
1314 const buf_block_t *block, const rec_t *rec,
1315 dict_index_t *index, const ulint *offsets,
1316 que_thr_t *thr) {
1317 dberr_t err;
1318
1319 ut_ad(rec_offs_validate(rec, index, offsets));
1320
1321 if (index->is_clustered()) {
1322 err = lock_clust_rec_read_check_and_lock(
1323 lock_duration_t::AT_LEAST_STATEMENT, block, rec, index, offsets,
1324 SELECT_ORDINARY, mode, type, thr);
1325 } else {
1326 err = lock_sec_rec_read_check_and_lock(lock_duration_t::AT_LEAST_STATEMENT,
1327 block, rec, index, offsets,
1328 SELECT_ORDINARY, mode, type, thr);
1329 }
1330
1331 return (err);
1332 }
1333
1334 /* Decrement a counter in the destructor. */
1335 class ib_dec_in_dtor {
1336 public:
ib_dec_in_dtor(ulint & c)1337 ib_dec_in_dtor(ulint &c) : counter(c) {}
~ib_dec_in_dtor()1338 ~ib_dec_in_dtor() { os_atomic_decrement_ulint(&counter, 1); }
1339
1340 private:
1341 ulint &counter;
1342 };
1343
/** Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_operation_lock.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
dberr_t row_ins_check_foreign_constraint(
    ibool check_ref,         /*!< in: TRUE if we want to check that
                             the referenced table is ok, FALSE if we
                             want to check the foreign key table */
    dict_foreign_t *foreign, /*!< in: foreign constraint; NOTE that the
                             tables mentioned in it must be in the
                             dictionary cache if they exist at all */
    dict_table_t *table,     /*!< in: if check_ref is TRUE, then the foreign
                             table, else the referenced table */
    dtuple_t *entry,         /*!< in: index entry for index */
    que_thr_t *thr)          /*!< in: query thread */
{
  dberr_t err;
  upd_node_t *upd_node;
  dict_table_t *check_table = nullptr;
  dict_index_t *check_index = nullptr;
  ulint n_fields_cmp;
  btr_pcur_t pcur;
  int cmp;
  mtr_t mtr;
  trx_t *trx = thr_get_trx(thr);
  mem_heap_t *heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;

  bool skip_gap_lock;
  MDL_ticket *mdl = nullptr;
  THD *thd = current_thd;
  /* tmp_open/tmp_foreign track a referenced table that was opened here
  (via MDL) because it was not cached in the foreign constraint; they
  are released/freed at exit_func. */
  bool tmp_open = false;
  dict_foreign_t *tmp_foreign = nullptr;

  /* GAP locks are not needed on DD tables because serializability between
  different DDL statements is achieved using metadata locks. So no concurrent
  changes to DD tables when MDL is taken. */
  skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED) ||
                  table->skip_gap_locks();

  DBUG_TRACE;

  if (dict_sys_t::is_dd_table_id(table->id)) {
    return DB_SUCCESS;
  }

  rec_offs_init(offsets_);

  err = DB_SUCCESS;

  if (trx->check_foreigns == FALSE) {
    /* The user has suppressed foreign key checks currently for
    this session */
    goto exit_func;
  }

  /* If any of the foreign key fields in entry is SQL NULL, we
  suppress the foreign key check: this is compatible with Oracle,
  for example */
  for (ulint i = 0; i < foreign->n_fields; i++) {
    if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
      goto exit_func;
    }
  }

  if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
    upd_node = static_cast<upd_node_t *>(thr->run_node);

    if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
      /* If a cascaded update is done as defined by a
      foreign key constraint, do not check that
      constraint for the child row. In ON UPDATE CASCADE
      the update of the parent row is only half done when
      we come here: if we would check the constraint here
      for the child row it would fail.

      A QUESTION remains: if in the child table there are
      several constraints which refer to the same parent
      table, we should merge all updates to the child as
      one update? And the updates can be contradictory!
      Currently we just perform the update associated
      with each foreign key constraint, one after
      another, and the user has problems predicting in
      which order they are performed. */

      goto exit_func;
    }
  }

  if (check_ref) {
    check_table = foreign->referenced_table;
    check_index = foreign->referenced_index;

    /* TODO: NewDD: Need to wait for WL#6049, as current
    referenced table does have has any info of any FK
    referencing to it */
    if (check_table == nullptr && foreign->referenced_table_name_lookup) {
      ut_ad(check_index == nullptr);

      /* The referenced table is not cached in the constraint:
      open it by name under the dict_sys mutex, and work on a
      heap-allocated copy of the constraint that points at the
      opened table and index. */
      mutex_enter(&dict_sys->mutex);
      check_table = dd_table_open_on_name(thd, &mdl,
                                          foreign->referenced_table_name_lookup,
                                          true, DICT_ERR_IGNORE_NONE);

      if (check_table) {
        check_index = dict_foreign_find_index(
            check_table, nullptr, foreign->referenced_col_names,
            foreign->n_fields, foreign->foreign_index, false, false);

        if (!check_index) {
          dd_table_close(check_table, thd, &mdl, true);
          check_table = nullptr;
        }
      }

      if (check_table && check_index) {
        tmp_foreign = static_cast<dict_foreign_t *>(
            ut_malloc_nokey(sizeof(dict_foreign_t)));

        memcpy(tmp_foreign, foreign, sizeof(*foreign));

        foreign = tmp_foreign;
        foreign->referenced_table = check_table;
        foreign->referenced_index = check_index;
        tmp_open = true;
      }
      mutex_exit(&dict_sys->mutex);
    }
  } else {
    check_table = foreign->foreign_table;
    check_index = foreign->foreign_index;
  }

  /* If the table to check does not exist or is unusable, an insert into
  the child table (check_ref == TRUE) must fail; the reverse check is a
  no-op. */
  if (check_table == nullptr || check_table->ibd_file_missing ||
      check_index == nullptr) {
    if (!srv_read_only_mode && check_ref) {
      FILE *ef = dict_foreign_err_file;

      row_ins_set_detailed(trx, foreign);

      row_ins_foreign_trx_print(trx);

      fputs("Foreign key constraint fails for table ", ef);
      ut_print_name(ef, trx, foreign->foreign_table_name);
      fputs(":\n", ef);
      dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
      fprintf(ef, "\nTrying to add to index %s tuple:\n",
              foreign->foreign_index->name());
      dtuple_print(ef, entry);
      fputs("\nBut the parent table ", ef);
      ut_print_name(ef, trx, foreign->referenced_table_name);
      fputs(
          "\nor its .ibd file does"
          " not currently exist!\n",
          ef);
      mutex_exit(&dict_foreign_err_mutex);

      err = DB_NO_REFERENCED_ROW;
    }

    goto exit_func;
  }

  if (check_table != table) {
    /* We already have a LOCK_IX on table, but not necessarily
    on check_table */

    err = lock_table(0, check_table, LOCK_IS, thr);

    if (err != DB_SUCCESS) {
      goto do_possible_lock_wait;
    }
  }

  mtr_start(&mtr);

  /* Store old value on n_fields_cmp */

  n_fields_cmp = dtuple_get_n_fields_cmp(entry);

  /* Compare only the foreign key prefix of the entry during the scan. */
  dtuple_set_n_fields_cmp(entry, foreign->n_fields);

  btr_pcur_open(check_index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);

  /* Scan index records and check if there is a matching record */

  do {
    const rec_t *rec = btr_pcur_get_rec(&pcur);
    const buf_block_t *block = btr_pcur_get_block(&pcur);

    if (page_rec_is_infimum(rec)) {
      continue;
    }

    offsets =
        rec_get_offsets(rec, check_index, offsets, ULINT_UNDEFINED, &heap);

    if (page_rec_is_supremum(rec)) {
      /* Only a gap can be locked on the supremum; skip if gap
      locking is disabled. */
      if (skip_gap_lock) {
        continue;
      }

      err = row_ins_set_rec_lock(LOCK_S, LOCK_ORDINARY, block, rec, check_index,
                                 offsets, thr);
      switch (err) {
        case DB_SUCCESS_LOCKED_REC:
        case DB_SUCCESS:
          continue;
        default:
          goto end_scan;
      }
    }

    cmp = cmp_dtuple_rec(entry, rec, check_index, offsets);

    if (cmp == 0) {
      ulint lock_type;

      lock_type = skip_gap_lock ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;

      if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))) {
        /* A delete-marked match does not decide the check;
        lock it so its fate is frozen, then keep scanning. */
        err = row_ins_set_rec_lock(LOCK_S, lock_type, block, rec, check_index,
                                   offsets, thr);
        switch (err) {
          case DB_SUCCESS_LOCKED_REC:
          case DB_SUCCESS:
            break;
          default:
            goto end_scan;
        }
      } else {
        /* Found a matching record. Lock only
        a record because we can allow inserts
        into gaps */

        err = row_ins_set_rec_lock(LOCK_S, LOCK_REC_NOT_GAP, block, rec,
                                   check_index, offsets, thr);

        switch (err) {
          case DB_SUCCESS_LOCKED_REC:
          case DB_SUCCESS:
            break;
          default:
            goto end_scan;
        }

        if (check_ref) {
          /* Parent row exists: the insert into the child
          table is allowed. */
          err = DB_SUCCESS;

          goto end_scan;
        } else if (foreign->type != 0) {
          /* There is an ON UPDATE or ON DELETE
          condition: check them in a separate
          function */

          err = row_ins_foreign_check_on_constraint(thr, foreign, &pcur, entry,
                                                    &mtr);
          if (err != DB_SUCCESS) {
            /* Since reporting a plain
            "duplicate key" error
            message to the user in
            cases where a long CASCADE
            operation would lead to a
            duplicate key in some
            other table is very
            confusing, map duplicate
            key errors resulting from
            FK constraints to a
            separate error code. */

            if (err == DB_DUPLICATE_KEY) {
              err = DB_FOREIGN_DUPLICATE_KEY;
            }

            goto end_scan;
          }

          /* row_ins_foreign_check_on_constraint
          may have repositioned pcur on a
          different block */
          block = btr_pcur_get_block(&pcur);
        } else {
          row_ins_foreign_report_err("Trying to delete or update", thr, foreign,
                                     rec, entry);

          err = DB_ROW_IS_REFERENCED;
          goto end_scan;
        }
      }
    } else {
      /* cmp > 0 cannot happen with PAGE_CUR_GE positioning:
      the scan has moved past all possible matches. */
      ut_a(cmp < 0);

      err = DB_SUCCESS;

      if (!skip_gap_lock) {
        /* Lock the gap before the first record greater than
        entry so no matching row can be inserted concurrently. */
        err = row_ins_set_rec_lock(LOCK_S, LOCK_GAP, block, rec, check_index,
                                   offsets, thr);
      }

      switch (err) {
        case DB_SUCCESS_LOCKED_REC:
        case DB_SUCCESS:
          if (check_ref) {
            err = DB_NO_REFERENCED_ROW;
            row_ins_foreign_report_add_err(trx, foreign, rec, entry);
          } else {
            err = DB_SUCCESS;
          }
        default:
          break;
      }

      goto end_scan;
    }
  } while (btr_pcur_move_to_next(&pcur, &mtr));

  /* The scan ran off the end of the index without a decisive record. */
  if (check_ref) {
    row_ins_foreign_report_add_err(trx, foreign, btr_pcur_get_rec(&pcur),
                                   entry);
    err = DB_NO_REFERENCED_ROW;
  } else {
    err = DB_SUCCESS;
  }

end_scan:
  btr_pcur_close(&pcur);

  mtr_commit(&mtr);

  /* Restore old value */
  dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
  if (err == DB_LOCK_WAIT) {
    /* An object that will correctly decrement the FK check counter
    when it goes out of this scope. */
    ib_dec_in_dtor dec(check_table->n_foreign_key_checks_running);

    trx->error_state = err;

    que_thr_stop_for_mysql(thr);

    thr->lock_state = QUE_THR_LOCK_ROW;

    /* To avoid check_table being dropped, increment counter */
    os_atomic_increment_ulint(&check_table->n_foreign_key_checks_running, 1);

    trx_kill_blocking(trx);

    lock_wait_suspend_thread(thr);

    if (trx->error_state != DB_SUCCESS) {
      err = trx->error_state;
      goto exit_func;
    }

    thr->lock_state = QUE_THR_LOCK_NOLOCK;

    DBUG_PRINT("to_be_dropped", ("table: %s", check_table->name.m_name));
    if (check_table->to_be_dropped) {
      /* The table is being dropped. We shall timeout
      this operation */
      err = DB_LOCK_WAIT_TIMEOUT;

      goto exit_func;
    }
  }

exit_func:
  if (heap != nullptr) {
    mem_heap_free(heap);
  }

  /* TODO: NewDD: Remove this after WL#6049 */
  if (tmp_open) {
    mutex_enter(&dict_sys->mutex);
    dd_table_close(check_table, thd, &mdl, true);
    ut_free(tmp_foreign);
    mutex_exit(&dict_sys->mutex);
  }

  return err;
}
1728
1729 /** Checks if foreign key constraints fail for an index entry. If index
1730 is not mentioned in any constraint, this function does nothing,
1731 Otherwise does searches to the indexes of referenced tables and
1732 sets shared locks which lock either the success or the failure of
1733 a constraint.
1734 @return DB_SUCCESS or error code */
1735 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1736 row_ins_check_foreign_constraints(
1737 dict_table_t *table, /*!< in: table */
1738 dict_index_t *index, /*!< in: index */
1739 dtuple_t *entry, /*!< in: index entry for index */
1740 que_thr_t *thr) /*!< in: query thread */
1741 {
1742 dict_foreign_t *foreign;
1743 dberr_t err;
1744 trx_t *trx;
1745
1746 /* Temporarily skip the FK check for DD tables */
1747 if (dict_sys_t::is_dd_table_id(table->id)) {
1748 return (DB_SUCCESS);
1749 }
1750
1751 trx = thr_get_trx(thr);
1752
1753 if (trx->check_foreigns == FALSE) {
1754 return (DB_SUCCESS);
1755 }
1756 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1757 "foreign_constraint_check_for_ins");
1758
1759 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1760 it != table->foreign_set.end(); ++it) {
1761 foreign = *it;
1762
1763 if (foreign->foreign_index == index) {
1764 dict_table_t *ref_table = nullptr;
1765 dict_table_t *foreign_table = foreign->foreign_table;
1766 dict_table_t *referenced_table = foreign->referenced_table;
1767 MDL_ticket *mdl = nullptr;
1768
1769 if (referenced_table == nullptr) {
1770 ref_table = dd_table_open_on_name(trx->mysql_thd, &mdl,
1771 foreign->referenced_table_name_lookup,
1772 false, DICT_ERR_IGNORE_NONE);
1773 }
1774
1775 if (referenced_table) {
1776 os_atomic_increment_ulint(&foreign_table->n_foreign_key_checks_running,
1777 1);
1778 }
1779
1780 /* NOTE that if the thread ends up waiting for a lock
1781 we will release dict_operation_lock temporarily!
1782 But the counter on the table protects the referenced
1783 table from being dropped while the check is running. */
1784
1785 err = row_ins_check_foreign_constraint(TRUE, foreign, table, entry, thr);
1786
1787 if (referenced_table) {
1788 os_atomic_decrement_ulint(&foreign_table->n_foreign_key_checks_running,
1789 1);
1790 }
1791 if (ref_table != nullptr) {
1792 dd_table_close(ref_table, trx->mysql_thd, &mdl, false);
1793 }
1794
1795 if (err != DB_SUCCESS) {
1796 return (err);
1797 }
1798 }
1799 }
1800
1801 return (DB_SUCCESS);
1802 }
1803
1804 /** Checks if a unique key violation to rec would occur at the index entry
1805 insert.
1806 @return true if error */
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1807 static ibool row_ins_dupl_error_with_rec(
1808 const rec_t *rec, /*!< in: user record; NOTE that we assume
1809 that the caller already has a record lock on
1810 the record! */
1811 const dtuple_t *entry, /*!< in: entry to insert */
1812 dict_index_t *index, /*!< in: index */
1813 const ulint *offsets) /*!< in: rec_get_offsets(rec, index) */
1814 {
1815 ulint matched_fields;
1816 ulint n_unique;
1817 ulint i;
1818
1819 ut_ad(rec_offs_validate(rec, index, offsets));
1820
1821 n_unique = dict_index_get_n_unique(index);
1822
1823 matched_fields = 0;
1824
1825 entry->compare(rec, index, offsets, &matched_fields);
1826
1827 if (matched_fields < n_unique) {
1828 return (FALSE);
1829 }
1830
1831 /* In a unique secondary index we allow equal key values if they
1832 contain SQL NULLs */
1833
1834 if (!index->is_clustered() && !index->nulls_equal) {
1835 for (i = 0; i < n_unique; i++) {
1836 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1837 return (FALSE);
1838 }
1839 }
1840 }
1841
1842 return (!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1843 }
1844
1845 /** Determines if the query is REPLACE or ON DUPLICATE KEY UPDATE in which case
1846 duplicate values should be allowed (and further processed) instead of causing
1847 an error
1848 @param thr The query thread running the query
1849 @return true iff duplicated values should be allowed */
row_allow_duplicates(que_thr_t * thr)1850 static bool row_allow_duplicates(que_thr_t *thr) {
1851 return (thr->prebuilt && thr->prebuilt->allow_duplicates());
1852 }
1853
/** Scans a unique non-clustered index at a given index entry to determine
whether a uniqueness violation has occurred for the key value of the entry.
Set shared locks on possible duplicate records.
@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_scan_sec_index_for_duplicate(
    ulint flags,         /*!< in: undo logging and locking flags */
    dict_index_t *index, /*!< in: non-clustered unique index */
    dtuple_t *entry,     /*!< in: index entry */
    que_thr_t *thr,      /*!< in: query thread */
    bool s_latch,        /*!< in: whether index->lock is being held */
    mtr_t *mtr,          /*!< in/out: mini-transaction */
    mem_heap_t *offsets_heap)
/*!< in/out: memory heap that can be emptied */
{
  ulint n_unique;
  int cmp;
  ulint n_fields_cmp;
  btr_pcur_t pcur;
  dberr_t err = DB_SUCCESS;
  ulint allow_duplicates;
  ulint *offsets = nullptr;
  DBUG_TRACE;

  ut_ad(s_latch ==
        rw_lock_own_flagged(&index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));

  n_unique = dict_index_get_n_unique(index);

  /* If the secondary index is unique, but one of the fields in the
  n_unique first fields is NULL, a unique key violation cannot occur,
  since we define NULL != NULL in this case */

  if (!index->nulls_equal) {
    for (ulint i = 0; i < n_unique; i++) {
      if (UNIV_SQL_NULL == dfield_get_len(dtuple_get_nth_field(entry, i))) {
        return DB_SUCCESS;
      }
    }
  }

  /* Store old value on n_fields_cmp */

  n_fields_cmp = dtuple_get_n_fields_cmp(entry);

  /* Compare only the unique-key prefix of the entry during the scan;
  the original value is restored at end_scan. */
  dtuple_set_n_fields_cmp(entry, n_unique);

  btr_pcur_open(
      index, entry, PAGE_CUR_GE,
      s_latch ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED : BTR_SEARCH_LEAF,
      &pcur, mtr);
  allow_duplicates = row_allow_duplicates(thr);

  /* Scan index records and check if there is a duplicate */

  do {
    const rec_t *rec = btr_pcur_get_rec(&pcur);
    const buf_block_t *block = btr_pcur_get_block(&pcur);

    /* For DD tables, We don't use next-key locking for duplicates
    found. This means it is possible for another transaction to
    insert a duplicate key value but MDL protection on DD tables
    will prevent insertion of duplicates into unique secondary indexes*/
    const ulint lock_type =
        index->table->skip_gap_locks() ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;

    if (page_rec_is_infimum(rec)) {
      continue;
    }

    offsets =
        rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &offsets_heap);

    if (flags & BTR_NO_LOCKING_FLAG) {
      /* Set no locks when applying log
      in online table rebuild. */
    } else if (allow_duplicates) {
#if 0  // TODO: Enable this assert after WL#9509. REPLACE will not be allowed on
      // DD tables
      /* This assert means DD tables should not use REPLACE
      or INSERT INTO table.. ON DUPLCIATE KEY */
      ut_ad(!index->table->is_dd_table);
#endif

#if 1  // TODO: Remove this code after WL#9509. REPLACE will not be allowed on
      // DD tables
      if (index->table->skip_gap_locks()) {
        /* Only GAP lock is possible on supremum. */
        if (page_rec_is_supremum(rec)) {
          continue;
        }
      }
#endif

      /* If the SQL-query will update or replace duplicate key we will take
      X-lock for duplicates ( REPLACE, LOAD DATAFILE REPLACE, INSERT ON
      DUPLICATE KEY UPDATE). */
      err = row_ins_set_rec_lock(LOCK_X, lock_type, block, rec, index, offsets,
                                 thr);
    } else {
      if (index->table->skip_gap_locks()) {
        /* Only GAP lock is possible on supremum. */
        if (page_rec_is_supremum(rec)) {
          continue;
        }

        /* Without gap locks, once the scan has passed all
        records matching the entry, there is nothing left to
        lock or to check. */
        if (cmp_dtuple_rec(entry, rec, index, offsets) < 0) {
          goto end_scan;
        }
      }

      err = row_ins_set_rec_lock(LOCK_S, lock_type, block, rec, index, offsets,
                                 thr);
    }

    /* Intentional fallthrough: DB_SUCCESS_LOCKED_REC is normalized
    to DB_SUCCESS before continuing the scan. */
    switch (err) {
      case DB_SUCCESS_LOCKED_REC:
        err = DB_SUCCESS;
      case DB_SUCCESS:
        break;
      default:
        goto end_scan;
    }

    if (page_rec_is_supremum(rec)) {
      continue;
    }

    cmp = cmp_dtuple_rec(entry, rec, index, offsets);

    if (cmp == 0 && !index->allow_duplicates) {
      if (row_ins_dupl_error_with_rec(rec, entry, index, offsets)) {
        err = DB_DUPLICATE_KEY;

        thr_get_trx(thr)->error_index = index;

        /* If the duplicate is on hidden FTS_DOC_ID,
        state so in the error log */
        if (index == index->table->fts_doc_id_index &&
            DICT_TF2_FLAG_IS_SET(index->table, DICT_TF2_FTS_HAS_DOC_ID)) {
          ib::error(ER_IB_MSG_958) << "Duplicate FTS_DOC_ID"
                                      " value on table "
                                   << index->table->name;
        }

        goto end_scan;
      }
    } else {
      /* With PAGE_CUR_GE positioning the record cannot compare
      less than the entry; passing it means no duplicate exists. */
      ut_a(cmp < 0 || index->allow_duplicates);
      goto end_scan;
    }
  } while (btr_pcur_move_to_next(&pcur, mtr));

end_scan:
  /* Restore old value */
  dtuple_set_n_fields_cmp(entry, n_fields_cmp);

  return err;
}
2013
2014 /** Checks for a duplicate when the table is being rebuilt online.
2015 @param[in] n_uniq offset of DB_TRX_ID
2016 @param[in] entry entry being inserted
2017 @param[in] rec clustered index record at insert position
2018 @param[in] index clustered index
2019 @param[in,out] offsets rec_get_offsets(rec)
2020 @retval DB_SUCCESS when no duplicate is detected
2021 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2022 a newer version of entry (the entry should not be inserted)
2023 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2024 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,const dict_index_t * index,ulint * offsets)2025 row_ins_duplicate_online(ulint n_uniq, const dtuple_t *entry,
2026 const rec_t *rec, const dict_index_t *index,
2027 ulint *offsets) {
2028 ulint fields = 0;
2029
2030 /* During rebuild, there should not be any delete-marked rows
2031 in the new table. */
2032 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2033 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2034
2035 /* Compare the PRIMARY KEY fields and the
2036 DB_TRX_ID, DB_ROLL_PTR. */
2037 cmp_dtuple_rec_with_match_low(entry, rec, index, offsets, n_uniq + 2,
2038 &fields);
2039
2040 if (fields < n_uniq) {
2041 /* Not a duplicate. */
2042 return (DB_SUCCESS);
2043 }
2044
2045 if (fields == n_uniq + 2) {
2046 /* rec is an exact match of entry. */
2047 return (DB_SUCCESS_LOCKED_REC);
2048 }
2049
2050 return (DB_DUPLICATE_KEY);
2051 }
2052
2053 /** Checks for a duplicate when the table is being rebuilt online.
2054 @retval DB_SUCCESS when no duplicate is detected
2055 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2056 a newer version of entry (the entry should not be inserted)
2057 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2058 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2059 row_ins_duplicate_error_in_clust_online(
2060 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2061 const dtuple_t *entry, /*!< in: entry that is being inserted */
2062 const btr_cur_t *cursor, /*!< in: cursor on insert position */
2063 ulint **offsets, /*!< in/out: rec_get_offsets(rec) */
2064 mem_heap_t **heap) /*!< in/out: heap for offsets */
2065 {
2066 dberr_t err = DB_SUCCESS;
2067 const rec_t *rec = btr_cur_get_rec(cursor);
2068
2069 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2070 *offsets =
2071 rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);
2072 err = row_ins_duplicate_online(n_uniq, entry, rec, cursor->index, *offsets);
2073 if (err != DB_SUCCESS) {
2074 return (err);
2075 }
2076 }
2077
2078 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2079
2080 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2081 *offsets =
2082 rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);
2083 err = row_ins_duplicate_online(n_uniq, entry, rec, cursor->index, *offsets);
2084 }
2085
2086 return (err);
2087 }
2088
/** Checks if a unique key violation error would occur at an index entry
insert. Sets shared locks on possible duplicate records. Works only
for a clustered index!
@retval DB_SUCCESS if no error
@retval DB_DUPLICATE_KEY if error,
@retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
record
@retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
static MY_ATTRIBUTE((warn_unused_result)) dberr_t
    row_ins_duplicate_error_in_clust(
        ulint flags,           /*!< in: undo logging and locking flags */
        btr_cur_t *cursor,     /*!< in: B-tree cursor */
        const dtuple_t *entry, /*!< in: entry to insert */
        que_thr_t *thr,        /*!< in: query thread */
        mtr_t *mtr)            /*!< in: mtr */
{
  dberr_t err;
  rec_t *rec;
  ulint n_unique;
  trx_t *trx = thr_get_trx(thr);
  mem_heap_t *heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);

  /* The mini-transaction parameter is currently unused; it is kept
  in the signature for symmetry with related functions. */
  UT_NOT_USED(mtr);

  ut_ad(cursor->index->is_clustered());

  /* NOTE: For unique non-clustered indexes there may be any number
  of delete marked records with the same value for the non-clustered
  index key (remember multiversioning), and which differ only in
  the row reference part of the index record, containing the
  clustered index key fields. For such a secondary index record,
  to avoid race condition, we must FIRST do the insertion and after
  that check that the uniqueness condition is not breached! */

  /* NOTE: A problem is that in the B-tree node pointers on an
  upper level may match more to the entry than the actual existing
  user records on the leaf level. So, even if low_match would suggest
  that a duplicate key violation may occur, this may not be the case. */

  n_unique = dict_index_get_n_unique(cursor->index);

  /* Check the record at the cursor position (the one that would
  precede the inserted entry). */
  if (cursor->low_match >= n_unique) {
    rec = btr_cur_get_rec(cursor);

    if (!page_rec_is_infimum(rec)) {
      offsets =
          rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);

      /* We set a lock on the possible duplicate: this
      is needed in logical logging of MySQL to make
      sure that in roll-forward we get the same duplicate
      errors as in original execution */

      if (flags & BTR_NO_LOCKING_FLAG) {
        /* Do nothing if no-locking is set */
        err = DB_SUCCESS;
      } else {
        /* If the SQL-query will update or replace
        duplicate key we will take X-lock for
        duplicates ( REPLACE, LOAD DATAFILE REPLACE,
        INSERT ON DUPLICATE KEY UPDATE). */

        err = row_ins_set_rec_lock(row_allow_duplicates(thr) ? LOCK_X : LOCK_S,
                                   LOCK_REC_NOT_GAP, btr_cur_get_block(cursor),
                                   rec, cursor->index, offsets, thr);
      }

      switch (err) {
        case DB_SUCCESS_LOCKED_REC:
        case DB_SUCCESS:
          break;
        default:
          /* Lock wait or failure: propagate to the caller. */
          goto func_exit;
      }

      if (row_ins_dupl_error_with_rec(rec, entry, cursor->index, offsets)) {
      /* Shared exit for both the low_match and up_match
      duplicate cases. */
      duplicate:
        trx->error_index = cursor->index;
        err = DB_DUPLICATE_KEY;
        goto func_exit;
      }
    }
  }

  /* Check the successor record (the one that would follow the
  inserted entry). */
  if (cursor->up_match >= n_unique) {
    rec = page_rec_get_next(btr_cur_get_rec(cursor));

    if (!page_rec_is_supremum(rec)) {
      offsets =
          rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);

      /* If the SQL-query will update or replace
      duplicate key we will take X-lock for
      duplicates ( REPLACE, LOAD DATAFILE REPLACE,
      INSERT ON DUPLICATE KEY UPDATE). */

      err = row_ins_set_rec_lock(row_allow_duplicates(thr) ? LOCK_X : LOCK_S,
                                 LOCK_REC_NOT_GAP, btr_cur_get_block(cursor),
                                 rec, cursor->index, offsets, thr);

      switch (err) {
        case DB_SUCCESS_LOCKED_REC:
        case DB_SUCCESS:
          break;
        default:
          goto func_exit;
      }

      if (row_ins_dupl_error_with_rec(rec, entry, cursor->index, offsets)) {
        goto duplicate;
      }
    }

    /* This should never happen: with a PAGE_CUR_LE search in a
    clustered (unique) index, up_match >= n_unique implies the
    successor is a duplicate, which must have been caught above. */
    ut_error;
  }

  err = DB_SUCCESS;
func_exit:
  if (UNIV_LIKELY_NULL(heap)) {
    mem_heap_free(heap);
  }
  return (err);
}
2217
2218 /** Checks if an index entry has long enough common prefix with an
2219 existing record so that the intended insert of the entry must be
2220 changed to a modify of the existing record. In the case of a clustered
2221 index, the prefix must be n_unique fields long. In the case of a
2222 secondary index, all fields must be equal. InnoDB never updates
2223 secondary index records in place, other than clearing or setting the
2224 delete-mark flag. We could be able to update the non-unique fields
2225 of a unique secondary index record by checking the cursor->up_match,
2226 but we do not do so, because it could have some locking implications.
2227 @return true if the existing record should be updated; false if not */
2228 UNIV_INLINE
row_ins_must_modify_rec(const btr_cur_t * cursor)2229 ibool row_ins_must_modify_rec(const btr_cur_t *cursor) /*!< in: B-tree cursor */
2230 {
2231 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2232 Because node pointers on upper levels of the B-tree may match more
2233 to entry than to actual user records on the leaf level, we
2234 have to check if the candidate record is actually a user record.
2235 A clustered index node pointer contains index->n_unique first fields,
2236 and a secondary index node pointer contains all index fields. */
2237
2238 return (cursor->low_match >= dict_index_get_n_unique_in_tree(cursor->index) &&
2239 !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2240 }
2241
/** Insert the externally stored fields (off-page columns)
of a clustered index entry.
@param[in]	trx	current transaction
@param[in]	entry	index entry to insert
@param[in]	big_rec	externally stored fields
@param[in,out]	offsets	rec_get_offsets()
@param[in,out]	heap	memory heap */
#ifdef UNIV_DEBUG
/**
@param[in]	thd	client connection, or NULL */
#endif /* UNIV_DEBUG */
/**
@param[in]	index	clustered index
@return error code
@retval DB_SUCCESS
@retval DB_OUT_OF_FILE_SPACE */
static dberr_t row_ins_index_entry_big_rec_func(trx_t *trx,
                                                const dtuple_t *entry,
                                                const big_rec_t *big_rec,
                                                ulint *offsets,
                                                mem_heap_t **heap,
#ifdef UNIV_DEBUG
                                                const THD *thd,
#endif /* UNIV_DEBUG */
                                                dict_index_t *index) {
  mtr_t mtr;
  btr_pcur_t pcur;
  rec_t *rec;
  dberr_t error;

  ut_ad(index->is_clustered());

  DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
  DEBUG_SYNC_C("before_insertion_of_blob");

  mtr_start(&mtr);

  /* Changes to temporary tables are not redo-logged. */
  dict_disable_redo_if_temporary(index->table, &mtr);

  /* Position a persistent cursor on the clustered index record for
  the entry; BTR_MODIFY_TREE is used because storing the off-page
  columns modifies the tree (allocates and links LOB pages). */
  btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE, &pcur, &mtr);
  rec = btr_pcur_get_rec(&pcur);
  offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap);

  DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
  error = lob::btr_store_big_rec_extern_fields(
      trx, &pcur, nullptr, offsets, big_rec, &mtr, lob::OPCODE_INSERT);
  DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");

  /* Log the insert for online DDL only after the off-page columns
  have been stored successfully. */
  if (error == DB_SUCCESS && dict_index_is_online_ddl(index)) {
    row_log_table_insert(btr_pcur_get_rec(&pcur), entry, index, offsets);
  }

  mtr_commit(&mtr);

  btr_pcur_close(&pcur);

  return (error);
}
2300
/* Wrapper for row_ins_index_entry_big_rec_func(): the thd argument
exists only in UNIV_DEBUG builds (used for DEBUG_SYNC points); in
release builds the macro simply drops it. */
#ifndef UNIV_DEBUG
#define row_ins_index_entry_big_rec(trx, e, big, ofs, heap, index, thd) \
  row_ins_index_entry_big_rec_func(trx, e, big, ofs, heap, index)
#else /* UNIV_DEBUG */
#define row_ins_index_entry_big_rec(trx, e, big, ofs, heap, index, thd) \
  row_ins_index_entry_big_rec_func(trx, e, big, ofs, heap, thd, index)
#endif /* UNIV_DEBUG */
2308
2309 /** Update all the prebuilts working on this temporary table
2310 @param[in,out] table dict_table_t for the table */
row_ins_temp_prebuilt_tree_modified(dict_table_t * table)2311 static void row_ins_temp_prebuilt_tree_modified(dict_table_t *table) {
2312 if (table->temp_prebuilt == nullptr) {
2313 return;
2314 }
2315
2316 std::vector<row_prebuilt_t *>::const_iterator it;
2317
2318 for (it = table->temp_prebuilt->begin(); it != table->temp_prebuilt->end();
2319 ++it) {
2320 if ((*it)->m_temp_read_shared) {
2321 (*it)->m_temp_tree_modified = true;
2322 }
2323 }
2324 }
2325
/** Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
record is necessarily marked deleted by a committed transaction, or a
unique key violation error occurs. The delete marked record is then
updated to an existing record, and we must write an undo log record on
the delete marked record.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@return error code */
dberr_t row_ins_clust_index_entry_low(
    uint32_t flags,      /*!< in: undo logging and locking flags */
    ulint mode,          /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
                         depending on whether we wish optimistic or
                         pessimistic descent down the index tree */
    dict_index_t *index, /*!< in: clustered index */
    ulint n_uniq,        /*!< in: 0 or index->n_uniq */
    dtuple_t *entry,     /*!< in/out: index entry to insert */
    que_thr_t *thr,      /*!< in: query thread, or NULL if
                         flags & (BTR_NO_LOCKING_FLAG
                         | BTR_NO_UNDO_LOG_FLAG) and a duplicate
                         can't occur */
    bool dup_chk_only)
/*!< in: if true, just do duplicate check
and return. don't execute actual insert. */
{
  btr_pcur_t pcur;
  btr_cur_t *cursor;
  dberr_t err = DB_SUCCESS;
  big_rec_t *big_rec = nullptr;
  mtr_t mtr;
  mem_heap_t *offsets_heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);

  DBUG_TRACE;

#ifdef UNIV_DEBUG
  /* Debug-only check: fail the insert if the tree has reached
  BTR_MAX_NODE_LEVEL while the optimistic-insert debug limit is set
  to an artificially small value. */
  mtr_t temp_mtr;
  temp_mtr.start();
  mtr_s_lock(dict_index_get_lock(index), &temp_mtr);

  if (btr_height_get(index, &temp_mtr) >= BTR_MAX_NODE_LEVEL &&
      btr_cur_limit_optimistic_insert_debug > 1 &&
      btr_cur_limit_optimistic_insert_debug < 5) {
    ib::error(ER_IB_MSG_BTREE_LEVEL_LIMIT_EXCEEDED, index->name());
    temp_mtr.commit();
    return (DB_BTREE_LEVEL_LIMIT_EXCEEDED);
  }

  temp_mtr.commit();
#endif

  ut_ad(index->is_clustered());
  ut_ad(!dict_index_is_unique(index) ||
        n_uniq == dict_index_get_n_unique(index));
  ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
  ut_ad((flags & (BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG)) ||
        !thr_get_trx(thr)->in_rollback);
  ut_ad(thr != nullptr || !dup_chk_only);

  mtr.start();

  if (index->table->is_temporary()) {
    /* Disable REDO logging as the lifetime of temp-tables is
    limited to server or connection lifetime and so REDO
    information is not needed on restart for recovery.
    Disable locking as temp-tables are local to a connection. */

    ut_ad(flags & BTR_NO_LOCKING_FLAG);
    ut_ad(!index->table->is_intrinsic() || (flags & BTR_NO_UNDO_LOG_FLAG));

    mtr.set_log_mode(MTR_LOG_NO_REDO);
  }

  /* During online DDL the index lock is taken in S mode up front, and
  the search mode records that it is already latched. */
  if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
    mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
    mtr_s_lock(dict_index_get_lock(index), &mtr);
  }

  /* Note that we use PAGE_CUR_LE as the search mode, because then
  the function will return in both low_match and up_match of the
  cursor sensible values */
  btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);
  cursor = btr_pcur_get_btr_cur(&pcur);
  cursor->thr = thr;

  ut_ad(!index->table->is_intrinsic() ||
        cursor->page_cur.block->made_dirty_with_no_latch);

#ifdef UNIV_DEBUG
  {
    page_t *page = btr_cur_get_page(cursor);
    rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));

    ut_ad(page_rec_is_supremum(first_rec) ||
          rec_n_fields_is_sane(index, first_rec, entry));
  }
#endif /* UNIV_DEBUG */

  /* Write logs for AUTOINC right after index lock has been got and
  before any further resource acquisitions to prevent deadlock.
  No need to log for temporary tables and intermediate tables */
  if (!index->table->is_temporary() && !index->table->skip_alter_undo &&
      dict_table_has_autoinc_col(index->table)) {
    ib_uint64_t counter =
        row_get_autoinc_counter(entry, index->table->autoinc_field_no);

    if (counter != 0) {
      /* Always log the counter change first, so it won't
      be affected by any follow-up failure. */
      dict_table_autoinc_log(index->table, counter, &mtr);
    }
  }

  /* Allowing duplicates in clustered index is currently enabled
  only for intrinsic table and caller understand the limited
  operation that can be done in this case. */
  ut_ad(!index->allow_duplicates ||
        (index->allow_duplicates && index->table->is_intrinsic()));

  if (!index->allow_duplicates && n_uniq &&
      (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {
    if (flags == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG |
                  BTR_KEEP_SYS_FLAG)) {
      /* Set no locks when applying log
      in online table rebuild. Only check for duplicates. */
      err = row_ins_duplicate_error_in_clust_online(n_uniq, entry, cursor,
                                                    &offsets, &offsets_heap);

      switch (err) {
        case DB_SUCCESS:
          break;
        default:
          ut_ad(0);
          /* fall through */
        case DB_SUCCESS_LOCKED_REC:
        case DB_DUPLICATE_KEY:
          if (thr != nullptr) {
            thr_get_trx(thr)->error_index = cursor->index;
          }
      }
    } else {
      /* Note that the following may return also
      DB_LOCK_WAIT */

      err = row_ins_duplicate_error_in_clust(flags, cursor, entry, thr, &mtr);
    }

    if (err != DB_SUCCESS) {
    err_exit:
      mtr.commit();
      goto func_exit;
    }
  }

  if (dup_chk_only) {
    mtr.commit();
    goto func_exit;
  }
  /* Note: Allowing duplicates would qualify for modification of
  an existing record as the new entry is exactly same as old entry.
  Avoid this check if allow duplicates is enabled. */
  if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
    /* There is already an index entry with a long enough common
    prefix, we must convert the insert into a modify of an
    existing record */
    mem_heap_t *entry_heap = mem_heap_create(1024);

    /* If the existing record is being modified and the new record
    doesn't fit the provided slot then existing record is added
    to free list and new record is inserted. This also means
    cursor that we have cached for SELECT is now invalid. */
    if (index->last_sel_cur) {
      index->last_sel_cur->invalid = true;
    }

    ut_ad(thr != nullptr);
    err = row_ins_clust_index_entry_by_modify(&pcur, flags, mode, &offsets,
                                              &offsets_heap, entry_heap, entry,
                                              thr, &mtr);

    if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
      row_log_table_insert(btr_cur_get_rec(cursor), entry, index, offsets);
    }

    mtr.commit();
    mem_heap_free(entry_heap);
  } else {
    rec_t *insert_rec;

    if (mode != BTR_MODIFY_TREE) {
      ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
      err = btr_cur_optimistic_insert(flags, cursor, &offsets, &offsets_heap,
                                      entry, &insert_rec, &big_rec, thr, &mtr);
    } else {
      /* BTR_MODIFY_TREE: try the optimistic path first anyway, and
      fall back to the pessimistic (tree-reorganizing) insert only
      on DB_FAIL. */
      if (buf_LRU_buf_pool_running_out()) {
        err = DB_LOCK_TABLE_FULL;
        goto err_exit;
      }

      DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");

      err = btr_cur_optimistic_insert(flags, cursor, &offsets, &offsets_heap,
                                      entry, &insert_rec, &big_rec, thr, &mtr);

      if (err == DB_FAIL) {
        err =
            btr_cur_pessimistic_insert(flags, cursor, &offsets, &offsets_heap,
                                       entry, &insert_rec, &big_rec, thr, &mtr);

        if (index->table->is_intrinsic() && err == DB_SUCCESS) {
          row_ins_temp_prebuilt_tree_modified(index->table);
        }
      }
    }

    if (big_rec != nullptr) {
      /* The off-page columns are stored in a separate
      mini-transaction after this one is committed. */
      mtr.commit();

      /* Online table rebuild could read (and
      ignore) the incomplete record at this point.
      If online rebuild is in progress, the
      row_ins_index_entry_big_rec() will write log. */

      DBUG_EXECUTE_IF("row_ins_extern_checkpoint",
                      log_make_latest_checkpoint(););
      err = row_ins_index_entry_big_rec(thr_get_trx(thr), entry, big_rec,
                                        offsets, &offsets_heap, index,
                                        thr_get_trx(thr)->mysql_thd);
      dtuple_convert_back_big_rec(entry, big_rec);
    } else {
      if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
        row_log_table_insert(insert_rec, entry, index, offsets);
      }

      mtr.commit();
    }
  }

func_exit:
  if (offsets_heap != nullptr) {
    mem_heap_free(offsets_heap);
  }

  btr_pcur_close(&pcur);

  DBUG_EXECUTE_IF(
      "ib_sdi", if (dict_table_is_sdi(index->table->id)) {
        ib::info(ER_IB_MSG_959)
            << "ib_sdi: row_ins_clust_index_entry_low: " << index->name << " "
            << index->table->name << " return status: " << err;
      });

  return err;
}
2583
/** This is a specialized function meant for direct insertion to auto-generated
clustered index based on cached position from last successful insert. To be
used when data is sorted.
@param[in]	mode	BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
                        depending on whether we wish optimistic or pessimistic
                        descent down the index tree
@param[in,out]	index	clustered index
@param[in,out]	entry	index entry to insert
@param[in]	thr	query thread
@return error code */
static dberr_t row_ins_sorted_clust_index_entry(ulint mode, dict_index_t *index,
                                                dtuple_t *entry,
                                                que_thr_t *thr) {
  dberr_t err;
  mtr_t *mtr;
  const bool commit_mtr = mode == BTR_MODIFY_TREE;

  mem_heap_t *offsets_heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);

  DBUG_TRACE;

  ut_ad(index->last_ins_cur != nullptr);
  ut_ad(index->is_clustered());
  ut_ad(index->table->is_intrinsic());
  ut_ad(dict_index_is_auto_gen_clust(index));

  btr_cur_t cursor;
  cursor.thr = thr;
  /* The mini-transaction lives in the cached insert position so it
  can stay open across successive sorted inserts. */
  mtr = &index->last_ins_cur->mtr;

  /* Search for position if tree needs to be split or if last position
  is not cached. */
  if (mode == BTR_MODIFY_TREE || index->last_ins_cur->rec == nullptr ||
      index->last_ins_cur->disable_caching) {
    /* Commit the previous mtr. */
    index->last_ins_cur->release();

    mtr_start(mtr);
    mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);

    btr_cur_search_to_nth_level_with_no_latch(index, 0, entry, PAGE_CUR_LE,
                                              &cursor, __FILE__, __LINE__, mtr);
    ut_ad(cursor.page_cur.block != nullptr);
    ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
  } else {
    /* Reuse the position cached by the previous insert; no tree
    search is needed. */
    cursor.index = index;

    cursor.page_cur.index = index;

    cursor.page_cur.rec = index->last_ins_cur->rec;

    cursor.page_cur.block = index->last_ins_cur->block;
  }

  const ulint flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;

  /* Single-pass loop: every path ends in break; the loop only
  provides a structured common exit. */
  for (;;) {
    rec_t *insert_rec;
    big_rec_t *big_rec = nullptr;

    if (mode != BTR_MODIFY_TREE) {
      ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);

      err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
                                      entry, &insert_rec, &big_rec, thr, mtr);
      if (err != DB_SUCCESS) {
        break;
      }
    } else {
      /* TODO: Check if this is needed for intrinsic table. */
      if (buf_LRU_buf_pool_running_out()) {
        err = DB_LOCK_TABLE_FULL;
        break;
      }

      err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
                                      entry, &insert_rec, &big_rec, thr, mtr);

      if (err == DB_FAIL) {
        err =
            btr_cur_pessimistic_insert(flags, &cursor, &offsets, &offsets_heap,
                                       entry, &insert_rec, &big_rec, thr, mtr);
        if (index->table->is_intrinsic() && err == DB_SUCCESS) {
          row_ins_temp_prebuilt_tree_modified(index->table);
        }
      }
    }

    if (big_rec != nullptr) {
      /* If index involves big-record optimization is
      turned-off. */
      index->last_ins_cur->release();
      index->last_ins_cur->disable_caching = true;

      err = row_ins_index_entry_big_rec(thr_get_trx(thr), entry, big_rec,
                                        offsets, &offsets_heap, index,
                                        thr_get_trx(thr)->mysql_thd);

      dtuple_convert_back_big_rec(entry, big_rec);

    } else if (err == DB_SUCCESS) {
      if (!commit_mtr && !index->last_ins_cur->disable_caching) {
        /* Cache this position for the next sorted insert. */
        index->last_ins_cur->rec = insert_rec;

        index->last_ins_cur->block = cursor.page_cur.block;
      } else {
        index->last_ins_cur->release();
      }
    }

    break;
  }

  if (err != DB_SUCCESS) {
    index->last_ins_cur->release();
  }

  if (offsets_heap != nullptr) {
    mem_heap_free(offsets_heap);
  }

  return err;
}
2710
2711 /** Start a mini-transaction and check if the index will be dropped.
2712 @param[in,out] mtr mini-transaction
2713 @param[in,out] index secondary index
2714 @param[in] check whether to check
2715 @param[in] search_mode flags
2716 @return true if the index is to be dropped */
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2717 static MY_ATTRIBUTE((warn_unused_result)) bool row_ins_sec_mtr_start_and_check_if_aborted(
2718 mtr_t *mtr, dict_index_t *index, bool check, ulint search_mode) {
2719 ut_ad(!index->is_clustered());
2720
2721 const mtr_log_t log_mode = mtr->get_log_mode();
2722
2723 mtr_start(mtr);
2724
2725 mtr->set_log_mode(log_mode);
2726
2727 if (!check) {
2728 return (false);
2729 }
2730
2731 if (search_mode & BTR_ALREADY_S_LATCHED) {
2732 mtr_s_lock(dict_index_get_lock(index), mtr);
2733 } else {
2734 mtr_sx_lock(dict_index_get_lock(index), mtr);
2735 }
2736
2737 switch (index->online_status) {
2738 case ONLINE_INDEX_ABORTED:
2739 case ONLINE_INDEX_ABORTED_DROPPED:
2740 ut_ad(!index->is_committed());
2741 return (true);
2742 case ONLINE_INDEX_COMPLETE:
2743 return (false);
2744 case ONLINE_INDEX_CREATION:
2745 break;
2746 }
2747
2748 ut_error;
2749 }
2750
2751 /** Tries to insert an entry into a secondary index. If a record with exactly
2752 the same fields is found, the other record is necessarily marked deleted.
2753 It is then unmarked. Otherwise, the entry is just inserted to the index.
2754 @param[in] flags undo logging and locking flags
2755 @param[in] mode BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2756 depending on whether we wish optimistic or
2757 pessimistic descent down the index tree
2758 @param[in] index secondary index
2759 @param[in,out] offsets_heap memory heap that can be emptied
2760 @param[in,out] heap memory heap
2761 @param[in,out] entry index entry to insert
2762 @param[in] trx_id PAGE_MAX_TRX_ID during row_log_table_apply(),
2763 or trx_id when undo log is disabled during
2764 alter copy operation or 0
2765 @param[in] thr query thread
2766 @param[in] dup_chk_only TRUE, just do duplicate check and return.
2767 don't execute actual insert
2768 @retval DB_SUCCESS on success
2769 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2770 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2771 @return error code */
row_ins_sec_index_entry_low(uint32_t flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2772 dberr_t row_ins_sec_index_entry_low(uint32_t flags, ulint mode,
2773 dict_index_t *index,
2774 mem_heap_t *offsets_heap, mem_heap_t *heap,
2775 dtuple_t *entry, trx_id_t trx_id,
2776 que_thr_t *thr, bool dup_chk_only) {
2777 DBUG_TRACE;
2778
2779 btr_cur_t cursor;
2780 ulint search_mode = mode;
2781 dberr_t err = DB_SUCCESS;
2782 ulint n_unique;
2783 mtr_t mtr;
2784 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2785 ulint *offsets = offsets_;
2786 rec_offs_init(offsets_);
2787 rtr_info_t rtr_info;
2788
2789 ut_ad(!index->is_clustered());
2790 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2791
2792 cursor.thr = thr;
2793 cursor.rtr_info = nullptr;
2794 ut_ad(thr_get_trx(thr)->id != 0 || index->table->is_intrinsic());
2795
2796 mtr_start(&mtr);
2797
2798 if (index->table->is_temporary()) {
2799 /* Disable REDO logging as the lifetime of temp-tables is
2800 limited to server or connection lifetime and so REDO
2801 information is not needed on restart for recovery.
2802 Disable locking as temp-tables are local to a connection. */
2803
2804 ut_ad(flags & BTR_NO_LOCKING_FLAG);
2805 ut_ad(!index->table->is_intrinsic() || (flags & BTR_NO_UNDO_LOG_FLAG));
2806
2807 mtr.set_log_mode(MTR_LOG_NO_REDO);
2808 } else if (!dict_index_is_spatial(index)) {
2809 /* Enable insert buffering if it's neither temp-table
2810 nor spatial index. */
2811 search_mode |= BTR_INSERT;
2812 }
2813
2814 /* Ensure that we acquire index->lock when inserting into an
2815 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2816 could still be subject to rollback_inplace_alter_table().
2817 This prevents a concurrent change of index->online_status.
2818 The memory object cannot be freed as long as we have an open
2819 reference to the table, or index->table->n_ref_count > 0. */
2820 bool check = !index->is_committed();
2821
2822 DBUG_EXECUTE_IF("idx_mimic_not_committed", {
2823 check = true;
2824 mode = BTR_MODIFY_TREE;
2825 });
2826
2827 if (check) {
2828 DEBUG_SYNC_C("row_ins_sec_index_enter");
2829 if (mode == BTR_MODIFY_LEAF) {
2830 search_mode |= BTR_ALREADY_S_LATCHED;
2831 mtr_s_lock(dict_index_get_lock(index), &mtr);
2832 } else {
2833 mtr_sx_lock(dict_index_get_lock(index), &mtr);
2834 }
2835
2836 if (row_log_online_op_try(index, entry, thr_get_trx(thr)->id)) {
2837 goto func_exit;
2838 }
2839 }
2840
2841 /* Note that we use PAGE_CUR_LE as the search mode, because then
2842 the function will return in both low_match and up_match of the
2843 cursor sensible values */
2844
2845 if (!thr_get_trx(thr)->check_unique_secondary) {
2846 search_mode |= BTR_IGNORE_SEC_UNIQUE;
2847 }
2848
2849 if (dict_index_is_spatial(index)) {
2850 cursor.index = index;
2851 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2852 rtr_info_update_btr(&cursor, &rtr_info);
2853
2854 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_RTREE_INSERT,
2855 search_mode, &cursor, 0, __FILE__, __LINE__,
2856 &mtr);
2857
2858 if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
2859 mtr_commit(&mtr);
2860 rtr_clean_rtr_info(&rtr_info, true);
2861 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2862 rtr_info_update_btr(&cursor, &rtr_info);
2863
2864 mtr_start(&mtr);
2865
2866 search_mode &= ~BTR_MODIFY_LEAF;
2867
2868 search_mode |= BTR_MODIFY_TREE;
2869
2870 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_RTREE_INSERT,
2871 search_mode, &cursor, 0, __FILE__, __LINE__,
2872 &mtr);
2873 mode = BTR_MODIFY_TREE;
2874 }
2875
2876 DBUG_EXECUTE_IF("rtree_test_check_count", { goto func_exit; });
2877
2878 } else {
2879 if (index->table->is_intrinsic()) {
2880 btr_cur_search_to_nth_level_with_no_latch(
2881 index, 0, entry, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, &mtr);
2882 ut_ad(cursor.page_cur.block != nullptr);
2883 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2884 } else {
2885 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, search_mode,
2886 &cursor, 0, __FILE__, __LINE__, &mtr);
2887 }
2888 }
2889
2890 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2891 ut_ad(!dict_index_is_spatial(index));
2892 /* The insert was buffered during the search: we are done */
2893 goto func_exit;
2894 }
2895
2896 #ifdef UNIV_DEBUG
2897 {
2898 page_t *page = btr_cur_get_page(&cursor);
2899 rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));
2900
2901 ut_ad(page_rec_is_supremum(first_rec) ||
2902 rec_n_fields_is_sane(index, first_rec, entry));
2903 }
2904 #endif /* UNIV_DEBUG */
2905
2906 n_unique = dict_index_get_n_unique(index);
2907
2908 if (dict_index_is_unique(index) &&
2909 (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
2910 mtr_commit(&mtr);
2911
2912 DEBUG_SYNC_C("row_ins_sec_index_unique");
2913
2914 if (row_ins_sec_mtr_start_and_check_if_aborted(&mtr, index, check,
2915 search_mode)) {
2916 goto func_exit;
2917 }
2918
2919 err = row_ins_scan_sec_index_for_duplicate(flags, index, entry, thr, check,
2920 &mtr, offsets_heap);
2921
2922 mtr_commit(&mtr);
2923
2924 switch (err) {
2925 case DB_SUCCESS:
2926 break;
2927 case DB_DUPLICATE_KEY:
2928 if (!index->is_committed()) {
2929 ut_ad(!thr_get_trx(thr)->dict_operation_lock_mode);
2930
2931 dict_set_corrupted(index);
2932 /* Do not return any error to the
2933 caller. The duplicate will be reported
2934 by ALTER TABLE or CREATE UNIQUE INDEX.
2935 Unfortunately we cannot report the
2936 duplicate key value to the DDL thread,
2937 because the altered_table object is
2938 private to its call stack. */
2939 err = DB_SUCCESS;
2940 }
2941 /* fall through */
2942 default:
2943 if (dict_index_is_spatial(index)) {
2944 rtr_clean_rtr_info(&rtr_info, true);
2945 }
2946 return err;
2947 }
2948
2949 if (row_ins_sec_mtr_start_and_check_if_aborted(&mtr, index, check,
2950 search_mode)) {
2951 goto func_exit;
2952 }
2953
2954 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
2955
2956 /* We did not find a duplicate and we have now
2957 locked with s-locks the necessary records to
2958 prevent any insertion of a duplicate by another
2959 transaction. Let us now reposition the cursor and
2960 continue the insertion. */
2961 if (index->table->is_intrinsic()) {
2962 btr_cur_search_to_nth_level_with_no_latch(
2963 index, 0, entry, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, &mtr);
2964 ut_ad(cursor.page_cur.block != nullptr);
2965 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2966 } else {
2967 btr_cur_search_to_nth_level(
2968 index, 0, entry, PAGE_CUR_LE,
2969 (search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)), &cursor, 0,
2970 __FILE__, __LINE__, &mtr);
2971 }
2972 }
2973
2974 if (dup_chk_only) {
2975 goto func_exit;
2976 }
2977
2978 if (row_ins_must_modify_rec(&cursor)) {
2979 /* If the existing record is being modified and the new record
2980 is doesn't fit the provided slot then existing record is added
2981 to free list and new record is inserted. This also means
2982 cursor that we have cached for SELECT is now invalid. */
2983 if (index->last_sel_cur) {
2984 index->last_sel_cur->invalid = true;
2985 }
2986
2987 /* There is already an index entry with a long enough common
2988 prefix, we must convert the insert into a modify of an
2989 existing record */
2990 offsets = rec_get_offsets(btr_cur_get_rec(&cursor), index, offsets,
2991 ULINT_UNDEFINED, &offsets_heap);
2992
2993 err = row_ins_sec_index_entry_by_modify(
2994 flags, mode, &cursor, &offsets, offsets_heap, heap, entry, thr, &mtr);
2995
2996 if (err == DB_SUCCESS && dict_index_is_spatial(index) && rtr_info.mbr_adj) {
2997 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
2998 }
2999 } else {
3000 rec_t *insert_rec;
3001 big_rec_t *big_rec;
3002
3003 if (mode == BTR_MODIFY_LEAF) {
3004 err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3005 entry, &insert_rec, &big_rec, thr, &mtr);
3006 if (err == DB_SUCCESS && dict_index_is_spatial(index) &&
3007 rtr_info.mbr_adj) {
3008 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3009 }
3010 } else {
3011 ut_ad(mode == BTR_MODIFY_TREE);
3012 if (buf_LRU_buf_pool_running_out()) {
3013 err = DB_LOCK_TABLE_FULL;
3014 goto func_exit;
3015 }
3016
3017 err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3018 entry, &insert_rec, &big_rec, thr, &mtr);
3019 if (err == DB_FAIL) {
3020 err =
3021 btr_cur_pessimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3022 entry, &insert_rec, &big_rec, thr, &mtr);
3023 }
3024 if (err == DB_SUCCESS && dict_index_is_spatial(index) &&
3025 rtr_info.mbr_adj) {
3026 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3027 }
3028 }
3029
3030 if (err == DB_SUCCESS && trx_id) {
3031 page_update_max_trx_id(btr_cur_get_block(&cursor),
3032 btr_cur_get_page_zip(&cursor), trx_id, &mtr);
3033 }
3034
3035 ut_ad(!big_rec);
3036 }
3037
3038 func_exit:
3039 if (dict_index_is_spatial(index)) {
3040 rtr_clean_rtr_info(&rtr_info, true);
3041 }
3042
3043 mtr_commit(&mtr);
3044 return err;
3045 }
3046
3047 /** Inserts an entry into a clustered index. Tries first optimistic,
3048 then pessimistic descent down the tree. If the entry matches enough
3049 to a delete marked record, performs the insert by updating or delete
3050 unmarking the delete marked record.
3051 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3052 dberr_t row_ins_clust_index_entry(
3053 dict_index_t *index, /*!< in: clustered index */
3054 dtuple_t *entry, /*!< in/out: index entry to insert */
3055 que_thr_t *thr, /*!< in: query thread */
3056 bool dup_chk_only)
3057 /*!< in: if true, just do duplicate check
3058 and return. don't execute actual insert. */
3059 {
3060 dberr_t err;
3061 ulint n_uniq;
3062
3063 DBUG_TRACE;
3064
3065 if (!index->table->foreign_set.empty()) {
3066 err = row_ins_check_foreign_constraints(index->table, index, entry, thr);
3067 if (err != DB_SUCCESS) {
3068 return err;
3069 }
3070 }
3071
3072 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3073
3074 /* Try first optimistic descent to the B-tree */
3075 uint32_t flags;
3076
3077 if (!index->table->is_intrinsic()) {
3078 log_free_check();
3079 flags = index->table->is_temporary() ? BTR_NO_LOCKING_FLAG : 0;
3080
3081 /* For intermediate table of copy alter operation,
3082 skip undo logging and record lock checking for
3083 insertion operation. */
3084 if (index->table->skip_alter_undo) {
3085 flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3086 }
3087
3088 } else {
3089 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3090 }
3091
3092 if (index->table->is_intrinsic() && dict_index_is_auto_gen_clust(index)) {
3093 /* Check if the memory allocated for intrinsic cache*/
3094 if (!index->last_ins_cur) {
3095 dict_allocate_mem_intrinsic_cache(index);
3096 }
3097 err = row_ins_sorted_clust_index_entry(BTR_MODIFY_LEAF, index, entry, thr);
3098 } else {
3099 err = row_ins_clust_index_entry_low(flags, BTR_MODIFY_LEAF, index, n_uniq,
3100 entry, thr, dup_chk_only);
3101 }
3102
3103 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3104 "after_row_ins_clust_index_entry_leaf");
3105
3106 if (err != DB_FAIL) {
3107 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3108 return err;
3109 }
3110
3111 /* Try then pessimistic descent to the B-tree */
3112 if (!index->table->is_intrinsic()) {
3113 log_free_check();
3114 } else if (!index->last_sel_cur) {
3115 dict_allocate_mem_intrinsic_cache(index);
3116 index->last_sel_cur->invalid = true;
3117 } else {
3118 index->last_sel_cur->invalid = true;
3119 }
3120
3121 if (index->table->is_intrinsic() && dict_index_is_auto_gen_clust(index)) {
3122 err = row_ins_sorted_clust_index_entry(BTR_MODIFY_TREE, index, entry, thr);
3123 } else {
3124 err = row_ins_clust_index_entry_low(flags, BTR_MODIFY_TREE, index, n_uniq,
3125 entry, thr, dup_chk_only);
3126 }
3127
3128 return err;
3129 }
3130
3131 /** Inserts an entry into a secondary index. Tries first optimistic,
3132 then pessimistic descent down the tree. If the entry matches enough
3133 to a delete marked record, performs the insert by updating or delete
3134 unmarking the delete marked record.
3135 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3136 dberr_t row_ins_sec_index_entry(
3137 dict_index_t *index, /*!< in: secondary index */
3138 dtuple_t *entry, /*!< in/out: index entry to insert */
3139 que_thr_t *thr, /*!< in: query thread */
3140 bool dup_chk_only)
3141 /*!< in: if true, just do duplicate check
3142 and return. don't execute actual insert. */
3143 {
3144 dberr_t err;
3145 mem_heap_t *offsets_heap;
3146 mem_heap_t *heap;
3147 trx_id_t trx_id = 0;
3148
3149 DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3150 DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3151 return (DB_LOCK_WAIT);
3152 });
3153
3154 DBUG_EXECUTE_IF("row_ins_sec_index_entry_lock_wait", {
3155 static uint16_t count = 0;
3156 if (index->is_multi_value()) {
3157 ++count;
3158 }
3159 if (count == 2) {
3160 count = 0;
3161 return (DB_LOCK_WAIT);
3162 }
3163 });
3164
3165 if (!index->table->foreign_set.empty()) {
3166 err = row_ins_check_foreign_constraints(index->table, index, entry, thr);
3167 if (err != DB_SUCCESS) {
3168 return (err);
3169 }
3170 }
3171
3172 offsets_heap = mem_heap_create(1024);
3173 heap = mem_heap_create(1024);
3174
3175 /* Try first optimistic descent to the B-tree */
3176
3177 uint32_t flags;
3178
3179 if (!index->table->is_intrinsic()) {
3180 log_free_check();
3181 ut_ad(thr_get_trx(thr)->id != 0);
3182
3183 flags = index->table->is_temporary() ? BTR_NO_LOCKING_FLAG : 0;
3184 /* For intermediate table during copy alter table,
3185 skip the undo log and record lock checking for
3186 insertion operation. */
3187 if (index->table->skip_alter_undo) {
3188 trx_id = thr_get_trx(thr)->id;
3189 flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3190 }
3191
3192 } else {
3193 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3194 }
3195
3196 err = row_ins_sec_index_entry_low(flags, BTR_MODIFY_LEAF, index, offsets_heap,
3197 heap, entry, trx_id, thr, dup_chk_only);
3198 if (err == DB_FAIL) {
3199 mem_heap_empty(heap);
3200
3201 /* Try then pessimistic descent to the B-tree */
3202
3203 if (!index->table->is_intrinsic()) {
3204 log_free_check();
3205 } else if (!index->last_sel_cur) {
3206 dict_allocate_mem_intrinsic_cache(index);
3207 index->last_sel_cur->invalid = true;
3208 } else {
3209 index->last_sel_cur->invalid = true;
3210 }
3211
3212 err =
3213 row_ins_sec_index_entry_low(flags, BTR_MODIFY_TREE, index, offsets_heap,
3214 heap, entry, 0, thr, dup_chk_only);
3215 }
3216
3217 mem_heap_free(heap);
3218 mem_heap_free(offsets_heap);
3219 return (err);
3220 }
3221
3222 /** Inserts an entry into a secondary index, which is created for
3223 multi-value field. For each value to be inserted, it tries first optimistic,
3224 then pessimistic descent down the tree. If the entry matches enough
3225 to a delete marked record, performs the insert by updating or delete
3226 unmarking the delete marked record.
3227 @param[in] index secondary index which is for multi-value field
3228 @param[in,out] entry index entry to insert
3229 @param[in,out] multi_val_pos the start position to insert next multi-value
3230 data, and the returned value should be either
3231 0 if all are done, or the position where the
3232 insert failed. So return value of 0 could be
3233 a bit ambiguous, however the return error
3234 can help to see which case it is
3235 @param[in] thr query thread
3236 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_sec_index_multi_value_entry(dict_index_t * index,dtuple_t * entry,uint32_t & multi_val_pos,que_thr_t * thr)3237 static dberr_t row_ins_sec_index_multi_value_entry(dict_index_t *index,
3238 dtuple_t *entry,
3239 uint32_t &multi_val_pos,
3240 que_thr_t *thr) {
3241 ut_d(trx_t *trx = thr_get_trx(thr));
3242
3243 ut_ad(trx->id != 0);
3244 ut_ad(!index->table->is_intrinsic());
3245 ut_ad(index->is_committed());
3246 ut_ad(!dict_index_is_online_ddl(index));
3247 ut_ad(index->is_multi_value());
3248
3249 dberr_t err = DB_SUCCESS;
3250 Multi_value_entry_builder_insert mv_entry_builder(index, entry);
3251
3252 for (dtuple_t *mv_entry = mv_entry_builder.begin(multi_val_pos);
3253 mv_entry != nullptr; mv_entry = mv_entry_builder.next()) {
3254 err = row_ins_sec_index_entry(index, mv_entry, thr, false);
3255 if (err != DB_SUCCESS) {
3256 multi_val_pos = mv_entry_builder.last_multi_value_position();
3257 return (err);
3258 }
3259 }
3260
3261 multi_val_pos = 0;
3262
3263 return (err);
3264 }
3265
3266 /** Inserts an index entry to index. Tries first optimistic, then pessimistic
3267 descent down the tree. If the entry matches enough to a delete marked record,
3268 performs the insert by updating or delete unmarking the delete marked
3269 record.
3270 @param[in] index index to insert the entry
3271 @param[in,out] entry entry to insert
3272 @param[in,out] multi_val_pos if multi-value index, the start position
3273 to insert next multi-value data,
3274 and the returned value should be either
3275 0 if all are done, or the position where the
3276 insert failed. So return value of 0 could be
3277 a bit ambiguous, however the return error
3278 can help to see which case it is
3279 @param[in] thr query thread
3280 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,uint32_t & multi_val_pos,que_thr_t * thr)3281 static dberr_t row_ins_index_entry(dict_index_t *index, dtuple_t *entry,
3282 uint32_t &multi_val_pos, que_thr_t *thr) {
3283 ut_ad(thr_get_trx(thr)->id != 0);
3284
3285 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3286 DBUG_SET("-d,row_ins_index_entry_timeout");
3287 return (DB_LOCK_WAIT);
3288 });
3289
3290 if (index->is_clustered()) {
3291 return (row_ins_clust_index_entry(index, entry, thr, false));
3292 } else if (index->is_multi_value()) {
3293 return (
3294 row_ins_sec_index_multi_value_entry(index, entry, multi_val_pos, thr));
3295 } else {
3296 return (row_ins_sec_index_entry(index, entry, thr, false));
3297 }
3298 }
3299
3300 /** This function generate MBR (Minimum Bounding Box) for spatial objects
3301 and set it to spatial index field. */
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field,uint32_t * srid,const dd::Spatial_reference_system * srs)3302 static void row_ins_spatial_index_entry_set_mbr_field(
3303 dfield_t *field, /*!< in/out: mbr field */
3304 const dfield_t *row_field, /*!< in: row field */
3305 uint32_t *srid, /*!< in/out: spatial reference id */
3306 const dd::Spatial_reference_system *srs) /*!< in: SRS of row_field */
3307 {
3308 uchar *dptr = nullptr;
3309 ulint dlen = 0;
3310 double mbr[SPDIMS * 2];
3311
3312 /* This must be a GEOMETRY datatype */
3313 ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3314
3315 dptr = static_cast<uchar *>(dfield_get_data(row_field));
3316 dlen = dfield_get_len(row_field);
3317
3318 /* obtain the MBR */
3319 get_mbr_from_store(srs, dptr, static_cast<uint>(dlen), SPDIMS, mbr, srid);
3320
3321 /* Set mbr as index entry data */
3322 dfield_write_mbr(field, mbr);
3323 }
3324
3325 /** Sets the values of the dtuple fields in entry from the values of appropriate
3326 columns in row.
3327 @param[in] index index handler
3328 @param[out] entry index entry to make
3329 @param[in] row row
3330
3331 @return DB_SUCCESS if the set is successful */
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3332 dberr_t row_ins_index_entry_set_vals(const dict_index_t *index, dtuple_t *entry,
3333 const dtuple_t *row) {
3334 ulint n_fields;
3335 ulint i;
3336 ulint num_v = dtuple_get_n_v_fields(entry);
3337
3338 n_fields = dtuple_get_n_fields(entry);
3339
3340 for (i = 0; i < n_fields + num_v; i++) {
3341 dict_field_t *ind_field = nullptr;
3342 dfield_t *field;
3343 const dfield_t *row_field;
3344 ulint len;
3345 dict_col_t *col;
3346
3347 if (i >= n_fields) {
3348 /* This is virtual field */
3349 field = dtuple_get_nth_v_field(entry, i - n_fields);
3350 col = &dict_table_get_nth_v_col(index->table, i - n_fields)->m_col;
3351 } else {
3352 field = dtuple_get_nth_field(entry, i);
3353 ind_field = index->get_field(i);
3354 col = ind_field->col;
3355 }
3356
3357 if (col->is_virtual()) {
3358 const dict_v_col_t *v_col = reinterpret_cast<const dict_v_col_t *>(col);
3359 ut_ad(dtuple_get_n_fields(row) == index->table->get_n_cols());
3360 row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3361 } else {
3362 row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3363 }
3364
3365 len = dfield_get_len(row_field);
3366
3367 /* Check column prefix indexes */
3368 if (ind_field != nullptr && ind_field->prefix_len > 0 &&
3369 dfield_get_len(row_field) != UNIV_SQL_NULL) {
3370 const dict_col_t *col = ind_field->col;
3371
3372 len = dtype_get_at_most_n_mbchars(
3373 col->prtype, col->mbminmaxlen, ind_field->prefix_len, len,
3374 static_cast<const char *>(dfield_get_data(row_field)));
3375
3376 ut_ad(!dfield_is_ext(row_field));
3377 }
3378
3379 /* Handle spatial index. For the first field, replace
3380 the data with its MBR (Minimum Bounding Box). */
3381 if ((i == 0) && dict_index_is_spatial(index)) {
3382 if (!row_field->data || row_field->len < GEO_DATA_HEADER_SIZE) {
3383 return (DB_CANT_CREATE_GEOMETRY_OBJECT);
3384 }
3385
3386 uint32_t srid;
3387 row_ins_spatial_index_entry_set_mbr_field(field, row_field, &srid,
3388 index->rtr_srs.get());
3389
3390 if (index->srid_is_valid && index->srid != srid) {
3391 return DB_CANT_CREATE_GEOMETRY_OBJECT;
3392 }
3393
3394 continue;
3395 }
3396
3397 dfield_set_data(field, dfield_get_data(row_field), len);
3398 if (dfield_is_ext(row_field)) {
3399 ut_ad(index->is_clustered());
3400 dfield_set_ext(field);
3401 }
3402 }
3403
3404 return (DB_SUCCESS);
3405 }
3406
3407 /** Inserts a single index entry to the table.
3408 @return DB_SUCCESS if operation successfully completed, else error
3409 code or DB_LOCK_WAIT */
3410 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3411 row_ins_index_entry_step(ins_node_t *node, /*!< in: row insert node */
3412 que_thr_t *thr) /*!< in: query thread */
3413 {
3414 dberr_t err;
3415
3416 DBUG_TRACE;
3417
3418 ut_ad(dtuple_check_typed(node->row));
3419
3420 err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3421
3422 if (err != DB_SUCCESS) {
3423 return err;
3424 }
3425
3426 ut_ad(dtuple_check_typed(node->entry));
3427
3428 err = row_ins_index_entry(node->index, node->entry, node->ins_multi_val_pos,
3429 thr);
3430
3431 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3432 "after_row_ins_index_entry_step");
3433
3434 return err;
3435 }
3436
3437 /** Allocates a row id for row and inits the node->index field. */
3438 UNIV_INLINE
row_ins_alloc_row_id_step(ins_node_t * node)3439 void row_ins_alloc_row_id_step(ins_node_t *node) /*!< in: row insert node */
3440 {
3441 row_id_t row_id;
3442
3443 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3444
3445 if (dict_index_is_unique(node->table->first_index())) {
3446 /* No row id is stored if the clustered index is unique */
3447
3448 return;
3449 }
3450
3451 /* Fill in row id value to row */
3452
3453 row_id = dict_sys_get_new_row_id();
3454
3455 dict_sys_write_row_id(node->row_id_buf, row_id);
3456 }
3457
3458 /** Gets a row to insert from the values list. */
3459 UNIV_INLINE
row_ins_get_row_from_values(ins_node_t * node)3460 void row_ins_get_row_from_values(ins_node_t *node) /*!< in: row insert node */
3461 {
3462 que_node_t *list_node;
3463 dfield_t *dfield;
3464 dtuple_t *row;
3465 ulint i;
3466
3467 /* The field values are copied in the buffers of the select node and
3468 it is safe to use them until we fetch from select again: therefore
3469 we can just copy the pointers */
3470
3471 row = node->row;
3472
3473 i = 0;
3474 list_node = node->values_list;
3475
3476 while (list_node) {
3477 eval_exp(list_node);
3478
3479 dfield = dtuple_get_nth_field(row, i);
3480 dfield_copy_data(dfield, que_node_get_val(list_node));
3481
3482 i++;
3483 list_node = que_node_get_next(list_node);
3484 }
3485 }
3486
3487 /** Gets a row to insert from the select list. */
3488 UNIV_INLINE
row_ins_get_row_from_select(ins_node_t * node)3489 void row_ins_get_row_from_select(ins_node_t *node) /*!< in: row insert node */
3490 {
3491 que_node_t *list_node;
3492 dfield_t *dfield;
3493 dtuple_t *row;
3494 ulint i;
3495
3496 /* The field values are copied in the buffers of the select node and
3497 it is safe to use them until we fetch from select again: therefore
3498 we can just copy the pointers */
3499
3500 row = node->row;
3501
3502 i = 0;
3503 list_node = node->select->select_list;
3504
3505 while (list_node) {
3506 dfield = dtuple_get_nth_field(row, i);
3507 dfield_copy_data(dfield, que_node_get_val(list_node));
3508
3509 i++;
3510 list_node = que_node_get_next(list_node);
3511 }
3512 }
3513
/** Inserts a row to a table. This is a resumable state machine: on
DB_LOCK_WAIT the node keeps its state (INS_NODE_INSERT_ENTRIES and the
current node->index), so a re-run continues at the index that blocked.
@return DB_SUCCESS if operation successfully completed, else error
code or DB_LOCK_WAIT */
static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins(ins_node_t *node, /*!< in: row insert node */
        que_thr_t *thr)   /*!< in: query thread */
{
  dberr_t err;

  DBUG_TRACE;

  DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));

  if (node->state == INS_NODE_ALLOC_ROW_ID) {
    /* First pass for this row: assign a hidden row id if needed,
    position on the clustered index, and materialize the row image. */
    row_ins_alloc_row_id_step(node);

    node->index = node->table->first_index();
    node->entry = UT_LIST_GET_FIRST(node->entry_list);

    if (node->ins_type == INS_SEARCHED) {
      row_ins_get_row_from_select(node);

    } else if (node->ins_type == INS_VALUES) {
      row_ins_get_row_from_values(node);
    }

    node->state = INS_NODE_INSERT_ENTRIES;
  }

  ut_ad(node->state == INS_NODE_INSERT_ENTRIES);

  /* Insert one entry into each index of the table; node->index and
  node->entry advance in lock step. FTS indexes are skipped here
  (they are maintained separately). */
  while (node->index != nullptr) {
    if (node->index->type != DICT_FTS) {
      err = row_ins_index_entry_step(node, thr);

      switch (err) {
        case DB_SUCCESS:
          break;
        case DB_DUPLICATE_KEY:
          /* Record which index raised the duplicate so the error
          report can name it; then return like any other error. */
          thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY;
          thr_get_trx(thr)->error_index = node->index;
          // fall through
        default:
          return err;
      }
    }

    node->index = node->index->next();
    node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);

    /* Debug hook: stop after the clustered index, skipping all
    secondary indexes (note the 'break' exits the while loop). */
    DBUG_EXECUTE_IF("row_ins_skip_sec", node->index = nullptr;
                    node->entry = nullptr; break;);

    /* Skip corrupted secondary index and its entry */
    while (node->index && node->index->is_corrupted()) {
      node->index = node->index->next();
      node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
    }
  }

  /* All indexes processed: the entry list must be exhausted too. */
  ut_ad(node->entry == nullptr);

  thr_get_trx(thr)->error_index = nullptr;
  /* Reset so the next row starts from the beginning. */
  node->state = INS_NODE_ALLOC_ROW_ID;

  return DB_SUCCESS;
}
3581
/** Inserts a row to a table. This is a high-level function used in SQL
execution graphs.
@return query thread to run next or NULL */
que_thr_t *row_ins_step(que_thr_t *thr) /*!< in: query thread */
{
  ins_node_t *node;
  que_node_t *parent;
  sel_node_t *sel_node;
  trx_t *trx;
  dberr_t err;

  ut_ad(thr);

  DEBUG_SYNC_C("innodb_row_ins_step_enter");

  trx = thr_get_trx(thr);

  /* Make sure an (XA-capable) transaction is active for this insert. */
  trx_start_if_not_started_xa(trx, true);

  node = static_cast<ins_node_t *>(thr->run_node);

  ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
  /* Intrinsic tables bypass the query graph; they must not get here. */
  ut_ad(!node->table->is_intrinsic());

  parent = que_node_get_parent(node);
  sel_node = node->select;

  /* Coming from the parent means this is a fresh execution of the
  node (not a resume after a lock wait): restart at the IX-lock step. */
  if (thr->prev_node == parent) {
    node->state = INS_NODE_SET_IX_LOCK;
  }

  /* If this is the first time this node is executed (or when
  execution resumes after wait for the table IX lock), set an
  IX lock on the table and reset the possible select node. MySQL's
  partitioned table code may also call an insert within the same
  SQL statement AFTER it has used this table handle to do a search.
  This happens, for example, when a row update moves it to another
  partition. In that case, we have already set the IX lock on the
  table during the search operation, and there is no need to set
  it again here. But we must write trx->id to node->trx_id_buf. */

  memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
  trx_write_trx_id(node->trx_id_buf, trx->id);

  if (node->state == INS_NODE_SET_IX_LOCK) {
    node->state = INS_NODE_ALLOC_ROW_ID;

    /* It may be that the current session has not yet started
    its transaction, or it has been committed: */

    if (trx->id == node->trx_id) {
      /* No need to do IX-locking: this transaction already
      holds the table IX lock from an earlier step. */

      goto same_trx;
    }

    err = lock_table(0, node->table, LOCK_IX, thr);

    DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait", err = DB_LOCK_WAIT;);

    if (err != DB_SUCCESS) {
      /* Lock wait or error: record it and suspend this thread. */
      goto error_handling;
    }

    /* Remember that this transaction holds the IX lock, so a
    later call within the same statement can skip re-locking. */
    node->trx_id = trx->id;
  same_trx:
    if (node->ins_type == INS_SEARCHED) {
      /* Reset the cursor */
      sel_node->state = SEL_NODE_OPEN;

      /* Fetch a row to insert */

      thr->run_node = sel_node;

      return (thr);
    }
  }

  if ((node->ins_type == INS_SEARCHED) && (sel_node->state != SEL_NODE_FETCH)) {
    ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);

    /* No more rows to insert */
    thr->run_node = parent;

    return (thr);
  }

  /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */

  err = row_ins(node, thr);

error_handling:
  trx->error_state = err;

  if (err != DB_SUCCESS) {
    /* err == DB_LOCK_WAIT or SQL error detected */
    return (nullptr);
  }

  /* DO THE TRIGGER ACTIONS HERE */

  if (node->ins_type == INS_SEARCHED) {
    /* Fetch a row to insert */

    thr->run_node = sel_node;
  } else {
    thr->run_node = que_node_get_parent(node);
  }

  return (thr);
}
3693