1 /*****************************************************************************
2
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "ha_prototypes.h"
35
36 #include "row0ins.h"
37
38 #ifdef UNIV_NONINL
39 #include "row0ins.ic"
40 #endif
41
42 #include "dict0dict.h"
43 #include "dict0boot.h"
44 #include "trx0rec.h"
45 #include "trx0undo.h"
46 #include "btr0btr.h"
47 #include "btr0cur.h"
48 #include "mach0data.h"
49 #include "que0que.h"
50 #include "row0upd.h"
51 #include "row0sel.h"
52 #include "row0row.h"
53 #include "row0log.h"
54 #include "rem0cmp.h"
55 #include "lock0lock.h"
56 #include "log0log.h"
57 #include "eval0eval.h"
58 #include "data0data.h"
59 #include "usr0sess.h"
60 #include "buf0lru.h"
61 #include "fts0fts.h"
62 #include "fts0types.h"
63 #include "m_string.h"
64 #include "gis0geo.h"
65
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module, make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
75
76 /*********************************************************************//**
77 Creates an insert node struct.
78 @return own: insert node struct */
79 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)80 ins_node_create(
81 /*============*/
82 ulint ins_type, /*!< in: INS_VALUES, ... */
83 dict_table_t* table, /*!< in: table where to insert */
84 mem_heap_t* heap) /*!< in: mem heap where created */
85 {
86 ins_node_t* node;
87
88 node = static_cast<ins_node_t*>(
89 mem_heap_alloc(heap, sizeof(ins_node_t)));
90
91 node->common.type = QUE_NODE_INSERT;
92
93 node->ins_type = ins_type;
94
95 node->state = INS_NODE_SET_IX_LOCK;
96 node->table = table;
97 node->index = NULL;
98 node->entry = NULL;
99
100 node->select = NULL;
101
102 node->trx_id = 0;
103
104 node->entry_sys_heap = mem_heap_create(128);
105
106 node->magic_n = INS_NODE_MAGIC_N;
107
108 return(node);
109 }
110
111 /***********************************************************//**
112 Creates an entry template for each index of a table. */
113 static
114 void
ins_node_create_entry_list(ins_node_t * node)115 ins_node_create_entry_list(
116 /*=======================*/
117 ins_node_t* node) /*!< in: row insert node */
118 {
119 dict_index_t* index;
120 dtuple_t* entry;
121
122 ut_ad(node->entry_sys_heap);
123
124 UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
125
126 /* We will include all indexes (include those corrupted
127 secondary indexes) in the entry list. Filteration of
128 these corrupted index will be done in row_ins() */
129
130 for (index = dict_table_get_first_index(node->table);
131 index != 0;
132 index = dict_table_get_next_index(index)) {
133
134 entry = row_build_index_entry_low(
135 node->row, NULL, index, node->entry_sys_heap,
136 ROW_BUILD_FOR_INSERT);
137
138 UT_LIST_ADD_LAST(node->entry_list, entry);
139 }
140 }
141
142 /*****************************************************************//**
143 Adds system field buffers to a row. */
144 static
145 void
row_ins_alloc_sys_fields(ins_node_t * node)146 row_ins_alloc_sys_fields(
147 /*=====================*/
148 ins_node_t* node) /*!< in: insert node */
149 {
150 dtuple_t* row;
151 dict_table_t* table;
152 mem_heap_t* heap;
153 const dict_col_t* col;
154 dfield_t* dfield;
155 byte* ptr;
156
157 row = node->row;
158 table = node->table;
159 heap = node->entry_sys_heap;
160
161 ut_ad(row && table && heap);
162 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
163
164 /* allocate buffer to hold the needed system created hidden columns. */
165 uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
166 if (!dict_table_is_intrinsic(table)) {
167 len += DATA_ROLL_PTR_LEN;
168 }
169 ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
170
171 /* 1. Populate row-id */
172 col = dict_table_get_sys_col(table, DATA_ROW_ID);
173
174 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
175
176 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
177
178 node->row_id_buf = ptr;
179
180 ptr += DATA_ROW_ID_LEN;
181
182 /* 2. Populate trx id */
183 col = dict_table_get_sys_col(table, DATA_TRX_ID);
184
185 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186
187 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
188
189 node->trx_id_buf = ptr;
190
191 ptr += DATA_TRX_ID_LEN;
192
193 if (!dict_table_is_intrinsic(table)) {
194 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
195
196 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
197
198 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
199 }
200 }
201
202 /*********************************************************************//**
203 Sets a new row to insert for an INS_DIRECT node. This function is only used
204 if we have constructed the row separately, which is a rare case; this
205 function is quite slow. */
206 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)207 ins_node_set_new_row(
208 /*=================*/
209 ins_node_t* node, /*!< in: insert node */
210 dtuple_t* row) /*!< in: new row (or first row) for the node */
211 {
212 node->state = INS_NODE_SET_IX_LOCK;
213 node->index = NULL;
214 node->entry = NULL;
215
216 node->row = row;
217
218 mem_heap_empty(node->entry_sys_heap);
219
220 /* Create templates for index entries */
221
222 ins_node_create_entry_list(node);
223
224 /* Allocate from entry_sys_heap buffers for sys fields */
225
226 row_ins_alloc_sys_fields(node);
227
228 /* As we allocated a new trx id buf, the trx id should be written
229 there again: */
230
231 node->trx_id = 0;
232 }
233
234 /*******************************************************************//**
235 Does an insert operation by updating a delete-marked existing record
236 in the index. This situation can occur if the delete-marked record is
237 kept in the index for consistent reads.
238 @return DB_SUCCESS or error code */
239 static MY_ATTRIBUTE((warn_unused_result))
240 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)241 row_ins_sec_index_entry_by_modify(
242 /*==============================*/
243 ulint flags, /*!< in: undo logging and locking flags */
244 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
245 depending on whether mtr holds just a leaf
246 latch or also a tree latch */
247 btr_cur_t* cursor, /*!< in: B-tree cursor */
248 ulint** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
249 mem_heap_t* offsets_heap,
250 /*!< in/out: memory heap that can be emptied */
251 mem_heap_t* heap, /*!< in/out: memory heap */
252 const dtuple_t* entry, /*!< in: index entry to insert */
253 que_thr_t* thr, /*!< in: query thread */
254 mtr_t* mtr) /*!< in: mtr; must be committed before
255 latching any further pages */
256 {
257 big_rec_t* dummy_big_rec;
258 upd_t* update;
259 rec_t* rec;
260 dberr_t err;
261
262 rec = btr_cur_get_rec(cursor);
263
264 ut_ad(!dict_index_is_clust(cursor->index));
265 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
266 ut_ad(!entry->info_bits);
267
268 /* We know that in the alphabetical ordering, entry and rec are
269 identified. But in their binary form there may be differences if
270 there are char fields in them. Therefore we have to calculate the
271 difference. */
272
273 update = row_upd_build_sec_rec_difference_binary(
274 rec, cursor->index, *offsets, entry, heap);
275
276 if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
277 /* We should never insert in place of a record that
278 has not been delete-marked. The only exception is when
279 online CREATE INDEX copied the changes that we already
280 made to the clustered index, and completed the
281 secondary index creation before we got here. In this
282 case, the change would already be there. The CREATE
283 INDEX should be waiting for a MySQL meta-data lock
284 upgrade at least until this INSERT or UPDATE
285 returns. After that point, set_committed(true)
286 would be invoked in commit_inplace_alter_table(). */
287 ut_a(update->n_fields == 0);
288 ut_a(!cursor->index->is_committed());
289 ut_ad(!dict_index_is_online_ddl(cursor->index));
290 return(DB_SUCCESS);
291 }
292
293 if (mode == BTR_MODIFY_LEAF) {
294 /* Try an optimistic updating of the record, keeping changes
295 within the page */
296
297 /* TODO: pass only *offsets */
298 err = btr_cur_optimistic_update(
299 flags | BTR_KEEP_SYS_FLAG, cursor,
300 offsets, &offsets_heap, update, 0, thr,
301 thr_get_trx(thr)->id, mtr);
302 switch (err) {
303 case DB_OVERFLOW:
304 case DB_UNDERFLOW:
305 case DB_ZIP_OVERFLOW:
306 err = DB_FAIL;
307 default:
308 break;
309 }
310 } else {
311 ut_a(mode == BTR_MODIFY_TREE);
312 if (buf_LRU_buf_pool_running_out()) {
313
314 return(DB_LOCK_TABLE_FULL);
315 }
316
317 err = btr_cur_pessimistic_update(
318 flags | BTR_KEEP_SYS_FLAG, cursor,
319 offsets, &offsets_heap,
320 heap, &dummy_big_rec, update, 0,
321 thr, thr_get_trx(thr)->id, mtr);
322 ut_ad(!dummy_big_rec);
323 }
324
325 return(err);
326 }
327
328 /*******************************************************************//**
329 Does an insert operation by delete unmarking and updating a delete marked
330 existing record in the index. This situation can occur if the delete marked
331 record is kept in the index for consistent reads.
332 @return DB_SUCCESS, DB_FAIL, or error code */
333 static MY_ATTRIBUTE((warn_unused_result))
334 dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)335 row_ins_clust_index_entry_by_modify(
336 /*================================*/
337 btr_pcur_t* pcur, /*!< in/out: a persistent cursor pointing
338 to the clust_rec that is being modified. */
339 ulint flags, /*!< in: undo logging and locking flags */
340 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
341 depending on whether mtr holds just a leaf
342 latch or also a tree latch */
343 ulint** offsets,/*!< out: offsets on cursor->page_cur.rec */
344 mem_heap_t** offsets_heap,
345 /*!< in/out: pointer to memory heap that can
346 be emptied, or NULL */
347 mem_heap_t* heap, /*!< in/out: memory heap */
348 const dtuple_t* entry, /*!< in: index entry to insert */
349 que_thr_t* thr, /*!< in: query thread */
350 mtr_t* mtr) /*!< in: mtr; must be committed before
351 latching any further pages */
352 {
353 const rec_t* rec;
354 upd_t* update;
355 dberr_t err = DB_SUCCESS;
356 btr_cur_t* cursor = btr_pcur_get_btr_cur(pcur);
357 TABLE* mysql_table = NULL;
358 ut_ad(dict_index_is_clust(cursor->index));
359
360 rec = btr_cur_get_rec(cursor);
361
362 ut_ad(rec_get_deleted_flag(rec,
363 dict_table_is_comp(cursor->index->table)));
364
365 /* Build an update vector containing all the fields to be modified;
366 NOTE that this vector may NOT contain system columns trx_id or
367 roll_ptr */
368 if (thr->prebuilt != NULL) {
369 mysql_table = thr->prebuilt->m_mysql_table;
370 ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
371 }
372
373 update = row_upd_build_difference_binary(
374 cursor->index, entry, rec, NULL, true,
375 thr_get_trx(thr), heap, mysql_table, thr->prebuilt, &err);
376 if (err != DB_SUCCESS) {
377 return(err);
378 }
379
380 if (mode != BTR_MODIFY_TREE) {
381 ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
382
383 /* Try optimistic updating of the record, keeping changes
384 within the page */
385
386 err = btr_cur_optimistic_update(
387 flags, cursor, offsets, offsets_heap, update, 0, thr,
388 thr_get_trx(thr)->id, mtr);
389 switch (err) {
390 case DB_OVERFLOW:
391 case DB_UNDERFLOW:
392 case DB_ZIP_OVERFLOW:
393 err = DB_FAIL;
394 default:
395 break;
396 }
397 } else {
398 if (buf_LRU_buf_pool_running_out()) {
399
400 return(DB_LOCK_TABLE_FULL);
401
402 }
403
404 big_rec_t* big_rec = NULL;
405
406 err = btr_cur_pessimistic_update(
407 flags | BTR_KEEP_POS_FLAG,
408 cursor, offsets, offsets_heap, heap,
409 &big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
410
411 if (big_rec) {
412 ut_a(err == DB_SUCCESS);
413
414 DEBUG_SYNC_C("before_row_ins_upd_extern");
415 err = btr_store_big_rec_extern_fields(
416 pcur, update, *offsets, big_rec, mtr,
417 BTR_STORE_INSERT_UPDATE);
418 DEBUG_SYNC_C("after_row_ins_upd_extern");
419 dtuple_big_rec_free(big_rec);
420 }
421 }
422
423 return(err);
424 }
425
426 /*********************************************************************//**
427 Returns TRUE if in a cascaded update/delete an ancestor node of node
428 updates (not DELETE, but UPDATE) table.
429 @return TRUE if an ancestor updates table */
430 static
431 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)432 row_ins_cascade_ancestor_updates_table(
433 /*===================================*/
434 que_node_t* node, /*!< in: node in a query graph */
435 dict_table_t* table) /*!< in: table */
436 {
437 que_node_t* parent;
438
439 for (parent = que_node_get_parent(node);
440 que_node_get_type(parent) == QUE_NODE_UPDATE;
441 parent = que_node_get_parent(parent)) {
442
443 upd_node_t* upd_node;
444
445 upd_node = static_cast<upd_node_t*>(parent);
446
447 if (upd_node->table == table && upd_node->is_delete == FALSE) {
448
449 return(TRUE);
450 }
451 }
452
453 return(FALSE);
454 }
455
456 /*********************************************************************//**
457 Returns the number of ancestor UPDATE or DELETE nodes of a
458 cascaded update/delete node.
459 @return number of ancestors */
460 static MY_ATTRIBUTE((warn_unused_result))
461 ulint
row_ins_cascade_n_ancestors(que_node_t * node)462 row_ins_cascade_n_ancestors(
463 /*========================*/
464 que_node_t* node) /*!< in: node in a query graph */
465 {
466 que_node_t* parent;
467 ulint n_ancestors = 0;
468
469 for (parent = que_node_get_parent(node);
470 que_node_get_type(parent) == QUE_NODE_UPDATE;
471 parent = que_node_get_parent(parent)) {
472
473 n_ancestors++;
474 }
475
476 return(n_ancestors);
477 }
478
479 /******************************************************************//**
480 Calculates the update vector node->cascade->update for a child table in
481 a cascaded update.
482 @return number of fields in the calculated update vector; the value
483 can also be 0 if no foreign key fields changed; the returned value is
484 ULINT_UNDEFINED if the column type in the child table is too short to
485 fit the new value in the parent table: that means the update fails */
486 static MY_ATTRIBUTE((warn_unused_result))
487 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)488 row_ins_cascade_calc_update_vec(
489 /*============================*/
490 upd_node_t* node, /*!< in: update node of the parent
491 table */
492 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
493 type is != 0 */
494 mem_heap_t* heap, /*!< in: memory heap to use as
495 temporary storage */
496 trx_t* trx, /*!< in: update transaction */
497 ibool* fts_col_affected)
498 /*!< out: is FTS column affected */
499 {
500 upd_node_t* cascade = node->cascade_node;
501 dict_table_t* table = foreign->foreign_table;
502 dict_index_t* index = foreign->foreign_index;
503 upd_t* update;
504 dict_table_t* parent_table;
505 dict_index_t* parent_index;
506 upd_t* parent_update;
507 ulint n_fields_updated;
508 ulint parent_field_no;
509 ulint i;
510 ulint j;
511 ibool doc_id_updated = FALSE;
512 ulint doc_id_pos = 0;
513 doc_id_t new_doc_id = FTS_NULL_DOC_ID;
514
515 ut_a(node);
516 ut_a(foreign);
517 ut_a(cascade);
518 ut_a(table);
519 ut_a(index);
520
521 /* Calculate the appropriate update vector which will set the fields
522 in the child index record to the same value (possibly padded with
523 spaces if the column is a fixed length CHAR or FIXBINARY column) as
524 the referenced index record will get in the update. */
525
526 parent_table = node->table;
527 ut_a(parent_table == foreign->referenced_table);
528 parent_index = foreign->referenced_index;
529 parent_update = node->update;
530
531 update = cascade->update;
532
533 update->info_bits = 0;
534
535 n_fields_updated = 0;
536
537 *fts_col_affected = FALSE;
538
539 if (table->fts) {
540 doc_id_pos = dict_table_get_nth_col_pos(
541 table, table->fts->doc_col);
542 }
543
544 for (i = 0; i < foreign->n_fields; i++) {
545
546 parent_field_no = dict_table_get_nth_col_pos(
547 parent_table,
548 dict_index_get_nth_col_no(parent_index, i));
549
550 for (j = 0; j < parent_update->n_fields; j++) {
551 const upd_field_t* parent_ufield
552 = &parent_update->fields[j];
553
554 if (parent_ufield->field_no == parent_field_no) {
555
556 ulint min_size;
557 const dict_col_t* col;
558 ulint ufield_len;
559 upd_field_t* ufield;
560
561 col = dict_index_get_nth_col(index, i);
562
563 /* A field in the parent index record is
564 updated. Let us make the update vector
565 field for the child table. */
566
567 ufield = update->fields + n_fields_updated;
568
569 ufield->field_no
570 = dict_table_get_nth_col_pos(
571 table, dict_col_get_no(col));
572
573 ufield->orig_len = 0;
574 ufield->exp = NULL;
575
576 ufield->new_val = parent_ufield->new_val;
577 ufield_len = dfield_get_len(&ufield->new_val);
578
579 /* Clear the "external storage" flag */
580 dfield_set_len(&ufield->new_val, ufield_len);
581
582 /* Do not allow a NOT NULL column to be
583 updated as NULL */
584
585 if (dfield_is_null(&ufield->new_val)
586 && (col->prtype & DATA_NOT_NULL)) {
587
588 return(ULINT_UNDEFINED);
589 }
590
591 /* If the new value would not fit in the
592 column, do not allow the update */
593
594 if (!dfield_is_null(&ufield->new_val)
595 && dtype_get_at_most_n_mbchars(
596 col->prtype, col->mbminmaxlen,
597 col->len,
598 ufield_len,
599 static_cast<char*>(
600 dfield_get_data(
601 &ufield->new_val)))
602 < ufield_len) {
603
604 return(ULINT_UNDEFINED);
605 }
606
607 /* If the parent column type has a different
608 length than the child column type, we may
609 need to pad with spaces the new value of the
610 child column */
611
612 min_size = dict_col_get_min_size(col);
613
614 /* Because UNIV_SQL_NULL (the marker
615 of SQL NULL values) exceeds all possible
616 values of min_size, the test below will
617 not hold for SQL NULL columns. */
618
619 if (min_size > ufield_len) {
620
621 byte* pad;
622 ulint pad_len;
623 byte* padded_data;
624 ulint mbminlen;
625
626 padded_data = static_cast<byte*>(
627 mem_heap_alloc(
628 heap, min_size));
629
630 pad = padded_data + ufield_len;
631 pad_len = min_size - ufield_len;
632
633 memcpy(padded_data,
634 dfield_get_data(&ufield
635 ->new_val),
636 ufield_len);
637
638 mbminlen = dict_col_get_mbminlen(col);
639
640 ut_ad(!(ufield_len % mbminlen));
641 ut_ad(!(min_size % mbminlen));
642
643 if (mbminlen == 1
644 && dtype_get_charset_coll(
645 col->prtype)
646 == DATA_MYSQL_BINARY_CHARSET_COLL) {
647 /* Do not pad BINARY columns */
648 return(ULINT_UNDEFINED);
649 }
650
651 row_mysql_pad_col(mbminlen,
652 pad, pad_len);
653 dfield_set_data(&ufield->new_val,
654 padded_data, min_size);
655 }
656
657 /* Check whether the current column has
658 FTS index on it */
659 if (table->fts
660 && dict_table_is_fts_column(
661 table->fts->indexes,
662 dict_col_get_no(col),
663 dict_col_is_virtual(col))
664 != ULINT_UNDEFINED) {
665 *fts_col_affected = TRUE;
666 }
667
668 /* If Doc ID is updated, check whether the
669 Doc ID is valid */
670 if (table->fts
671 && ufield->field_no == doc_id_pos) {
672 doc_id_t n_doc_id;
673
674 n_doc_id =
675 table->fts->cache->next_doc_id;
676
677 new_doc_id = fts_read_doc_id(
678 static_cast<const byte*>(
679 dfield_get_data(
680 &ufield->new_val)));
681
682 if (new_doc_id <= 0) {
683 ib::error() << "FTS Doc ID"
684 " must be larger than"
685 " 0";
686 return(ULINT_UNDEFINED);
687 }
688
689 if (new_doc_id < n_doc_id) {
690 ib::error() << "FTS Doc ID"
691 " must be larger than "
692 << n_doc_id - 1
693 << " for table "
694 << table->name;
695
696 return(ULINT_UNDEFINED);
697 }
698
699 *fts_col_affected = TRUE;
700 doc_id_updated = TRUE;
701 }
702
703 n_fields_updated++;
704 }
705 }
706 }
707
708 /* Generate a new Doc ID if FTS index columns get updated */
709 if (table->fts && *fts_col_affected) {
710 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
711 doc_id_t doc_id;
712 doc_id_t* next_doc_id;
713 upd_field_t* ufield;
714
715 next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
716 heap, sizeof(doc_id_t)));
717
718 ut_ad(!doc_id_updated);
719 ufield = update->fields + n_fields_updated;
720 fts_get_next_doc_id(table, next_doc_id);
721 doc_id = fts_update_doc_id(table, ufield, next_doc_id);
722 n_fields_updated++;
723 fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
724 } else {
725 if (doc_id_updated) {
726 ut_ad(new_doc_id);
727 fts_trx_add_op(trx, table, new_doc_id,
728 FTS_INSERT, NULL);
729 } else {
730 ib::error() << "FTS Doc ID must be updated"
731 " along with FTS indexed column for"
732 " table " << table->name;
733 return(ULINT_UNDEFINED);
734 }
735 }
736 }
737
738 update->n_fields = n_fields_updated;
739
740 return(n_fields_updated);
741 }
742
743 /*********************************************************************//**
744 Set detailed error message associated with foreign key errors for
745 the given transaction. */
746 static
747 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)748 row_ins_set_detailed(
749 /*=================*/
750 trx_t* trx, /*!< in: transaction */
751 dict_foreign_t* foreign) /*!< in: foreign key constraint */
752 {
753 ut_ad(!srv_read_only_mode);
754
755 mutex_enter(&srv_misc_tmpfile_mutex);
756 rewind(srv_misc_tmpfile);
757
758 if (os_file_set_eof(srv_misc_tmpfile)) {
759 ut_print_name(srv_misc_tmpfile, trx,
760 foreign->foreign_table_name);
761 dict_print_info_on_foreign_key_in_create_format(
762 srv_misc_tmpfile, trx, foreign, FALSE);
763 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
764 } else {
765 trx_set_detailed_error(trx, "temp file operation failed");
766 }
767
768 mutex_exit(&srv_misc_tmpfile_mutex);
769 }
770
771 /*********************************************************************//**
772 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
773 and displays information about the given transaction.
774 The caller must release dict_foreign_err_mutex. */
775 static
776 void
row_ins_foreign_trx_print(trx_t * trx)777 row_ins_foreign_trx_print(
778 /*======================*/
779 trx_t* trx) /*!< in: transaction */
780 {
781 ulint n_rec_locks;
782 ulint n_trx_locks;
783 ulint heap_size;
784
785 if (srv_read_only_mode) {
786 return;
787 }
788
789 lock_mutex_enter();
790 n_rec_locks = lock_number_of_rows_locked(&trx->lock);
791 n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
792 heap_size = mem_heap_get_size(trx->lock.lock_heap);
793 lock_mutex_exit();
794
795 trx_sys_mutex_enter();
796
797 mutex_enter(&dict_foreign_err_mutex);
798 rewind(dict_foreign_err_file);
799 ut_print_timestamp(dict_foreign_err_file);
800 fputs(" Transaction:\n", dict_foreign_err_file);
801
802 trx_print_low(dict_foreign_err_file, trx, 600,
803 n_rec_locks, n_trx_locks, heap_size);
804
805 trx_sys_mutex_exit();
806
807 ut_ad(mutex_own(&dict_foreign_err_mutex));
808 }
809
810 /*********************************************************************//**
811 Reports a foreign key error associated with an update or a delete of a
812 parent table index entry. */
813 static
814 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)815 row_ins_foreign_report_err(
816 /*=======================*/
817 const char* errstr, /*!< in: error string from the viewpoint
818 of the parent table */
819 que_thr_t* thr, /*!< in: query thread whose run_node
820 is an update node */
821 dict_foreign_t* foreign, /*!< in: foreign key constraint */
822 const rec_t* rec, /*!< in: a matching index record in the
823 child table */
824 const dtuple_t* entry) /*!< in: index entry in the parent
825 table */
826 {
827 if (srv_read_only_mode) {
828 return;
829 }
830
831 FILE* ef = dict_foreign_err_file;
832 trx_t* trx = thr_get_trx(thr);
833
834 row_ins_set_detailed(trx, foreign);
835
836 row_ins_foreign_trx_print(trx);
837
838 fputs("Foreign key constraint fails for table ", ef);
839 ut_print_name(ef, trx, foreign->foreign_table_name);
840 fputs(":\n", ef);
841 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
842 TRUE);
843 putc('\n', ef);
844 fputs(errstr, ef);
845 fprintf(ef, " in parent table, in index %s",
846 foreign->referenced_index->name());
847 if (entry) {
848 fputs(" tuple:\n", ef);
849 dtuple_print(ef, entry);
850 }
851 fputs("\nBut in child table ", ef);
852 ut_print_name(ef, trx, foreign->foreign_table_name);
853 fprintf(ef, ", in index %s", foreign->foreign_index->name());
854 if (rec) {
855 fputs(", there is a record:\n", ef);
856 rec_print(ef, rec, foreign->foreign_index);
857 } else {
858 fputs(", the record is not available\n", ef);
859 }
860 putc('\n', ef);
861
862 mutex_exit(&dict_foreign_err_mutex);
863 }
864
865 /*********************************************************************//**
866 Reports a foreign key error to dict_foreign_err_file when we are trying
867 to add an index entry to a child table. Note that the adding may be the result
868 of an update, too. */
869 static
870 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)871 row_ins_foreign_report_add_err(
872 /*===========================*/
873 trx_t* trx, /*!< in: transaction */
874 dict_foreign_t* foreign, /*!< in: foreign key constraint */
875 const rec_t* rec, /*!< in: a record in the parent table:
876 it does not match entry because we
877 have an error! */
878 const dtuple_t* entry) /*!< in: index entry to insert in the
879 child table */
880 {
881 if (srv_read_only_mode) {
882 return;
883 }
884
885 FILE* ef = dict_foreign_err_file;
886
887 row_ins_set_detailed(trx, foreign);
888
889 row_ins_foreign_trx_print(trx);
890
891 fputs("Foreign key constraint fails for table ", ef);
892 ut_print_name(ef, trx, foreign->foreign_table_name);
893 fputs(":\n", ef);
894 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
895 TRUE);
896 fprintf(ef, "\nTrying to add in child table, in index %s",
897 foreign->foreign_index->name());
898 if (entry) {
899 fputs(" tuple:\n", ef);
900 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
901 It would be better to only display the user columns. */
902 dtuple_print(ef, entry);
903 }
904 fputs("\nBut in parent table ", ef);
905 ut_print_name(ef, trx, foreign->referenced_table_name);
906 fprintf(ef, ", in index %s,\n"
907 "the closest match we can find is record:\n",
908 foreign->referenced_index->name());
909 if (rec && page_rec_is_supremum(rec)) {
910 /* If the cursor ended on a supremum record, it is better
911 to report the previous record in the error message, so that
912 the user gets a more descriptive error message. */
913 rec = page_rec_get_prev_const(rec);
914 }
915
916 if (rec) {
917 rec_print(ef, rec, foreign->referenced_index);
918 }
919 putc('\n', ef);
920
921 mutex_exit(&dict_foreign_err_mutex);
922 }
923
924 /*********************************************************************//**
925 Invalidate the query cache for the given table. */
926 static
927 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)928 row_ins_invalidate_query_cache(
929 /*===========================*/
930 que_thr_t* thr, /*!< in: query thread whose run_node
931 is an update node */
932 const char* name) /*!< in: table name prefixed with
933 database name and a '/' character */
934 {
935 ulint len = strlen(name) + 1;
936 innobase_invalidate_query_cache(thr_get_trx(thr), name, len);
937 }
938
939 /** Fill virtual column information in cascade node for the child table.
940 @param[out] cascade child update node
941 @param[in] rec clustered rec of child table
942 @param[in] index clustered index of child table
943 @param[in] node parent update node
944 @param[in] foreign foreign key information
945 @param[out] err error code. */
946 static
947 void
row_ins_foreign_fill_virtual(
	upd_node_t*		cascade,	/*!< in/out: cascade node; the
						virtual column entries are
						appended to cascade->update */
	const rec_t*		rec,		/*!< in: record in index from
						which the old row is built */
	dict_index_t*		index,		/*!< in: index of rec */
	upd_node_t*		node,		/*!< in: update node that supplies
						is_delete, the update vector and,
						through its parent query thread,
						the prebuilt struct */
	dict_foreign_t*		foreign,	/*!< in: foreign key constraint;
						foreign->v_cols is dereferenced
						and must not be NULL */
	dberr_t*		err)		/*!< out: DB_SUCCESS, or
						DB_COMPUTE_VALUE_FAILED if a
						virtual column value could not
						be computed */
{
	/* For every virtual column of index->table that is a member of
	foreign->v_cols, append one field to cascade->update holding the old
	computed value and, depending on the referential action, the new
	value. */

	row_ext_t*	ext;
	THD*		thd = current_thd;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	mem_heap_t*	v_heap = NULL;
	upd_t*		update = cascade->update;
	rec_offs_init(offsets_);
	const ulint*	offsets =
		rec_get_offsets(rec, index, offsets_,
				ULINT_UNDEFINED, &update->heap);
	ulint		n_v_fld = index->table->n_v_def;
	ulint		n_diff;
	upd_field_t*	upd_field;
	dict_vcol_set*	v_cols = foreign->v_cols;
	row_prebuilt_t*	prebuilt =
		static_cast<que_thr_t*>(node->common.parent)->prebuilt;

	update->old_vrow = row_build(
		ROW_COPY_POINTERS, index, rec,
		offsets, index->table, NULL, NULL,
		&ext, update->heap);

	/* n_diff is the position of the next field to fill. n_fields is
	raised to the maximum possible count while filling and is set back
	to the number of fields actually used on success. */
	n_diff = update->n_fields;

	update->n_fields += n_v_fld;

	if (index->table->vc_templ == NULL) {
		/** This can occur when there is a cascading
		delete or update after restart. */
		innobase_init_vc_templ(index->table);
	}

	for (ulint i = 0; i < n_v_fld; i++) {

		dict_v_col_t*     col = dict_table_get_nth_v_col(
			index->table, i);

		dict_vcol_set::iterator it = v_cols->find(col);

		if (it == v_cols->end()) {
			/* Virtual column not affected by this constraint */
			continue;
		}

		dfield_t*	vfield = innobase_get_computed_value(
				update->old_vrow, col, index,
				&v_heap, update->heap, NULL, thd, NULL,
				NULL, NULL, NULL, prebuilt);

		if (vfield == NULL) {
			*err = DB_COMPUTE_VALUE_FAILED;
			goto func_exit;
		}

		upd_field = upd_get_nth_field(update, n_diff);

		upd_field->old_v_val = static_cast<dfield_t*>(
			mem_heap_alloc(update->heap,
				       sizeof *upd_field->old_v_val));

		dfield_copy(upd_field->old_v_val, vfield);

		upd_field_set_v_field_no(upd_field, i, index);

		if (node->is_delete
		    ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
		    : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
			uint32_t	col_match_count =
				dict_vcol_base_is_foreign_key(col, foreign);
			if (col_match_count == col->num_base) {
				/* If all base columns of virtual col are
				in FK */
				dfield_set_null(&upd_field->new_val);
			} else if (col_match_count == 0) {
				/* If no base column of virtual col is in FK */
				dfield_copy(&(upd_field->new_val), vfield);
			} else {
				/* If at least one base column of virtual col
				is in FK */
				for (uint32_t j = 0; j < col->num_base; j++) {
					dict_col_t*	base_col = col->base_col[j];
					uint32_t	col_no = base_col->ind;
					dfield_t*	row_field =
						innobase_get_field_from_update_vector(
							foreign, node->update, col_no);
					if (row_field != NULL) {
						dfield_set_null(row_field);
					}
				}
				dfield_t*	new_vfield = innobase_get_computed_value(
					update->old_vrow, col, index, &v_heap,
					update->heap, NULL, thd, NULL,
					NULL, node->update, foreign, prebuilt);

				/* The computation can fail exactly like the
				other two calls in this function; do not pass
				a NULL field to dfield_copy(). */
				if (new_vfield == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					goto func_exit;
				}

				dfield_copy(&(upd_field->new_val), new_vfield);
			}

		}

		if (!node->is_delete
		    && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {

			dfield_t*	new_vfield = innobase_get_computed_value(
				update->old_vrow, col, index,
				&v_heap, update->heap, NULL, thd,
				NULL, NULL, node->update, foreign,
				prebuilt);

			if (new_vfield == NULL) {
				*err = DB_COMPUTE_VALUE_FAILED;
				goto func_exit;
			}

			dfield_copy(&(upd_field->new_val), new_vfield);
		}

		n_diff++;
	}

	update->n_fields = n_diff;
	*err = DB_SUCCESS;

func_exit:
	if (v_heap) {
		mem_heap_free(v_heap);
	}
}
1080
1081 /*********************************************************************//**
1082 Perform referential actions or checks when a parent row is deleted or updated
1083 and the constraint had an ON DELETE or ON UPDATE condition which was not
1084 RESTRICT.
Except for the two early DB_ROW_IS_REFERENCED returns taken when the
constraint defines no CASCADE or SET NULL action for the operation, mtr is
committed and started again before returning, and the position of pcur is
stored before the commit and restored after the restart.
1085 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
1086 static MY_ATTRIBUTE((warn_unused_result))
1087 dberr_t
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)1088 row_ins_foreign_check_on_constraint(
1089 /*================================*/
1090 que_thr_t* thr, /*!< in: query thread whose run_node
1091 is an update node */
1092 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
1093 type is != 0 */
1094 btr_pcur_t* pcur, /*!< in: cursor placed on a matching
1095 index record in the child table */
1096 dtuple_t* entry, /*!< in: index entry in the parent
1097 table */
1098 mtr_t* mtr) /*!< in: mtr holding the latch of pcur
1099 page */
1100 {
1101 upd_node_t* node;
1102 upd_node_t* cascade;
1103 dict_table_t* table = foreign->foreign_table;
1104 dict_index_t* index;
1105 dict_index_t* clust_index;
1106 dtuple_t* ref;
1107 const rec_t* rec;
1108 const rec_t* clust_rec;
1109 const buf_block_t* clust_block;
1110 upd_t* update;
1111 ulint n_to_update;
1112 dberr_t err;
1113 ulint i;
1114 trx_t* trx;
1115 mem_heap_t* tmp_heap = NULL;
1116 doc_id_t doc_id = FTS_NULL_DOC_ID;
1117 ibool fts_col_affacted = FALSE;
1118
1119 DBUG_ENTER("row_ins_foreign_check_on_constraint");
1120 ut_a(thr);
1121 ut_a(foreign);
1122 ut_a(pcur);
1123 ut_a(mtr);
1124
1125 trx = thr_get_trx(thr);
1126
1127 /* Since we are going to delete or update a row, we have to invalidate
1128 the MySQL query cache for table. A deadlock of threads is not possible
1129 here because the caller of this function does not hold any latches with
1130 the mutex rank above the lock_sys_t::mutex. The query cache mutex
1131 has a rank just above the lock_sys_t::mutex. */
1132
1133 row_ins_invalidate_query_cache(thr, table->name.m_name);
1134
1135 node = static_cast<upd_node_t*>(thr->run_node);
1136
/* A DELETE of the parent row is rejected when the constraint has neither
ON DELETE CASCADE nor ON DELETE SET NULL. */
1137 if (node->is_delete && 0 == (foreign->type
1138 & (DICT_FOREIGN_ON_DELETE_CASCADE
1139 | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
1140
1141 row_ins_foreign_report_err("Trying to delete",
1142 thr, foreign,
1143 btr_pcur_get_rec(pcur), entry);
1144
1145 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1146 }
1147
/* Likewise, an UPDATE of the parent row is rejected when the constraint has
neither ON UPDATE CASCADE nor ON UPDATE SET NULL. */
1148 if (!node->is_delete && 0 == (foreign->type
1149 & (DICT_FOREIGN_ON_UPDATE_CASCADE
1150 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
1151
1152 /* This is an UPDATE */
1153
1154 row_ins_foreign_report_err("Trying to update",
1155 thr, foreign,
1156 btr_pcur_get_rec(pcur), entry);
1157
1158 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1159 }
1160
/* The cascade node and its heap are created only if node does not have a
cascade node yet; an existing one is reused and re-targeted below. */
1161 if (node->cascade_node == NULL) {
1162 node->cascade_heap = mem_heap_create(128);
1163 node->cascade_node = row_create_update_node_for_mysql(
1164 table, node->cascade_heap);
1165 que_node_set_parent(node->cascade_node, node);
1166
1167 }
1168 cascade = node->cascade_node;
1169 cascade->table = table;
1170 cascade->foreign = foreign;
1171
/* Only ON DELETE CASCADE turns the child operation into a delete; every
other action (SET NULL, ON UPDATE CASCADE) is executed as an update of the
child row. */
1172 if (node->is_delete
1173 && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1174 cascade->is_delete = TRUE;
1175 } else {
1176 cascade->is_delete = FALSE;
1177
1178 if (foreign->n_fields > cascade->update_n_fields) {
1179 /* We have to make the update vector longer */
1180
1181 cascade->update = upd_create(foreign->n_fields,
1182 node->cascade_heap);
1183 cascade->update_n_fields = foreign->n_fields;
1184 }
1185 }
1186
1187 /* We do not allow cyclic cascaded updating (DELETE is allowed,
1188 but not UPDATE) of the same table, as this can lead to an infinite
1189 cycle. Check that we are not updating the same table which is
1190 already being modified in this cascade chain. We have to check
1191 this also because the modification of the indexes of a 'parent'
1192 table may still be incomplete, and we must avoid seeing the indexes
1193 of the parent table in an inconsistent state! */
1194
1195 if (!cascade->is_delete
1196 && row_ins_cascade_ancestor_updates_table(cascade, table)) {
1197
1198 /* We do not know if this would break foreign key
1199 constraints, but play safe and return an error */
1200
1201 err = DB_ROW_IS_REFERENCED;
1202
1203 row_ins_foreign_report_err(
1204 "Trying an update, possibly causing a cyclic"
1205 " cascaded update\n"
1206 "in the child table,", thr, foreign,
1207 btr_pcur_get_rec(pcur), entry);
1208
1209 goto nonstandard_exit_func;
1210 }
1211
/* Refuse to nest cascaded operations FK_MAX_CASCADE_DEL levels or deeper. */
1212 if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
1213 err = DB_FOREIGN_EXCEED_MAX_CASCADE;
1214
1215 row_ins_foreign_report_err(
1216 "Trying a too deep cascaded delete or update\n",
1217 thr, foreign, btr_pcur_get_rec(pcur), entry);
1218
1219 goto nonstandard_exit_func;
1220 }
1221
1222 index = btr_pcur_get_btr_cur(pcur)->index;
1223
1224 ut_a(index == foreign->foreign_index);
1225
1226 rec = btr_pcur_get_rec(pcur);
1227
/* tmp_heap is passed to row_build_row_ref(), fts_get_doc_id_from_rec() and
row_ins_cascade_calc_update_vec() below and is freed on both exit paths. */
1228 tmp_heap = mem_heap_create(256);
1229
1230 if (dict_index_is_clust(index)) {
1231 /* pcur is already positioned in the clustered index of
1232 the child table */
1233
1234 clust_index = index;
1235 clust_rec = rec;
1236 clust_block = btr_pcur_get_block(pcur);
1237 } else {
1238 /* We have to look for the record in the clustered index
1239 in the child table */
1240
1241 clust_index = dict_table_get_first_index(table);
1242
1243 ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1244 tmp_heap);
1245 btr_pcur_open_with_no_init(clust_index, ref,
1246 PAGE_CUR_LE, BTR_SEARCH_LEAF,
1247 cascade->pcur, 0, mtr);
1248
1249 clust_rec = btr_pcur_get_rec(cascade->pcur);
1250 clust_block = btr_pcur_get_block(cascade->pcur);
1251
/* The clustered index search must land on a user record that matches all
unique fields of the row reference; otherwise the secondary index record has
no clustered index counterpart. */
1252 if (!page_rec_is_user_rec(clust_rec)
1253 || btr_pcur_get_low_match(cascade->pcur)
1254 < dict_index_get_n_unique(clust_index)) {
1255
1256 ib::error() << "In cascade of a foreign key op index "
1257 << index->name
1258 << " of table " << index->table->name;
1259
1260 fputs("InnoDB: record ", stderr);
1261 rec_print(stderr, rec, index);
1262 fputs("\n"
1263 "InnoDB: clustered record ", stderr);
1264 rec_print(stderr, clust_rec, clust_index);
1265 fputs("\n"
1266 "InnoDB: Submit a detailed bug report to"
1267 " http://bugs.mysql.com\n", stderr);
1268 ut_ad(0);
/* NOTE(review): the inconsistency reported above is returned as DB_SUCCESS,
so the child row is skipped without an error when ut_ad() does not abort -
confirm that this is intended. */
1269 err = DB_SUCCESS;
1270
1271 goto nonstandard_exit_func;
1272 }
1273 }
1274
1275 /* Set an X-lock on the row to delete or update in the child table */
1276
1277 err = lock_table(0, table, LOCK_IX, thr);
1278
1279 if (err == DB_SUCCESS) {
1280 /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1281 we already have a normal shared lock on the appropriate
1282 gap if the search criterion was not unique */
1283
1284 err = lock_clust_rec_read_check_and_lock_alt(
1285 0, clust_block, clust_rec, clust_index,
1286 LOCK_X, LOCK_REC_NOT_GAP, thr);
1287 }
1288
1289 if (err != DB_SUCCESS) {
1290
1291 goto nonstandard_exit_func;
1292 }
1293
1294 if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1295 /* This can happen if there is a circular reference of
1296 rows such that cascading delete comes to delete a row
1297 already in the process of being delete marked */
1298 err = DB_SUCCESS;
1299
1300 goto nonstandard_exit_func;
1301 }
1302
1303
/* The Doc ID of the child row is fetched up front; it is passed to
fts_trx_add_op() below if a column of a fulltext index turns out to be
affected. */
1304 if (table->fts) {
1305 doc_id = fts_get_doc_id_from_rec(table, clust_rec,
1306 clust_index, tmp_heap);
1307 }
1308
1309 /* A cascade delete from the parent table triggers delete on the child
1310 table. Before a clustered index record is deleted in the child table,
1311 a copy of row is built to remove secondary index records. This copy of
1312 the row requires virtual columns to be materialized. Hence, if child
1313 table has any virtual columns, we have to initialize virtual column
1314 template */
1315 if (cascade->is_delete && dict_table_get_n_v_cols(table) > 0
1316 && table->vc_templ == NULL) {
1317 innobase_init_vc_templ(table);
1318 }
1319 if (node->is_delete
1320 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1321 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1322 /* Build the appropriate update vector which sets
1323 foreign->n_fields first fields in rec to SQL NULL */
1324
1325 update = cascade->update;
1326
1327 update->info_bits = 0;
1328 update->n_fields = foreign->n_fields;
1329 UNIV_MEM_INVALID(update->fields,
1330 update->n_fields * sizeof *update->fields);
1331
1332 for (i = 0; i < foreign->n_fields; i++) {
1333 upd_field_t* ufield = &update->fields[i];
1334 ulint col_no = dict_index_get_nth_col_no(
1335 index, i);
1336
1337 ufield->field_no = dict_table_get_nth_col_pos(
1338 table, col_no);
1339 dict_col_t* col = dict_table_get_nth_col(
1340 table, col_no);
1341 dict_col_copy_type(col, dfield_get_type(&ufield->new_val));
1342
1343 ufield->orig_len = 0;
1344 ufield->exp = NULL;
1345 dfield_set_null(&ufield->new_val);
1346
/* Remember whether any of the columns being set to NULL belongs to a
fulltext index. */
1347 if (table->fts && dict_table_is_fts_column(
1348 table->fts->indexes,
1349 dict_index_get_nth_col_no(index, i),
1350 dict_col_is_virtual(
1351 dict_index_get_nth_col(index, i)))
1352 != ULINT_UNDEFINED) {
1353 fts_col_affacted = TRUE;
1354 }
1355 }
1356
1357 if (fts_col_affacted) {
1358 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1359 }
1360
/* Append the virtual columns recorded in foreign->v_cols to the update
vector. */
1361 if (foreign->v_cols != NULL
1362 && foreign->v_cols->size() > 0) {
1363 row_ins_foreign_fill_virtual(
1364 cascade, clust_rec, clust_index,
1365 node, foreign, &err);
1366
1367 if (err != DB_SUCCESS) {
1368 goto nonstandard_exit_func;
1369 }
1370 }
1371
1372 } else if (table->fts && cascade->is_delete) {
1373 /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1374 for (i = 0; i < foreign->n_fields; i++) {
1375 if (table->fts && dict_table_is_fts_column(
1376 table->fts->indexes,
1377 dict_index_get_nth_col_no(index, i),
1378 dict_col_is_virtual(
1379 dict_index_get_nth_col(index, i)))
1380 != ULINT_UNDEFINED) {
1381 fts_col_affacted = TRUE;
1382 }
1383 }
1384
1385 if (fts_col_affacted) {
1386 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1387 }
1388 }
1389
1390 if (!node->is_delete
1391 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1392
1393 /* Build the appropriate update vector which sets changing
1394 foreign->n_fields first fields in rec to new values */
1395
1396 n_to_update = row_ins_cascade_calc_update_vec(
1397 node, foreign, tmp_heap,
1398 trx, &fts_col_affacted);
1399
1400
/* NOTE(review): the virtual columns are filled in before n_to_update is
checked for ULINT_UNDEFINED below - confirm that this order is intended. */
1401 if (foreign->v_cols != NULL
1402 && foreign->v_cols->size() > 0) {
1403 row_ins_foreign_fill_virtual(
1404 cascade, clust_rec, clust_index,
1405 node, foreign, &err);
1406
1407 if (err != DB_SUCCESS) {
1408 goto nonstandard_exit_func;
1409 }
1410 }
1411
1412 if (n_to_update == ULINT_UNDEFINED) {
1413 err = DB_ROW_IS_REFERENCED;
1414
1415 row_ins_foreign_report_err(
1416 "Trying a cascaded update where the"
1417 " updated value in the child\n"
1418 "table would not fit in the length"
1419 " of the column, or the value would\n"
1420 "be NULL and the column is"
1421 " declared as not NULL in the child table,",
1422 thr, foreign, btr_pcur_get_rec(pcur), entry);
1423
1424 goto nonstandard_exit_func;
1425 }
1426
1427 if (cascade->update->n_fields == 0) {
1428
1429 /* The update does not change any columns referred
1430 to in this foreign key constraint: no need to do
1431 anything */
1432
1433 err = DB_SUCCESS;
1434
1435 goto nonstandard_exit_func;
1436 }
1437
1438 /* Mark the old Doc ID as deleted */
1439 if (fts_col_affacted) {
1440 ut_ad(table->fts);
1441 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1442 }
1443 }
1444
1445 /* Store pcur position and initialize or store the cascade node
1446 pcur stored position */
1447
1448 btr_pcur_store_position(pcur, mtr);
1449
1450 if (index == clust_index) {
1451 btr_pcur_copy_stored_position(cascade->pcur, pcur);
1452 } else {
1453 btr_pcur_store_position(cascade->pcur, mtr);
1454 }
1455
/* The mini-transaction is committed before the cascaded operation is run
and is started again afterwards, which is why both cursor positions were
stored above. */
1456 mtr_commit(mtr);
1457
1458 ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1459
1460 cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1461
1462 err = row_update_cascade_for_mysql(thr, cascade,
1463 foreign->foreign_table);
1464
1465 /* Release the data dictionary latch for a while, so that we do not
1466 starve other threads from doing CREATE TABLE etc. if we have a huge
1467 cascaded operation running. The counter n_foreign_key_checks_running
1468 will prevent other users from dropping or ALTERing the table when we
1469 release the latch. */
1470
1471 row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1472
1473 DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1474
1475 row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1476
1477 mtr_start(mtr);
1478
1479 /* Restore pcur position */
1480
1481 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1482
1483 if (tmp_heap) {
1484 mem_heap_free(tmp_heap);
1485 }
1486
1487 DBUG_RETURN(err);
1488
/* Exit path for every case in which no cascaded operation is executed:
free the temporary heap and commit and restart mtr around a stored and
restored pcur position, as the normal path does. */
1489 nonstandard_exit_func:
1490
1491 if (tmp_heap) {
1492 mem_heap_free(tmp_heap);
1493 }
1494
1495 btr_pcur_store_position(pcur, mtr);
1496
1497 mtr_commit(mtr);
1498 mtr_start(mtr);
1499
1500 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1501
1502 DBUG_RETURN(err);
1503 }
1504
1505 /*********************************************************************//**
1506 Sets a shared lock on a record. Used in locking possible duplicate key
1507 records and also in checking foreign key constraints.
1508 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1509 static
1510 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1511 row_ins_set_shared_rec_lock(
1512 /*========================*/
1513 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1514 LOCK_REC_NOT_GAP type lock */
1515 const buf_block_t* block, /*!< in: buffer block of rec */
1516 const rec_t* rec, /*!< in: record */
1517 dict_index_t* index, /*!< in: index */
1518 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1519 que_thr_t* thr) /*!< in: query thread */
1520 {
1521 dberr_t err;
1522
1523 ut_ad(rec_offs_validate(rec, index, offsets));
1524
1525 if (dict_index_is_clust(index)) {
1526 err = lock_clust_rec_read_check_and_lock(
1527 0, block, rec, index, offsets, LOCK_S, type, thr);
1528 } else {
1529 err = lock_sec_rec_read_check_and_lock(
1530 0, block, rec, index, offsets, LOCK_S, type, thr);
1531 }
1532
1533 return(err);
1534 }
1535
1536 /*********************************************************************//**
1537 Sets a exclusive lock on a record. Used in locking possible duplicate key
1538 records
1539 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1540 static
1541 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1542 row_ins_set_exclusive_rec_lock(
1543 /*===========================*/
1544 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1545 LOCK_REC_NOT_GAP type lock */
1546 const buf_block_t* block, /*!< in: buffer block of rec */
1547 const rec_t* rec, /*!< in: record */
1548 dict_index_t* index, /*!< in: index */
1549 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1550 que_thr_t* thr) /*!< in: query thread */
1551 {
1552 dberr_t err;
1553
1554 ut_ad(rec_offs_validate(rec, index, offsets));
1555
1556 if (dict_index_is_clust(index)) {
1557 err = lock_clust_rec_read_check_and_lock(
1558 0, block, rec, index, offsets, LOCK_X, type, thr);
1559 } else {
1560 err = lock_sec_rec_read_check_and_lock(
1561 0, block, rec, index, offsets, LOCK_X, type, thr);
1562 }
1563
1564 return(err);
1565 }
1566
1567 /* Decrement a counter in the destructor. */
1568 class ib_dec_in_dtor {
1569 public:
ib_dec_in_dtor(ulint & c)1570 ib_dec_in_dtor(ulint& c): counter(c) {}
~ib_dec_in_dtor()1571 ~ib_dec_in_dtor() {
1572 os_atomic_decrement_ulint(&counter, 1);
1573 }
1574 private:
1575 ulint& counter;
1576 };
1577
1578 /***************************************************************//**
1579 Checks if foreign key constraint fails for an index entry. Sets shared locks
1580 which lock either the success or the failure of the constraint. NOTE that
1581 the caller must have a shared latch on dict_operation_lock.
1582 @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1583 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1584 row_ins_check_foreign_constraint(
1585 /*=============================*/
1586 ibool check_ref,/*!< in: TRUE if we want to check that
1587 the referenced table is ok, FALSE if we
1588 want to check the foreign key table */
1589 dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
1590 tables mentioned in it must be in the
1591 dictionary cache if they exist at all */
1592 dict_table_t* table, /*!< in: if check_ref is TRUE, then the foreign
1593 table, else the referenced table */
1594 dtuple_t* entry, /*!< in: index entry for index */
1595 que_thr_t* thr) /*!< in: query thread */
1596 {
1597 dberr_t err;
1598 upd_node_t* upd_node;
1599 dict_table_t* check_table;
1600 dict_index_t* check_index;
1601 ulint n_fields_cmp;
1602 btr_pcur_t pcur;
1603 int cmp;
1604 mtr_t mtr;
1605 trx_t* trx = thr_get_trx(thr);
1606 mem_heap_t* heap = NULL;
1607 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1608 ulint* offsets = offsets_;
1609
1610 bool skip_gap_lock;
1611
1612 skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);
1613
1614 DBUG_ENTER("row_ins_check_foreign_constraint");
1615
1616 rec_offs_init(offsets_);
1617
1618 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_S));
1619
1620 err = DB_SUCCESS;
1621
1622 if (trx->check_foreigns == FALSE) {
1623 /* The user has suppressed foreign key checks currently for
1624 this session */
1625 goto exit_func;
1626 }
1627
1628 /* If any of the foreign key fields in entry is SQL NULL, we
1629 suppress the foreign key check: this is compatible with Oracle,
1630 for example */
1631 for (ulint i = 0; i < foreign->n_fields; i++) {
1632 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1633 goto exit_func;
1634 }
1635 }
1636
1637 if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1638 upd_node = static_cast<upd_node_t*>(thr->run_node);
1639
1640 if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1641 /* If a cascaded update is done as defined by a
1642 foreign key constraint, do not check that
1643 constraint for the child row. In ON UPDATE CASCADE
1644 the update of the parent row is only half done when
1645 we come here: if we would check the constraint here
1646 for the child row it would fail.
1647
1648 A QUESTION remains: if in the child table there are
1649 several constraints which refer to the same parent
1650 table, we should merge all updates to the child as
1651 one update? And the updates can be contradictory!
1652 Currently we just perform the update associated
1653 with each foreign key constraint, one after
1654 another, and the user has problems predicting in
1655 which order they are performed. */
1656
1657 goto exit_func;
1658 }
1659 }
1660
1661 if (check_ref) {
1662 check_table = foreign->referenced_table;
1663 check_index = foreign->referenced_index;
1664 } else {
1665 check_table = foreign->foreign_table;
1666 check_index = foreign->foreign_index;
1667 }
1668
1669 if (check_table == NULL
1670 || check_table->file_unreadable
1671 || check_index == NULL
1672 || fil_space_is_being_truncated(check_table->space)) {
1673
1674 if (!srv_read_only_mode && check_ref) {
1675 FILE* ef = dict_foreign_err_file;
1676
1677 row_ins_set_detailed(trx, foreign);
1678
1679 row_ins_foreign_trx_print(trx);
1680
1681 fputs("Foreign key constraint fails for table ", ef);
1682 ut_print_name(ef, trx,
1683 foreign->foreign_table_name);
1684 fputs(":\n", ef);
1685 dict_print_info_on_foreign_key_in_create_format(
1686 ef, trx, foreign, TRUE);
1687 fprintf(ef, "\nTrying to add to index %s tuple:\n",
1688 foreign->foreign_index->name());
1689 dtuple_print(ef, entry);
1690 fputs("\nBut the parent table ", ef);
1691 ut_print_name(ef, trx,
1692 foreign->referenced_table_name);
1693 fputs("\nor its .ibd file does"
1694 " not currently exist!, or"
1695 " is undergoing truncate!\n", ef);
1696 mutex_exit(&dict_foreign_err_mutex);
1697
1698 err = DB_NO_REFERENCED_ROW;
1699 }
1700
1701 goto exit_func;
1702 }
1703
1704 if (check_table != table) {
1705 /* We already have a LOCK_IX on table, but not necessarily
1706 on check_table */
1707
1708 err = lock_table(0, check_table, LOCK_IS, thr);
1709
1710 if (err != DB_SUCCESS) {
1711
1712 goto do_possible_lock_wait;
1713 }
1714 }
1715
1716 mtr_start(&mtr);
1717
1718 /* Store old value on n_fields_cmp */
1719
1720 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1721
1722 dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1723
1724 btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1725 BTR_SEARCH_LEAF, &pcur, &mtr);
1726
1727 /* Scan index records and check if there is a matching record */
1728
1729 do {
1730 const rec_t* rec = btr_pcur_get_rec(&pcur);
1731 const buf_block_t* block = btr_pcur_get_block(&pcur);
1732
1733 SRV_CORRUPT_TABLE_CHECK(block,
1734 {
1735 err = DB_CORRUPTION;
1736 goto exit_loop;
1737 });
1738
1739 if (page_rec_is_infimum(rec)) {
1740
1741 continue;
1742 }
1743
1744 offsets = rec_get_offsets(rec, check_index,
1745 offsets, ULINT_UNDEFINED, &heap);
1746
1747 if (page_rec_is_supremum(rec)) {
1748
1749 if (skip_gap_lock) {
1750
1751 continue;
1752 }
1753
1754 err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1755 rec, check_index,
1756 offsets, thr);
1757 switch (err) {
1758 case DB_SUCCESS_LOCKED_REC:
1759 case DB_SUCCESS:
1760 continue;
1761 default:
1762 goto end_scan;
1763 }
1764 }
1765
1766 cmp = cmp_dtuple_rec(entry, rec, offsets);
1767
1768 if (cmp == 0) {
1769
1770 ulint lock_type;
1771
1772 lock_type = skip_gap_lock
1773 ? LOCK_REC_NOT_GAP
1774 : LOCK_ORDINARY;
1775
1776 if (rec_get_deleted_flag(rec,
1777 rec_offs_comp(offsets))) {
1778 err = row_ins_set_shared_rec_lock(
1779 lock_type, block,
1780 rec, check_index, offsets, thr);
1781 switch (err) {
1782 case DB_SUCCESS_LOCKED_REC:
1783 case DB_SUCCESS:
1784 break;
1785 default:
1786 goto end_scan;
1787 }
1788 } else {
1789 /* Found a matching record. Lock only
1790 a record because we can allow inserts
1791 into gaps */
1792
1793 err = row_ins_set_shared_rec_lock(
1794 LOCK_REC_NOT_GAP, block,
1795 rec, check_index, offsets, thr);
1796
1797 switch (err) {
1798 case DB_SUCCESS_LOCKED_REC:
1799 case DB_SUCCESS:
1800 break;
1801 default:
1802 goto end_scan;
1803 }
1804
1805 if (check_ref) {
1806 err = DB_SUCCESS;
1807
1808 goto end_scan;
1809 } else if (foreign->type != 0) {
1810 /* There is an ON UPDATE or ON DELETE
1811 condition: check them in a separate
1812 function */
1813
1814 err = row_ins_foreign_check_on_constraint(
1815 thr, foreign, &pcur, entry,
1816 &mtr);
1817 if (err != DB_SUCCESS) {
1818 /* Since reporting a plain
1819 "duplicate key" error
1820 message to the user in
1821 cases where a long CASCADE
1822 operation would lead to a
1823 duplicate key in some
1824 other table is very
1825 confusing, map duplicate
1826 key errors resulting from
1827 FK constraints to a
1828 separate error code. */
1829
1830 if (err == DB_DUPLICATE_KEY) {
1831 err = DB_FOREIGN_DUPLICATE_KEY;
1832 }
1833
1834 goto end_scan;
1835 }
1836
1837 /* row_ins_foreign_check_on_constraint
1838 may have repositioned pcur on a
1839 different block */
1840 block = btr_pcur_get_block(&pcur);
1841 } else {
1842 row_ins_foreign_report_err(
1843 "Trying to delete or update",
1844 thr, foreign, rec, entry);
1845
1846 err = DB_ROW_IS_REFERENCED;
1847 goto end_scan;
1848 }
1849 }
1850 } else {
1851 ut_a(cmp < 0);
1852
1853 err = DB_SUCCESS;
1854
1855 if (!skip_gap_lock) {
1856 err = row_ins_set_shared_rec_lock(
1857 LOCK_GAP, block,
1858 rec, check_index, offsets, thr);
1859 }
1860
1861 switch (err) {
1862 case DB_SUCCESS_LOCKED_REC:
1863 case DB_SUCCESS:
1864 if (check_ref) {
1865 err = DB_NO_REFERENCED_ROW;
1866 row_ins_foreign_report_add_err(
1867 trx, foreign, rec, entry);
1868 } else {
1869 err = DB_SUCCESS;
1870 }
1871 default:
1872 break;
1873 }
1874
1875 goto end_scan;
1876 }
1877 } while (btr_pcur_move_to_next(&pcur, &mtr));
1878
1879 exit_loop:
1880 if (check_ref) {
1881 row_ins_foreign_report_add_err(
1882 trx, foreign, btr_pcur_get_rec(&pcur), entry);
1883 err = DB_NO_REFERENCED_ROW;
1884 } else {
1885 err = DB_SUCCESS;
1886 }
1887
1888 end_scan:
1889 btr_pcur_close(&pcur);
1890
1891 mtr_commit(&mtr);
1892
1893 /* Restore old value */
1894 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1895
1896 do_possible_lock_wait:
1897 if (err == DB_LOCK_WAIT) {
1898 /* An object that will correctly decrement the FK check counter
1899 when it goes out of this scope. */
1900 ib_dec_in_dtor dec(check_table->n_foreign_key_checks_running);
1901
1902 trx->error_state = err;
1903
1904 que_thr_stop_for_mysql(thr);
1905
1906 thr->lock_state = QUE_THR_LOCK_ROW;
1907
1908 /* To avoid check_table being dropped, increment counter */
1909 os_atomic_increment_ulint(
1910 &check_table->n_foreign_key_checks_running, 1);
1911
1912 trx_kill_blocking(trx);
1913
1914 lock_wait_suspend_thread(thr);
1915
1916 thr->lock_state = QUE_THR_LOCK_NOLOCK;
1917
1918 if(trx->error_state != DB_SUCCESS) {
1919 err = trx->error_state;
1920 goto exit_func;
1921 }
1922
1923 DBUG_PRINT("to_be_dropped",
1924 ("table: %s", check_table->name.m_name));
1925 if (check_table->to_be_dropped) {
1926 /* The table is being dropped. We shall timeout
1927 this operation */
1928 err = DB_LOCK_WAIT_TIMEOUT;
1929
1930 goto exit_func;
1931 }
1932 }
1933
1934
1935 exit_func:
1936 if (heap != NULL) {
1937 mem_heap_free(heap);
1938 }
1939
1940 DEBUG_SYNC_C("finished_scanning_index");
1941 DBUG_RETURN(err);
1942 }
1943
1944 /***************************************************************//**
1945 Checks if foreign key constraints fail for an index entry. If index
1946 is not mentioned in any constraint, this function does nothing,
1947 Otherwise does searches to the indexes of referenced tables and
1948 sets shared locks which lock either the success or the failure of
1949 a constraint.
1950 @return DB_SUCCESS or error code */
1951 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1952 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1953 row_ins_check_foreign_constraints(
1954 /*==============================*/
1955 dict_table_t* table, /*!< in: table */
1956 dict_index_t* index, /*!< in: index */
1957 dtuple_t* entry, /*!< in: index entry for index */
1958 que_thr_t* thr) /*!< in: query thread */
1959 {
1960 dict_foreign_t* foreign;
1961 dberr_t err;
1962 trx_t* trx;
1963 ibool got_s_lock = FALSE;
1964
1965 trx = thr_get_trx(thr);
1966
1967 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1968 "foreign_constraint_check_for_ins");
1969
1970 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1971 it != table->foreign_set.end();
1972 ++it) {
1973
1974 foreign = *it;
1975
1976 if (foreign->foreign_index == index) {
1977 dict_table_t* ref_table = NULL;
1978 dict_table_t* foreign_table = foreign->foreign_table;
1979 dict_table_t* referenced_table
1980 = foreign->referenced_table;
1981
1982 if (referenced_table == NULL) {
1983
1984 ref_table = dict_table_open_on_name(
1985 foreign->referenced_table_name_lookup,
1986 FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1987 }
1988
1989 if (0 == trx->dict_operation_lock_mode) {
1990 got_s_lock = TRUE;
1991
1992 row_mysql_freeze_data_dictionary(trx);
1993 }
1994
1995 if (referenced_table) {
1996 os_atomic_increment_ulint(
1997 &foreign_table->n_foreign_key_checks_running, 1);
1998 }
1999
2000 /* NOTE that if the thread ends up waiting for a lock
2001 we will release dict_operation_lock temporarily!
2002 But the counter on the table protects the referenced
2003 table from being dropped while the check is running. */
2004
2005 err = row_ins_check_foreign_constraint(
2006 TRUE, foreign, table, entry, thr);
2007
2008 if (referenced_table) {
2009 os_atomic_decrement_ulint(
2010 &foreign_table->n_foreign_key_checks_running, 1);
2011 }
2012
2013 if (got_s_lock) {
2014 row_mysql_unfreeze_data_dictionary(trx);
2015 }
2016
2017 if (ref_table != NULL) {
2018 dict_table_close(ref_table, FALSE, FALSE);
2019 }
2020
2021 if (err != DB_SUCCESS) {
2022
2023 return(err);
2024 }
2025 }
2026 }
2027
2028 return(DB_SUCCESS);
2029 }
2030
2031 /***************************************************************//**
2032 Checks if a unique key violation to rec would occur at the index entry
2033 insert.
2034 @return TRUE if error */
2035 static
2036 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)2037 row_ins_dupl_error_with_rec(
2038 /*========================*/
2039 const rec_t* rec, /*!< in: user record; NOTE that we assume
2040 that the caller already has a record lock on
2041 the record! */
2042 const dtuple_t* entry, /*!< in: entry to insert */
2043 dict_index_t* index, /*!< in: index */
2044 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
2045 {
2046 ulint matched_fields;
2047 ulint n_unique;
2048 ulint i;
2049
2050 ut_ad(rec_offs_validate(rec, index, offsets));
2051
2052 n_unique = dict_index_get_n_unique(index);
2053
2054 matched_fields = 0;
2055
2056 cmp_dtuple_rec_with_match(entry, rec, offsets, &matched_fields);
2057
2058 if (matched_fields < n_unique) {
2059
2060 return(FALSE);
2061 }
2062
2063 /* In a unique secondary index we allow equal key values if they
2064 contain SQL NULLs */
2065
2066 if (!dict_index_is_clust(index) && !index->nulls_equal) {
2067
2068 for (i = 0; i < n_unique; i++) {
2069 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
2070
2071 return(FALSE);
2072 }
2073 }
2074 }
2075
2076 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2077 }
2078
2079 /***************************************************************//**
2080 Scans a unique non-clustered index at a given index entry to determine
2081 whether a uniqueness violation has occurred for the key value of the entry.
2082 Set shared locks on possible duplicate records.
2083 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
2084 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2085 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)2086 row_ins_scan_sec_index_for_duplicate(
2087 /*=================================*/
2088 ulint flags, /*!< in: undo logging and locking flags */
2089 dict_index_t* index, /*!< in: non-clustered unique index */
2090 dtuple_t* entry, /*!< in: index entry */
2091 que_thr_t* thr, /*!< in: query thread */
2092 bool s_latch,/*!< in: whether index->lock is being held */
2093 mtr_t* mtr, /*!< in/out: mini-transaction */
2094 mem_heap_t* offsets_heap)
2095 /*!< in/out: memory heap that can be emptied */
2096 {
2097 ulint n_unique;
2098 int cmp;
2099 ulint n_fields_cmp;
2100 btr_pcur_t pcur;
2101 dberr_t err = DB_SUCCESS;
2102 ulint allow_duplicates;
2103 ulint* offsets = NULL;
2104 DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
2105
2106
2107 ut_ad(s_latch == rw_lock_own_flagged(
2108 &index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
2109
2110 n_unique = dict_index_get_n_unique(index);
2111
2112 /* If the secondary index is unique, but one of the fields in the
2113 n_unique first fields is NULL, a unique key violation cannot occur,
2114 since we define NULL != NULL in this case */
2115
2116 if (!index->nulls_equal) {
2117 for (ulint i = 0; i < n_unique; i++) {
2118 if (UNIV_SQL_NULL == dfield_get_len(
2119 dtuple_get_nth_field(entry, i))) {
2120
2121 DBUG_RETURN(DB_SUCCESS);
2122 }
2123 }
2124 }
2125
2126 /* Store old value on n_fields_cmp */
2127
2128 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
2129
2130 dtuple_set_n_fields_cmp(entry, n_unique);
2131
2132 btr_pcur_open(index, entry, PAGE_CUR_GE,
2133 s_latch
2134 ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
2135 : BTR_SEARCH_LEAF,
2136 &pcur, mtr);
2137
2138 allow_duplicates = thr_get_trx(thr)->duplicates;
2139
2140 /* Scan index records and check if there is a duplicate */
2141
2142 do {
2143 const rec_t* rec = btr_pcur_get_rec(&pcur);
2144 const buf_block_t* block = btr_pcur_get_block(&pcur);
2145 const ulint lock_type = LOCK_ORDINARY;
2146
2147 if (page_rec_is_infimum(rec)) {
2148
2149 continue;
2150 }
2151
2152 offsets = rec_get_offsets(rec, index, offsets,
2153 ULINT_UNDEFINED, &offsets_heap);
2154
2155 if (flags & BTR_NO_LOCKING_FLAG) {
2156 /* Set no locks when applying log
2157 in online table rebuild. */
2158 } else if (allow_duplicates) {
2159
2160 /* If the SQL-query will update or replace
2161 duplicate key we will take X-lock for
2162 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2163 INSERT ON DUPLICATE KEY UPDATE). */
2164
2165 err = row_ins_set_exclusive_rec_lock(
2166 lock_type, block, rec, index, offsets, thr);
2167 } else {
2168
2169 err = row_ins_set_shared_rec_lock(
2170 lock_type, block, rec, index, offsets, thr);
2171 }
2172
2173 switch (err) {
2174 case DB_SUCCESS_LOCKED_REC:
2175 err = DB_SUCCESS;
2176 case DB_SUCCESS:
2177 break;
2178 default:
2179 goto end_scan;
2180 }
2181
2182 if (page_rec_is_supremum(rec)) {
2183
2184 continue;
2185 }
2186
2187 cmp = cmp_dtuple_rec(entry, rec, offsets);
2188
2189 if (cmp == 0 && !index->allow_duplicates) {
2190 if (row_ins_dupl_error_with_rec(rec, entry,
2191 index, offsets)) {
2192 err = DB_DUPLICATE_KEY;
2193
2194 thr_get_trx(thr)->error_index = index;
2195
2196 /* If the duplicate is on hidden FTS_DOC_ID,
2197 state so in the error log */
2198 if (index == index->table->fts_doc_id_index
2199 && DICT_TF2_FLAG_IS_SET(
2200 index->table,
2201 DICT_TF2_FTS_HAS_DOC_ID)) {
2202
2203 ib::error() << "Duplicate FTS_DOC_ID"
2204 " value on table "
2205 << index->table->name;
2206 }
2207
2208 goto end_scan;
2209 }
2210 } else {
2211 ut_a(cmp < 0 || index->allow_duplicates);
2212 goto end_scan;
2213 }
2214 } while (btr_pcur_move_to_next(&pcur, mtr));
2215
2216 end_scan:
2217 /* Restore old value */
2218 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2219
2220 DBUG_RETURN(err);
2221 }
2222
2223 /** Checks for a duplicate when the table is being rebuilt online.
2224 @retval DB_SUCCESS when no duplicate is detected
2225 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2226 a newer version of entry (the entry should not be inserted)
2227 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2230 row_ins_duplicate_online(
2231 /*=====================*/
2232 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2233 const dtuple_t* entry, /*!< in: entry that is being inserted */
2234 const rec_t* rec, /*!< in: clustered index record */
2235 ulint* offsets)/*!< in/out: rec_get_offsets(rec) */
2236 {
2237 ulint fields = 0;
2238
2239 /* During rebuild, there should not be any delete-marked rows
2240 in the new table. */
2241 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2242 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2243
2244 /* Compare the PRIMARY KEY fields and the
2245 DB_TRX_ID, DB_ROLL_PTR. */
2246 cmp_dtuple_rec_with_match_low(
2247 entry, rec, offsets, n_uniq + 2, &fields);
2248
2249 if (fields < n_uniq) {
2250 /* Not a duplicate. */
2251 return(DB_SUCCESS);
2252 }
2253
2254 if (fields == n_uniq + 2) {
2255 /* rec is an exact match of entry. */
2256 return(DB_SUCCESS_LOCKED_REC);
2257 }
2258
2259 return(DB_DUPLICATE_KEY);
2260 }
2261
2262 /** Checks for a duplicate when the table is being rebuilt online.
2263 @retval DB_SUCCESS when no duplicate is detected
2264 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2265 a newer version of entry (the entry should not be inserted)
2266 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2267 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2268 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2269 row_ins_duplicate_error_in_clust_online(
2270 /*====================================*/
2271 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2272 const dtuple_t* entry, /*!< in: entry that is being inserted */
2273 const btr_cur_t*cursor, /*!< in: cursor on insert position */
2274 ulint** offsets,/*!< in/out: rec_get_offsets(rec) */
2275 mem_heap_t** heap) /*!< in/out: heap for offsets */
2276 {
2277 dberr_t err = DB_SUCCESS;
2278 const rec_t* rec = btr_cur_get_rec(cursor);
2279
2280 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2281 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2282 ULINT_UNDEFINED, heap);
2283 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2284 if (err != DB_SUCCESS) {
2285 return(err);
2286 }
2287 }
2288
2289 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2290
2291 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2292 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2293 ULINT_UNDEFINED, heap);
2294 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2295 }
2296
2297 return(err);
2298 }
2299
2300 /***************************************************************//**
2301 Checks if a unique key violation error would occur at an index entry
2302 insert. Sets shared locks on possible duplicate records. Works only
2303 for a clustered index!
2304 @retval DB_SUCCESS if no error
2305 @retval DB_DUPLICATE_KEY if error,
2306 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2307 record
2308 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2309 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2310 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2311 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2312 row_ins_duplicate_error_in_clust(
2313 /*=============================*/
2314 ulint flags, /*!< in: undo logging and locking flags */
2315 btr_cur_t* cursor, /*!< in: B-tree cursor */
2316 const dtuple_t* entry, /*!< in: entry to insert */
2317 que_thr_t* thr, /*!< in: query thread */
2318 mtr_t* mtr) /*!< in: mtr */
2319 {
2320 dberr_t err;
2321 rec_t* rec;
2322 ulint n_unique;
2323 trx_t* trx = thr_get_trx(thr);
2324 mem_heap_t*heap = NULL;
2325 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2326 ulint* offsets = offsets_;
2327 rec_offs_init(offsets_);
2328
2329 UT_NOT_USED(mtr);
2330
2331 ut_ad(dict_index_is_clust(cursor->index));
2332
2333 /* NOTE: For unique non-clustered indexes there may be any number
2334 of delete marked records with the same value for the non-clustered
2335 index key (remember multiversioning), and which differ only in
2336 the row refererence part of the index record, containing the
2337 clustered index key fields. For such a secondary index record,
2338 to avoid race condition, we must FIRST do the insertion and after
2339 that check that the uniqueness condition is not breached! */
2340
2341 /* NOTE: A problem is that in the B-tree node pointers on an
2342 upper level may match more to the entry than the actual existing
2343 user records on the leaf level. So, even if low_match would suggest
2344 that a duplicate key violation may occur, this may not be the case. */
2345
2346 n_unique = dict_index_get_n_unique(cursor->index);
2347
2348 if (cursor->low_match >= n_unique) {
2349
2350 rec = btr_cur_get_rec(cursor);
2351
2352 if (!page_rec_is_infimum(rec)) {
2353 offsets = rec_get_offsets(rec, cursor->index, offsets,
2354 ULINT_UNDEFINED, &heap);
2355
2356 /* We set a lock on the possible duplicate: this
2357 is needed in logical logging of MySQL to make
2358 sure that in roll-forward we get the same duplicate
2359 errors as in original execution */
2360
2361 if (flags & BTR_NO_LOCKING_FLAG) {
2362 /* Do nothing if no-locking is set */
2363 err = DB_SUCCESS;
2364 } else if (trx->duplicates) {
2365
2366 /* If the SQL-query will update or replace
2367 duplicate key we will take X-lock for
2368 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2369 INSERT ON DUPLICATE KEY UPDATE). */
2370
2371 err = row_ins_set_exclusive_rec_lock(
2372 LOCK_REC_NOT_GAP,
2373 btr_cur_get_block(cursor),
2374 rec, cursor->index, offsets, thr);
2375 } else {
2376
2377 err = row_ins_set_shared_rec_lock(
2378 LOCK_REC_NOT_GAP,
2379 btr_cur_get_block(cursor), rec,
2380 cursor->index, offsets, thr);
2381 }
2382
2383 switch (err) {
2384 case DB_SUCCESS_LOCKED_REC:
2385 case DB_SUCCESS:
2386 break;
2387 default:
2388 goto func_exit;
2389 }
2390
2391 if (row_ins_dupl_error_with_rec(
2392 rec, entry, cursor->index, offsets)) {
2393 duplicate:
2394 trx->error_index = cursor->index;
2395 err = DB_DUPLICATE_KEY;
2396 goto func_exit;
2397 }
2398 }
2399 }
2400
2401 if (cursor->up_match >= n_unique) {
2402
2403 rec = page_rec_get_next(btr_cur_get_rec(cursor));
2404
2405 if (!page_rec_is_supremum(rec)) {
2406 offsets = rec_get_offsets(rec, cursor->index, offsets,
2407 ULINT_UNDEFINED, &heap);
2408
2409 if (trx->duplicates) {
2410
2411 /* If the SQL-query will update or replace
2412 duplicate key we will take X-lock for
2413 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2414 INSERT ON DUPLICATE KEY UPDATE). */
2415
2416 err = row_ins_set_exclusive_rec_lock(
2417 LOCK_REC_NOT_GAP,
2418 btr_cur_get_block(cursor),
2419 rec, cursor->index, offsets, thr);
2420 } else {
2421
2422 err = row_ins_set_shared_rec_lock(
2423 LOCK_REC_NOT_GAP,
2424 btr_cur_get_block(cursor),
2425 rec, cursor->index, offsets, thr);
2426 }
2427
2428 switch (err) {
2429 case DB_SUCCESS_LOCKED_REC:
2430 case DB_SUCCESS:
2431 break;
2432 default:
2433 goto func_exit;
2434 }
2435
2436 if (row_ins_dupl_error_with_rec(
2437 rec, entry, cursor->index, offsets)) {
2438 goto duplicate;
2439 }
2440 }
2441
2442 /* This should never happen */
2443 ut_error;
2444 }
2445
2446 err = DB_SUCCESS;
2447 func_exit:
2448 if (UNIV_LIKELY_NULL(heap)) {
2449 mem_heap_free(heap);
2450 }
2451 return(err);
2452 }
2453
2454 /***************************************************************//**
2455 Checks if an index entry has long enough common prefix with an
2456 existing record so that the intended insert of the entry must be
2457 changed to a modify of the existing record. In the case of a clustered
2458 index, the prefix must be n_unique fields long. In the case of a
2459 secondary index, all fields must be equal. InnoDB never updates
2460 secondary index records in place, other than clearing or setting the
2461 delete-mark flag. We could be able to update the non-unique fields
2462 of a unique secondary index record by checking the cursor->up_match,
2463 but we do not do so, because it could have some locking implications.
2464 @return TRUE if the existing record should be updated; FALSE if not */
2465 UNIV_INLINE
2466 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2467 row_ins_must_modify_rec(
2468 /*====================*/
2469 const btr_cur_t* cursor) /*!< in: B-tree cursor */
2470 {
2471 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2472 Because node pointers on upper levels of the B-tree may match more
2473 to entry than to actual user records on the leaf level, we
2474 have to check if the candidate record is actually a user record.
2475 A clustered index node pointer contains index->n_unique first fields,
2476 and a secondary index node pointer contains all index fields. */
2477
2478 return(cursor->low_match
2479 >= dict_index_get_n_unique_in_tree(cursor->index)
2480 && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2481 }
2482
/***************************************************************//**
Tries to insert an entry into a clustered index, ignoring foreign key
constraints. If a record with the same unique key is found, the other
record is necessarily marked deleted by a committed transaction, or a
unique key violation error occurs. The delete marked record is then
updated to an existing record, and we must write an undo log record on
the delete marked record.

The function runs in these steps: start a mini-transaction, position a
persistent cursor with PAGE_CUR_LE, run the duplicate check when n_uniq
is nonzero, and then either modify the matching record in place or
insert a new one. The mini-transaction is committed on every path before
func_exit, where the offsets heap is freed and the cursor is closed.
@retval DB_SUCCESS on success
@retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
@retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
@retval DB_LOCK_TABLE_FULL if mode is BTR_MODIFY_TREE and
buf_LRU_buf_pool_running_out() holds
@return error code */
dberr_t
row_ins_clust_index_entry_low(
/*==========================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	ulint		mode,	/*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
				depending on whether we wish optimistic or
				pessimistic descent down the index tree */
	dict_index_t*	index,	/*!< in: clustered index */
	ulint		n_uniq,	/*!< in: 0 or index->n_uniq */
	dtuple_t*	entry,	/*!< in/out: index entry to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	que_thr_t*	thr,	/*!< in: query thread */
	bool		dup_chk_only)
				/*!< in: if true, just do duplicate check
				and return. don't execute actual insert. */
{
	btr_pcur_t	pcur;
	btr_cur_t*	cursor;
	dberr_t		err		= DB_SUCCESS;
	big_rec_t*	big_rec		= NULL;
	mtr_t		mtr;
	mem_heap_t*	offsets_heap	= NULL;
	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*          offsets         = offsets_;
	rec_offs_init(offsets_);

	DBUG_ENTER("row_ins_clust_index_entry_low");

	ut_ad(dict_index_is_clust(index));
	ut_ad(!dict_index_is_unique(index)
	      || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
	ut_ad(!thr_get_trx(thr)->in_rollback);

	mtr_start(&mtr);
	mtr.set_named_space(index->space);

	if (dict_table_is_temporary(index->table)) {
		/* Disable REDO logging as the lifetime of temp-tables is
		limited to server or connection lifetime and so REDO
		information is not needed on restart for recovery.
		Disable locking as temp-tables are local to a connection. */

		ut_ad(flags & BTR_NO_LOCKING_FLAG);
		ut_ad(!dict_table_is_intrinsic(index->table)
		      || (flags & BTR_NO_UNDO_LOG_FLAG));

		mtr.set_log_mode(MTR_LOG_NO_REDO);
	}

	/* For an optimistic insert into an index under online DDL, take
	the index lock in S mode here and tell the search that it is
	already held. */
	if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
		mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
		mtr_s_lock(dict_index_get_lock(index), &mtr);
	}

	/* Note that we use PAGE_CUR_LE as the search mode, because then
	the function will return in both low_match and up_match of the
	cursor sensible values */
	err = btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);

	if (err != DB_SUCCESS) {
		/* The cursor could not be positioned: flag the table as
		unreadable, end the mini-transaction and report the error. */
		index->table->set_file_unreadable();
		mtr.commit();
		goto func_exit;
	}

	cursor = btr_pcur_get_btr_cur(&pcur);
	cursor->thr = thr;

	ut_ad(!dict_table_is_intrinsic(index->table)
	      || cursor->page_cur.block->made_dirty_with_no_latch);

#ifdef UNIV_DEBUG
	{
		/* Debug-only sanity check of the field count of the first
		user record on the page (skipped if the page is empty). */
		page_t*	page = btr_cur_get_page(cursor);
		rec_t*	first_rec = page_rec_get_next(
			page_get_infimum_rec(page));

		ut_ad(page_rec_is_supremum(first_rec)
		      || rec_n_fields_is_sane(index, first_rec, entry));
	}
#endif /* UNIV_DEBUG */

	/* Allowing duplicates in clustered index is currently enabled
	only for intrinsic table and caller understand the limited
	operation that can be done in this case. */
	ut_ad(!index->allow_duplicates
	      || (index->allow_duplicates
		  && dict_table_is_intrinsic(index->table)));

	/* Duplicate check: only needed when a neighbouring record matches
	the entry on at least n_uniq fields. */
	if (!index->allow_duplicates
	    && n_uniq
	    && (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {

		if (flags
		    == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
			/* Set no locks when applying log
			in online table rebuild. Only check for duplicates. */
			err = row_ins_duplicate_error_in_clust_online(
				n_uniq, entry, cursor,
				&offsets, &offsets_heap);

			/* Any result other than DB_SUCCESS records the
			index in the transaction's error_index; results
			outside the three expected codes additionally trip
			a debug assertion. */
			switch (err) {
			case DB_SUCCESS:
				break;
			default:
				ut_ad(0);
				/* fall through */
			case DB_SUCCESS_LOCKED_REC:
			case DB_DUPLICATE_KEY:
				thr_get_trx(thr)->error_index = cursor->index;
			}
		} else {
			/* Note that the following may return also
			DB_LOCK_WAIT */

			err = row_ins_duplicate_error_in_clust(
				flags, cursor, entry, thr, &mtr);
		}

		if (err != DB_SUCCESS) {
			/* Shared error exit: also the target of the
			goto in the BTR_MODIFY_TREE branch further down. */
err_exit:
			mtr_commit(&mtr);
			goto func_exit;
		}
	}

	if (dup_chk_only) {
		/* The caller asked for the duplicate check only. */
		mtr_commit(&mtr);
		goto func_exit;
	}

	/* Note: Allowing duplicates would qualify for modification of
	an existing record as the new entry is exactly same as old entry.
	Avoid this check if allow duplicates is enabled. */
	if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
		/* There is already an index entry with a long enough common
		prefix, we must convert the insert into a modify of an
		existing record */
		mem_heap_t*	entry_heap	= mem_heap_create(1024);

		/* If the existing record is being modified and the new record
		doesn't fit the provided slot then existing record is added
		to free list and new record is inserted. This also means
		cursor that we have cached for SELECT is now invalid. */
		if(index->last_sel_cur) {
			index->last_sel_cur->invalid = true;
		}

		err = row_ins_clust_index_entry_by_modify(
			&pcur, flags, mode, &offsets, &offsets_heap,
			entry_heap, entry, thr, &mtr);

		/* Log the row for online DDL while the mini-transaction
		is still open. */
		if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
			row_log_table_insert(btr_cur_get_rec(cursor), entry,
					     index, offsets);
		}

		mtr_commit(&mtr);
		mem_heap_free(entry_heap);
	} else {
		rec_t*	insert_rec;

		if (mode != BTR_MODIFY_TREE) {
			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
			      == BTR_MODIFY_LEAF);
			err = btr_cur_optimistic_insert(
				flags, cursor, &offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);
		} else {
			if (buf_LRU_buf_pool_running_out()) {

				err = DB_LOCK_TABLE_FULL;
				goto err_exit;
			}

			DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");

			/* Try the optimistic insert first; fall back to
			the pessimistic one only if it returns DB_FAIL. */
			err = btr_cur_optimistic_insert(
				flags, cursor,
				&offsets, &offsets_heap,
				entry, &insert_rec, &big_rec,
				n_ext, thr, &mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, cursor,
					&offsets, &offsets_heap,
					entry, &insert_rec, &big_rec,
					n_ext, thr, &mtr);
			}
		}

		if (big_rec != NULL) {
			/* The mini-transaction is committed before the
			big_rec is handed to row_ins_index_entry_big_rec(). */
			mtr_commit(&mtr);

			/* Online table rebuild could read (and
			ignore) the incomplete record at this point.
			If online rebuild is in progress, the
			row_ins_index_entry_big_rec() will write log. */

			DBUG_EXECUTE_IF(
				"row_ins_extern_checkpoint",
				log_make_checkpoint_at(
					LSN_MAX, TRUE););
			err = row_ins_index_entry_big_rec(
				entry, big_rec, offsets, &offsets_heap, index,
				thr_get_trx(thr)->mysql_thd,
				__FILE__, __LINE__);
			dtuple_convert_back_big_rec(index, entry, big_rec);
		} else {
			/* No big_rec: log the inserted row for online DDL
			before committing the mini-transaction. */
			if (err == DB_SUCCESS
			    && dict_index_is_online_ddl(index)) {
				row_log_table_insert(
					insert_rec, entry, index, offsets);
			}

			mtr_commit(&mtr);
		}
	}

func_exit:
	/* Common cleanup; every path reaching here has already committed
	the mini-transaction. */
	if (offsets_heap != NULL) {
		mem_heap_free(offsets_heap);
	}

	btr_pcur_close(&pcur);

	DBUG_RETURN(err);
}
2726
/** This is a specialized function meant for direct insertion to
auto-generated clustered index based on cached position from
last successful insert. To be used when data is sorted.

The mini-transaction used is the one embedded in index->last_ins_cur.
A fresh search for the insert position is done when mode is
BTR_MODIFY_TREE, when no record position is cached, or when caching has
been disabled; otherwise the cached record and block are reused.

@param[in]	mode	BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
			depending on whether we wish optimistic or
			pessimistic descent down the index tree
@param[in,out]	index	clustered index
@param[in,out]	entry	index entry to insert
@param[in]	n_ext	number of externally stored columns
			(passed through to the B-tree insert calls)
@param[in]	thr	query thread

@return error code */
static
dberr_t
row_ins_sorted_clust_index_entry(
	ulint		mode,
	dict_index_t*	index,
	dtuple_t*	entry,
	ulint		n_ext,
	que_thr_t*	thr)
{
	dberr_t		err;
	mtr_t*		mtr;
	/* In BTR_MODIFY_TREE mode the position is not cached after a
	successful insert; the cached cursor is released instead. */
	const bool	commit_mtr	= mode == BTR_MODIFY_TREE;

	mem_heap_t*	offsets_heap	= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	DBUG_ENTER("row_ins_sorted_clust_index_entry");

	ut_ad(index->last_ins_cur != NULL);
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_table_is_intrinsic(index->table));
	ut_ad(dict_index_is_auto_gen_clust(index));

	btr_cur_t	cursor;
	cursor.thr = thr;
	mtr = &index->last_ins_cur->mtr;

	/* Search for position if tree needs to be split or if last position
	is not cached. */
	if (mode == BTR_MODIFY_TREE
	    || index->last_ins_cur->rec == NULL
	    || index->last_ins_cur->disable_caching) {

		/* Commit the previous mtr. */
		index->last_ins_cur->release();

		mtr_start(mtr);
		mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);

		btr_cur_search_to_nth_level_with_no_latch(
			index, 0, entry, PAGE_CUR_LE, &cursor,
			__FILE__, __LINE__, mtr);
		ut_ad(cursor.page_cur.block != NULL);
		ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
	} else {
		/* Rebuild the cursor from the cached position of the
		previous insert instead of searching the tree. */
		cursor.index = index;

		cursor.page_cur.index = index;

		cursor.page_cur.rec = index->last_ins_cur->rec;

		cursor.page_cur.block = index->last_ins_cur->block;
	}

	/* The insert is done without locking and without undo logging. */
	const ulint	flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;

	/* Single-pass loop: every path ends in a break, so the body runs
	exactly once. */
	for (;;) {
		rec_t*		insert_rec;
		big_rec_t*	big_rec		= NULL;

		if (mode != BTR_MODIFY_TREE) {
			ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
			      == BTR_MODIFY_LEAF);

			err = btr_cur_optimistic_insert(
				flags, &cursor, &offsets, &offsets_heap, entry,
				&insert_rec, &big_rec, n_ext, thr, mtr);
			if (err != DB_SUCCESS) {
				break;
			}
		} else {
			/* TODO: Check if this is needed for intrinsic table. */
			if (buf_LRU_buf_pool_running_out()) {
				err = DB_LOCK_TABLE_FULL;
				break;
			}

			/* Optimistic attempt first; pessimistic insert
			only if that returns DB_FAIL. */
			err = btr_cur_optimistic_insert(
				flags, &cursor, &offsets, &offsets_heap, entry,
				&insert_rec, &big_rec, n_ext, thr, mtr);

			if (err == DB_FAIL) {
				err = btr_cur_pessimistic_insert(
					flags, &cursor, &offsets, &offsets_heap,
					entry, &insert_rec, &big_rec, n_ext,
					thr, mtr);
			}
		}

		if (big_rec != NULL) {
			/* If index involves big-record optimization is
			turned-off. */
			index->last_ins_cur->release();
			index->last_ins_cur->disable_caching = true;

			err = row_ins_index_entry_big_rec(
				entry, big_rec, offsets, &offsets_heap, index,
				thr_get_trx(thr)->mysql_thd, __FILE__, __LINE__);

			dtuple_convert_back_big_rec(index, entry, big_rec);

		} else if (err == DB_SUCCESS ) {
			if (!commit_mtr
			    && !index->last_ins_cur->disable_caching) {
				/* Remember where the record went so that
				the next insert can start from here. */
				index->last_ins_cur->rec = insert_rec;

				index->last_ins_cur->block
					= cursor.page_cur.block;
			} else {
				index->last_ins_cur->release();
			}
		}

		break;
	}

	/* On any failure the cached cursor is released. */
	if (err != DB_SUCCESS) {
		index->last_ins_cur->release();
	}

	if (offsets_heap != NULL) {
		mem_heap_free(offsets_heap);
	}

	DBUG_RETURN(err);
}
2867
2868 /** Start a mini-transaction and check if the index will be dropped.
2869 @param[in,out] mtr mini-transaction
2870 @param[in,out] index secondary index
2871 @param[in] check whether to check
2872 @param[in] search_mode flags
2873 @return true if the index is to be dropped */
2874 static MY_ATTRIBUTE((warn_unused_result))
2875 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2876 row_ins_sec_mtr_start_and_check_if_aborted(
2877 mtr_t* mtr,
2878 dict_index_t* index,
2879 bool check,
2880 ulint search_mode)
2881 {
2882 ut_ad(!dict_index_is_clust(index));
2883 ut_ad(mtr->is_named_space(index->space));
2884
2885 const mtr_log_t log_mode = mtr->get_log_mode();
2886
2887 mtr_start(mtr);
2888 mtr->set_named_space(index->space);
2889 mtr->set_log_mode(log_mode);
2890
2891 if (!check) {
2892 return(false);
2893 }
2894
2895 if (search_mode & BTR_ALREADY_S_LATCHED) {
2896 mtr_s_lock(dict_index_get_lock(index), mtr);
2897 } else {
2898 mtr_sx_lock(dict_index_get_lock(index), mtr);
2899 }
2900
2901 switch (index->online_status) {
2902 case ONLINE_INDEX_ABORTED:
2903 case ONLINE_INDEX_ABORTED_DROPPED:
2904 ut_ad(!index->is_committed());
2905 return(true);
2906 case ONLINE_INDEX_COMPLETE:
2907 return(false);
2908 case ONLINE_INDEX_CREATION:
2909 break;
2910 }
2911
2912 ut_error;
2913 return(true);
2914 }
2915
2916 /***************************************************************//**
2917 Tries to insert an entry into a secondary index. If a record with exactly the
2918 same fields is found, the other record is necessarily marked deleted.
2919 It is then unmarked. Otherwise, the entry is just inserted to the index.
2920 @retval DB_SUCCESS on success
2921 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2922 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2923 @return error code */
2924 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2925 row_ins_sec_index_entry_low(
2926 /*========================*/
2927 ulint flags, /*!< in: undo logging and locking flags */
2928 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2929 depending on whether we wish optimistic or
2930 pessimistic descent down the index tree */
2931 dict_index_t* index, /*!< in: secondary index */
2932 mem_heap_t* offsets_heap,
2933 /*!< in/out: memory heap that can be emptied */
2934 mem_heap_t* heap, /*!< in/out: memory heap */
2935 dtuple_t* entry, /*!< in/out: index entry to insert */
2936 trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
2937 row_log_table_apply(), or 0 */
2938 que_thr_t* thr, /*!< in: query thread */
2939 bool dup_chk_only)
2940 /*!< in: if true, just do duplicate check
2941 and return. don't execute actual insert. */
2942 {
2943 DBUG_ENTER("row_ins_sec_index_entry_low");
2944
2945 btr_cur_t cursor;
2946 ulint search_mode = mode;
2947 dberr_t err = DB_SUCCESS;
2948 ulint n_unique;
2949 mtr_t mtr;
2950 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2951 ulint* offsets = offsets_;
2952 rec_offs_init(offsets_);
2953 rtr_info_t rtr_info;
2954
2955 ut_ad(!dict_index_is_clust(index));
2956 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2957
2958 cursor.thr = thr;
2959 cursor.rtr_info = NULL;
2960 ut_ad(thr_get_trx(thr)->id != 0
2961 || dict_table_is_intrinsic(index->table));
2962
2963 mtr_start(&mtr);
2964 mtr.set_named_space(index->space);
2965
2966 if (dict_table_is_temporary(index->table)) {
2967 /* Disable REDO logging as the lifetime of temp-tables is
2968 limited to server or connection lifetime and so REDO
2969 information is not needed on restart for recovery.
2970 Disable locking as temp-tables are local to a connection. */
2971
2972 ut_ad(flags & BTR_NO_LOCKING_FLAG);
2973 ut_ad(!dict_table_is_intrinsic(index->table)
2974 || (flags & BTR_NO_UNDO_LOG_FLAG));
2975
2976 mtr.set_log_mode(MTR_LOG_NO_REDO);
2977 } else if (!dict_index_is_spatial(index)) {
2978 /* Enable insert buffering if it's neither temp-table
2979 nor spatial index. */
2980 search_mode |= BTR_INSERT;
2981 }
2982
2983 /* Ensure that we acquire index->lock when inserting into an
2984 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2985 could still be subject to rollback_inplace_alter_table().
2986 This prevents a concurrent change of index->online_status.
2987 The memory object cannot be freed as long as we have an open
2988 reference to the table, or index->table->n_ref_count > 0. */
2989 const bool check = !index->is_committed();
2990 if (check) {
2991 DEBUG_SYNC_C("row_ins_sec_index_enter");
2992 if (mode == BTR_MODIFY_LEAF) {
2993 search_mode |= BTR_ALREADY_S_LATCHED;
2994 mtr_s_lock(dict_index_get_lock(index), &mtr);
2995 } else {
2996 mtr_sx_lock(dict_index_get_lock(index), &mtr);
2997 }
2998
2999 if (row_log_online_op_try(
3000 index, entry, thr_get_trx(thr)->id)) {
3001 goto func_exit;
3002 }
3003 }
3004
3005 /* Note that we use PAGE_CUR_LE as the search mode, because then
3006 the function will return in both low_match and up_match of the
3007 cursor sensible values */
3008
3009 if (!thr_get_trx(thr)->check_unique_secondary) {
3010 search_mode |= BTR_IGNORE_SEC_UNIQUE;
3011 }
3012
3013 if (dict_index_is_spatial(index)) {
3014 cursor.index = index;
3015 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
3016 rtr_info_update_btr(&cursor, &rtr_info);
3017
3018 err = btr_cur_search_to_nth_level(
3019 index, 0, entry, PAGE_CUR_RTREE_INSERT,
3020 search_mode,
3021 &cursor, 0, __FILE__, __LINE__, &mtr);
3022
3023 if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
3024 mtr_commit(&mtr);
3025 rtr_clean_rtr_info(&rtr_info, true);
3026 rtr_init_rtr_info(&rtr_info, false, &cursor,
3027 index, false);
3028 rtr_info_update_btr(&cursor, &rtr_info);
3029 mtr_start(&mtr);
3030 mtr.set_named_space(index->space);
3031 search_mode &= ~BTR_MODIFY_LEAF;
3032 search_mode |= BTR_MODIFY_TREE;
3033 err = btr_cur_search_to_nth_level(
3034 index, 0, entry, PAGE_CUR_RTREE_INSERT,
3035 search_mode,
3036 &cursor, 0, __FILE__, __LINE__, &mtr);
3037 mode = BTR_MODIFY_TREE;
3038 }
3039
3040 DBUG_EXECUTE_IF(
3041 "rtree_test_check_count", {
3042 goto func_exit;});
3043
3044 } else {
3045 if (dict_table_is_intrinsic(index->table)) {
3046 btr_cur_search_to_nth_level_with_no_latch(
3047 index, 0, entry, PAGE_CUR_LE, &cursor,
3048 __FILE__, __LINE__, &mtr);
3049 ut_ad(cursor.page_cur.block != NULL);
3050 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3051 } else {
3052 err = btr_cur_search_to_nth_level(
3053 index, 0, entry, PAGE_CUR_LE,
3054 search_mode,
3055 &cursor, 0, __FILE__, __LINE__, &mtr);
3056 }
3057 }
3058
3059 if (err != DB_SUCCESS) {
3060 if (err == DB_DECRYPTION_FAILED) {
3061 ib::warn() << "Table is encrypted but encryption service or"
3062 " used key_id is not available. "
3063 " Can't continue reading table.";
3064 index->table->set_file_unreadable();
3065 }
3066 goto func_exit;
3067 }
3068
3069 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
3070 ut_ad(!dict_index_is_spatial(index));
3071 /* The insert was buffered during the search: we are done */
3072 goto func_exit;
3073 }
3074
3075 #ifdef UNIV_DEBUG
3076 {
3077 page_t* page = btr_cur_get_page(&cursor);
3078 rec_t* first_rec = page_rec_get_next(
3079 page_get_infimum_rec(page));
3080
3081 ut_ad(page_rec_is_supremum(first_rec)
3082 || rec_n_fields_is_sane(index, first_rec, entry));
3083 }
3084 #endif /* UNIV_DEBUG */
3085
3086 n_unique = dict_index_get_n_unique(index);
3087
3088 if (dict_index_is_unique(index)
3089 && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
3090 mtr_commit(&mtr);
3091
3092 DEBUG_SYNC_C("row_ins_sec_index_unique");
3093
3094 if (row_ins_sec_mtr_start_and_check_if_aborted(
3095 &mtr, index, check, search_mode)) {
3096 goto func_exit;
3097 }
3098
3099 err = row_ins_scan_sec_index_for_duplicate(
3100 flags, index, entry, thr, check, &mtr, offsets_heap);
3101
3102 mtr_commit(&mtr);
3103
3104 switch (err) {
3105 case DB_SUCCESS:
3106 break;
3107 case DB_DUPLICATE_KEY:
3108 if (!index->is_committed()) {
3109 ut_ad(!thr_get_trx(thr)
3110 ->dict_operation_lock_mode);
3111 mutex_enter(&dict_sys->mutex);
3112 dict_set_corrupted_index_cache_only(index);
3113 mutex_exit(&dict_sys->mutex);
3114 /* Do not return any error to the
3115 caller. The duplicate will be reported
3116 by ALTER TABLE or CREATE UNIQUE INDEX.
3117 Unfortunately we cannot report the
3118 duplicate key value to the DDL thread,
3119 because the altered_table object is
3120 private to its call stack. */
3121 err = DB_SUCCESS;
3122 }
3123 /* fall through */
3124 default:
3125 if (dict_index_is_spatial(index)) {
3126 rtr_clean_rtr_info(&rtr_info, true);
3127 }
3128 DBUG_RETURN(err);
3129 }
3130
3131 if (row_ins_sec_mtr_start_and_check_if_aborted(
3132 &mtr, index, check, search_mode)) {
3133 goto func_exit;
3134 }
3135
3136 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
3137
3138 /* We did not find a duplicate and we have now
3139 locked with s-locks the necessary records to
3140 prevent any insertion of a duplicate by another
3141 transaction. Let us now reposition the cursor and
3142 continue the insertion. */
3143 if (dict_table_is_intrinsic(index->table)) {
3144 btr_cur_search_to_nth_level_with_no_latch(
3145 index, 0, entry, PAGE_CUR_LE, &cursor,
3146 __FILE__, __LINE__, &mtr);
3147 ut_ad(cursor.page_cur.block != NULL);
3148 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3149 } else {
3150 btr_cur_search_to_nth_level(
3151 index, 0, entry, PAGE_CUR_LE,
3152 (search_mode
3153 & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)),
3154 &cursor, 0, __FILE__, __LINE__, &mtr);
3155 }
3156 }
3157
3158 if (dup_chk_only) {
3159 goto func_exit;
3160 }
3161
3162 if (row_ins_must_modify_rec(&cursor)) {
3163 /* If the existing record is being modified and the new record
3164 is doesn't fit the provided slot then existing record is added
3165 to free list and new record is inserted. This also means
3166 cursor that we have cached for SELECT is now invalid. */
3167 if(index->last_sel_cur) {
3168 index->last_sel_cur->invalid = true;
3169 }
3170
3171 /* There is already an index entry with a long enough common
3172 prefix, we must convert the insert into a modify of an
3173 existing record */
3174 offsets = rec_get_offsets(
3175 btr_cur_get_rec(&cursor), index, offsets,
3176 ULINT_UNDEFINED, &offsets_heap);
3177
3178 err = row_ins_sec_index_entry_by_modify(
3179 flags, mode, &cursor, &offsets,
3180 offsets_heap, heap, entry, thr, &mtr);
3181
3182 if (err == DB_SUCCESS && dict_index_is_spatial(index)
3183 && rtr_info.mbr_adj) {
3184 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3185 }
3186 } else {
3187 rec_t* insert_rec;
3188 big_rec_t* big_rec;
3189
3190 if (mode == BTR_MODIFY_LEAF) {
3191 err = btr_cur_optimistic_insert(
3192 flags, &cursor, &offsets, &offsets_heap,
3193 entry, &insert_rec,
3194 &big_rec, 0, thr, &mtr);
3195 if (err == DB_SUCCESS
3196 && dict_index_is_spatial(index)
3197 && rtr_info.mbr_adj) {
3198 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3199 }
3200 } else {
3201 ut_ad(mode == BTR_MODIFY_TREE);
3202 if (buf_LRU_buf_pool_running_out()) {
3203
3204 err = DB_LOCK_TABLE_FULL;
3205 goto func_exit;
3206 }
3207
3208 err = btr_cur_optimistic_insert(
3209 flags, &cursor,
3210 &offsets, &offsets_heap,
3211 entry, &insert_rec,
3212 &big_rec, 0, thr, &mtr);
3213 if (err == DB_FAIL) {
3214 err = btr_cur_pessimistic_insert(
3215 flags, &cursor,
3216 &offsets, &offsets_heap,
3217 entry, &insert_rec,
3218 &big_rec, 0, thr, &mtr);
3219 }
3220 if (err == DB_SUCCESS
3221 && dict_index_is_spatial(index)
3222 && rtr_info.mbr_adj) {
3223 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3224 }
3225 }
3226
3227 if (err == DB_SUCCESS && trx_id) {
3228 page_update_max_trx_id(
3229 btr_cur_get_block(&cursor),
3230 btr_cur_get_page_zip(&cursor),
3231 trx_id, &mtr);
3232 }
3233
3234 ut_ad(!big_rec);
3235 }
3236
3237 func_exit:
3238 if (dict_index_is_spatial(index)) {
3239 rtr_clean_rtr_info(&rtr_info, true);
3240 }
3241
3242 mtr_commit(&mtr);
3243 DBUG_RETURN(err);
3244 }
3245
3246 /***************************************************************//**
3247 Tries to insert the externally stored fields (off-page columns)
3248 of a clustered index entry.
3249 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
3250 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)3251 row_ins_index_entry_big_rec_func(
3252 /*=============================*/
3253 const dtuple_t* entry, /*!< in/out: index entry to insert */
3254 const big_rec_t* big_rec,/*!< in: externally stored fields */
3255 ulint* offsets,/*!< in/out: rec offsets */
3256 mem_heap_t** heap, /*!< in/out: memory heap */
3257 dict_index_t* index, /*!< in: index */
3258 const char* file, /*!< in: file name of caller */
3259 #ifndef NDEBUG
3260 const void* thd, /*!< in: connection, or NULL */
3261 #endif /* NDEBUG */
3262 ulint line) /*!< in: line number of caller */
3263 {
3264 mtr_t mtr;
3265 btr_pcur_t pcur;
3266 rec_t* rec;
3267 dberr_t error;
3268
3269 ut_ad(dict_index_is_clust(index));
3270
3271 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
3272 DEBUG_SYNC_C("before_insertion_of_blob");
3273
3274 mtr_start(&mtr);
3275 mtr.set_named_space(index->space);
3276 dict_disable_redo_if_temporary(index->table, &mtr);
3277
3278 btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
3279 &pcur, &mtr);
3280 rec = btr_pcur_get_rec(&pcur);
3281 offsets = rec_get_offsets(rec, index, offsets,
3282 ULINT_UNDEFINED, heap);
3283
3284 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
3285 error = btr_store_big_rec_extern_fields(
3286 &pcur, 0, offsets, big_rec, &mtr, BTR_STORE_INSERT);
3287 DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
3288
3289 if (error == DB_SUCCESS
3290 && dict_index_is_online_ddl(index)) {
3291 row_log_table_insert(btr_pcur_get_rec(&pcur), entry,
3292 index, offsets);
3293 }
3294
3295 mtr_commit(&mtr);
3296
3297 btr_pcur_close(&pcur);
3298
3299 return(error);
3300 }
3301
3302 /***************************************************************//**
3303 Inserts an entry into a clustered index. Tries first optimistic,
3304 then pessimistic descent down the tree. If the entry matches enough
3305 to a delete marked record, performs the insert by updating or delete
3306 unmarking the delete marked record.
3307 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3308 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext,bool dup_chk_only)3309 row_ins_clust_index_entry(
3310 /*======================*/
3311 dict_index_t* index, /*!< in: clustered index */
3312 dtuple_t* entry, /*!< in/out: index entry to insert */
3313 que_thr_t* thr, /*!< in: query thread */
3314 ulint n_ext, /*!< in: number of externally stored columns */
3315 bool dup_chk_only)
3316 /*!< in: if true, just do duplicate check
3317 and return. don't execute actual insert. */
3318 {
3319 dberr_t err;
3320 ulint n_uniq;
3321
3322 DBUG_ENTER("row_ins_clust_index_entry");
3323
3324 if (!index->table->foreign_set.empty()) {
3325 err = row_ins_check_foreign_constraints(
3326 index->table, index, entry, thr);
3327 if (err != DB_SUCCESS) {
3328
3329 DBUG_RETURN(err);
3330 }
3331 }
3332
3333 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3334
3335 /* Try first optimistic descent to the B-tree */
3336 ulint flags;
3337
3338 if (!dict_table_is_intrinsic(index->table)) {
3339 log_free_check();
3340 flags = dict_table_is_temporary(index->table)
3341 ? BTR_NO_LOCKING_FLAG
3342 : 0;
3343 } else {
3344 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3345 }
3346
3347 if (dict_table_is_intrinsic(index->table)
3348 && dict_index_is_auto_gen_clust(index)) {
3349
3350 /* Check if the memory allocated for intrinsic cache*/
3351 if(!index->last_ins_cur) {
3352 dict_allocate_mem_intrinsic_cache(index);
3353 }
3354
3355 err = row_ins_sorted_clust_index_entry(
3356 BTR_MODIFY_LEAF, index, entry, n_ext, thr);
3357 } else {
3358 err = row_ins_clust_index_entry_low(
3359 flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
3360 n_ext, thr, dup_chk_only);
3361 }
3362
3363
3364 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3365 "after_row_ins_clust_index_entry_leaf");
3366
3367 if (err != DB_FAIL) {
3368 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3369 DBUG_RETURN(err);
3370 }
3371
3372 /* Try then pessimistic descent to the B-tree */
3373 if (!dict_table_is_intrinsic(index->table)) {
3374 log_free_check();
3375 } else if(!index->last_sel_cur) {
3376 dict_allocate_mem_intrinsic_cache(index);
3377 index->last_sel_cur->invalid = true;
3378 } else {
3379 index->last_sel_cur->invalid = true;
3380 }
3381
3382 if (dict_table_is_intrinsic(index->table)
3383 && dict_index_is_auto_gen_clust(index)) {
3384 err = row_ins_sorted_clust_index_entry(
3385 BTR_MODIFY_TREE, index, entry, n_ext, thr);
3386 } else {
3387 err = row_ins_clust_index_entry_low(
3388 flags, BTR_MODIFY_TREE, index, n_uniq, entry,
3389 n_ext, thr, dup_chk_only);
3390 }
3391
3392 DBUG_RETURN(err);
3393 }
3394
3395 /***************************************************************//**
3396 Inserts an entry into a secondary index. Tries first optimistic,
3397 then pessimistic descent down the tree. If the entry matches enough
3398 to a delete marked record, performs the insert by updating or delete
3399 unmarking the delete marked record.
3400 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3401 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3402 row_ins_sec_index_entry(
3403 /*====================*/
3404 dict_index_t* index, /*!< in: secondary index */
3405 dtuple_t* entry, /*!< in/out: index entry to insert */
3406 que_thr_t* thr, /*!< in: query thread */
3407 bool dup_chk_only)
3408 /*!< in: if true, just do duplicate check
3409 and return. don't execute actual insert. */
3410 {
3411 dberr_t err;
3412 mem_heap_t* offsets_heap;
3413 mem_heap_t* heap;
3414
3415 DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3416 DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3417 return(DB_LOCK_WAIT);});
3418
3419 if (!index->table->foreign_set.empty()) {
3420 err = row_ins_check_foreign_constraints(index->table, index,
3421 entry, thr);
3422 if (err != DB_SUCCESS) {
3423
3424 return(err);
3425 }
3426 }
3427
3428 ut_ad(thr_get_trx(thr)->id != 0
3429 || dict_table_is_intrinsic(index->table));
3430
3431 offsets_heap = mem_heap_create(1024);
3432 heap = mem_heap_create(1024);
3433
3434 /* Try first optimistic descent to the B-tree */
3435
3436 ulint flags;
3437
3438 if (!dict_table_is_intrinsic(index->table)) {
3439 log_free_check();
3440 flags = dict_table_is_temporary(index->table)
3441 ? BTR_NO_LOCKING_FLAG
3442 : 0;
3443 } else {
3444 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3445 }
3446
3447 err = row_ins_sec_index_entry_low(
3448 flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
3449 0, thr, dup_chk_only);
3450 if (err == DB_FAIL) {
3451 mem_heap_empty(heap);
3452
3453 /* Try then pessimistic descent to the B-tree */
3454
3455 if (!dict_table_is_intrinsic(index->table)) {
3456 log_free_check();
3457 } else if(!index->last_sel_cur) {
3458 dict_allocate_mem_intrinsic_cache(index);
3459 index->last_sel_cur->invalid = true;
3460 } else {
3461 index->last_sel_cur->invalid = true;
3462 }
3463
3464 err = row_ins_sec_index_entry_low(
3465 flags, BTR_MODIFY_TREE, index,
3466 offsets_heap, heap, entry, 0, thr,
3467 dup_chk_only);
3468 }
3469
3470 mem_heap_free(heap);
3471 mem_heap_free(offsets_heap);
3472 return(err);
3473 }
3474
3475 /***************************************************************//**
3476 Inserts an index entry to index. Tries first optimistic, then pessimistic
3477 descent down the tree. If the entry matches enough to a delete marked record,
3478 performs the insert by updating or delete unmarking the delete marked
3479 record.
3480 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3481 static
3482 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3483 row_ins_index_entry(
3484 /*================*/
3485 dict_index_t* index, /*!< in: index */
3486 dtuple_t* entry, /*!< in/out: index entry to insert */
3487 que_thr_t* thr) /*!< in: query thread */
3488 {
3489 ut_ad(thr_get_trx(thr)->id != 0);
3490
3491 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3492 DBUG_SET("-d,row_ins_index_entry_timeout");
3493 return(DB_LOCK_WAIT);});
3494
3495 if (dict_index_is_clust(index)) {
3496 return(row_ins_clust_index_entry(index, entry, thr, 0, false));
3497 } else {
3498 return(row_ins_sec_index_entry(index, entry, thr, false));
3499 }
3500 }
3501
3502
3503 /*****************************************************************//**
3504 This function generate MBR (Minimum Bounding Box) for spatial objects
3505 and set it to spatial index field. */
3506 static
3507 void
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field)3508 row_ins_spatial_index_entry_set_mbr_field(
3509 /*======================================*/
3510 dfield_t* field, /*!< in/out: mbr field */
3511 const dfield_t* row_field) /*!< in: row field */
3512 {
3513 uchar* dptr = NULL;
3514 ulint dlen = 0;
3515 double mbr[SPDIMS * 2];
3516
3517 /* This must be a GEOMETRY datatype */
3518 ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3519
3520 dptr = static_cast<uchar*>(dfield_get_data(row_field));
3521 dlen = dfield_get_len(row_field);
3522
3523 /* obtain the MBR */
3524 rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
3525 static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
3526 SPDIMS, mbr);
3527
3528 /* Set mbr as index entry data */
3529 dfield_write_mbr(field, mbr);
3530 }
3531
3532 /** Sets the values of the dtuple fields in entry from the values of appropriate
3533 columns in row.
3534 @param[in] index index handler
3535 @param[out] entry index entry to make
3536 @param[in] row row
3537
3538 @return DB_SUCCESS if the set is successful */
3539 dberr_t
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3540 row_ins_index_entry_set_vals(
3541 const dict_index_t* index,
3542 dtuple_t* entry,
3543 const dtuple_t* row)
3544 {
3545 ulint n_fields;
3546 ulint i;
3547 ulint num_v = dtuple_get_n_v_fields(entry);
3548
3549 n_fields = dtuple_get_n_fields(entry);
3550
3551 for (i = 0; i < n_fields + num_v; i++) {
3552 dict_field_t* ind_field = NULL;
3553 dfield_t* field;
3554 const dfield_t* row_field;
3555 ulint len;
3556 dict_col_t* col;
3557
3558 if (i >= n_fields) {
3559 /* This is virtual field */
3560 field = dtuple_get_nth_v_field(entry, i - n_fields);
3561 col = &dict_table_get_nth_v_col(
3562 index->table, i - n_fields)->m_col;
3563 } else {
3564 field = dtuple_get_nth_field(entry, i);
3565 ind_field = dict_index_get_nth_field(index, i);
3566 col = ind_field->col;
3567 }
3568
3569 if (dict_col_is_virtual(col)) {
3570 const dict_v_col_t* v_col
3571 = reinterpret_cast<const dict_v_col_t*>(col);
3572 ut_ad(dtuple_get_n_fields(row)
3573 == dict_table_get_n_cols(index->table));
3574 row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3575 } else {
3576 row_field = dtuple_get_nth_field(
3577 row, ind_field->col->ind);
3578 }
3579
3580 len = dfield_get_len(row_field);
3581
3582 /* Check column prefix indexes */
3583 if (ind_field != NULL && ind_field->prefix_len > 0
3584 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3585
3586 const dict_col_t* col
3587 = dict_field_get_col(ind_field);
3588
3589 len = dtype_get_at_most_n_mbchars(
3590 col->prtype, col->mbminmaxlen,
3591 ind_field->prefix_len,
3592 len,
3593 static_cast<const char*>(
3594 dfield_get_data(row_field)));
3595
3596 ut_ad(!dfield_is_ext(row_field));
3597 }
3598
3599 /* Handle spatial index. For the first field, replace
3600 the data with its MBR (Minimum Bounding Box). */
3601 if ((i == 0) && dict_index_is_spatial(index)) {
3602 if (!row_field->data
3603 || row_field->len < GEO_DATA_HEADER_SIZE) {
3604 return(DB_CANT_CREATE_GEOMETRY_OBJECT);
3605 }
3606 row_ins_spatial_index_entry_set_mbr_field(
3607 field, row_field);
3608 continue;
3609 }
3610
3611 dfield_set_data(field, dfield_get_data(row_field), len);
3612 if (dfield_is_ext(row_field)) {
3613 ut_ad(dict_index_is_clust(index));
3614 dfield_set_ext(field);
3615 }
3616 }
3617
3618 return(DB_SUCCESS);
3619 }
3620
3621 /***********************************************************//**
3622 Inserts a single index entry to the table.
3623 @return DB_SUCCESS if operation successfully completed, else error
3624 code or DB_LOCK_WAIT */
3625 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3626 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3627 row_ins_index_entry_step(
3628 /*=====================*/
3629 ins_node_t* node, /*!< in: row insert node */
3630 que_thr_t* thr) /*!< in: query thread */
3631 {
3632 dberr_t err;
3633
3634 DBUG_ENTER("row_ins_index_entry_step");
3635
3636 ut_ad(dtuple_check_typed(node->row));
3637
3638 err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3639
3640 if (err != DB_SUCCESS) {
3641 DBUG_RETURN(err);
3642 }
3643
3644 ut_ad(dtuple_check_typed(node->entry));
3645
3646 err = row_ins_index_entry(node->index, node->entry, thr);
3647
3648 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3649 "after_row_ins_index_entry_step");
3650
3651 DBUG_RETURN(err);
3652 }
3653
3654 /***********************************************************//**
3655 Allocates a row id for row and inits the node->index field. */
3656 UNIV_INLINE
3657 void
row_ins_alloc_row_id_step(ins_node_t * node)3658 row_ins_alloc_row_id_step(
3659 /*======================*/
3660 ins_node_t* node) /*!< in: row insert node */
3661 {
3662 row_id_t row_id;
3663
3664 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3665
3666 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3667
3668 /* No row id is stored if the clustered index is unique */
3669
3670 return;
3671 }
3672
3673 /* Fill in row id value to row */
3674
3675 row_id = dict_sys_get_new_row_id();
3676
3677 dict_sys_write_row_id(node->row_id_buf, row_id);
3678 }
3679
3680 /***********************************************************//**
3681 Gets a row to insert from the values list. */
3682 UNIV_INLINE
3683 void
row_ins_get_row_from_values(ins_node_t * node)3684 row_ins_get_row_from_values(
3685 /*========================*/
3686 ins_node_t* node) /*!< in: row insert node */
3687 {
3688 que_node_t* list_node;
3689 dfield_t* dfield;
3690 dtuple_t* row;
3691 ulint i;
3692
3693 /* The field values are copied in the buffers of the select node and
3694 it is safe to use them until we fetch from select again: therefore
3695 we can just copy the pointers */
3696
3697 row = node->row;
3698
3699 i = 0;
3700 list_node = node->values_list;
3701
3702 while (list_node) {
3703 eval_exp(list_node);
3704
3705 dfield = dtuple_get_nth_field(row, i);
3706 dfield_copy_data(dfield, que_node_get_val(list_node));
3707
3708 i++;
3709 list_node = que_node_get_next(list_node);
3710 }
3711 }
3712
3713 /***********************************************************//**
3714 Gets a row to insert from the select list. */
3715 UNIV_INLINE
3716 void
row_ins_get_row_from_select(ins_node_t * node)3717 row_ins_get_row_from_select(
3718 /*========================*/
3719 ins_node_t* node) /*!< in: row insert node */
3720 {
3721 que_node_t* list_node;
3722 dfield_t* dfield;
3723 dtuple_t* row;
3724 ulint i;
3725
3726 /* The field values are copied in the buffers of the select node and
3727 it is safe to use them until we fetch from select again: therefore
3728 we can just copy the pointers */
3729
3730 row = node->row;
3731
3732 i = 0;
3733 list_node = node->select->select_list;
3734
3735 while (list_node) {
3736 dfield = dtuple_get_nth_field(row, i);
3737 dfield_copy_data(dfield, que_node_get_val(list_node));
3738
3739 i++;
3740 list_node = que_node_get_next(list_node);
3741 }
3742 }
3743
3744 /***********************************************************//**
3745 Inserts a row to a table.
3746 @return DB_SUCCESS if operation successfully completed, else error
3747 code or DB_LOCK_WAIT */
3748 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3749 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3750 row_ins(
3751 /*====*/
3752 ins_node_t* node, /*!< in: row insert node */
3753 que_thr_t* thr) /*!< in: query thread */
3754 {
3755 dberr_t err;
3756
3757 DBUG_ENTER("row_ins");
3758
3759 DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3760
3761 if (node->state == INS_NODE_ALLOC_ROW_ID) {
3762
3763 row_ins_alloc_row_id_step(node);
3764
3765 node->index = dict_table_get_first_index(node->table);
3766 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3767
3768 if (node->ins_type == INS_SEARCHED) {
3769
3770 row_ins_get_row_from_select(node);
3771
3772 } else if (node->ins_type == INS_VALUES) {
3773
3774 row_ins_get_row_from_values(node);
3775 }
3776
3777 node->state = INS_NODE_INSERT_ENTRIES;
3778 }
3779
3780 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3781
3782 while (node->index != NULL) {
3783 if (node->index->type != DICT_FTS) {
3784 err = row_ins_index_entry_step(node, thr);
3785 switch(err) {
3786 case DB_SUCCESS:
3787 break;
3788 case DB_DUPLICATE_KEY:
3789 thr_get_trx(thr)->error_state
3790 = DB_DUPLICATE_KEY;
3791 thr_get_trx(thr)->error_index
3792 = node->index;
3793 //fall through
3794 default:
3795 DBUG_RETURN(err);
3796 }
3797 }
3798
3799 node->index = dict_table_get_next_index(node->index);
3800 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3801
3802 DBUG_EXECUTE_IF(
3803 "row_ins_skip_sec",
3804 node->index = NULL; node->entry = NULL; break;);
3805
3806 /* Skip corrupted secondary index and its entry */
3807 while (node->index && dict_index_is_corrupted(node->index)) {
3808
3809 node->index = dict_table_get_next_index(node->index);
3810 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3811 }
3812
3813 }
3814
3815 ut_ad(node->entry == NULL);
3816 thr_get_trx(thr)->error_index = NULL;
3817 node->state = INS_NODE_ALLOC_ROW_ID;
3818
3819 DBUG_RETURN(DB_SUCCESS);
3820 }
3821
3822 /***********************************************************//**
3823 Inserts a row to a table. This is a high-level function used in SQL execution
3824 graphs.
3825 @return query thread to run next or NULL */
3826 que_thr_t*
row_ins_step(que_thr_t * thr)3827 row_ins_step(
3828 /*=========*/
3829 que_thr_t* thr) /*!< in: query thread */
3830 {
3831 ins_node_t* node;
3832 que_node_t* parent;
3833 sel_node_t* sel_node;
3834 trx_t* trx;
3835 dberr_t err;
3836
3837 ut_ad(thr);
3838
3839 DEBUG_SYNC_C("innodb_row_ins_step_enter");
3840
3841 trx = thr_get_trx(thr);
3842
3843 trx_start_if_not_started_xa(trx, true);
3844
3845 node = static_cast<ins_node_t*>(thr->run_node);
3846
3847 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3848 ut_ad(!dict_table_is_intrinsic(node->table));
3849
3850 parent = que_node_get_parent(node);
3851 sel_node = node->select;
3852
3853 if (thr->prev_node == parent) {
3854 node->state = INS_NODE_SET_IX_LOCK;
3855 }
3856
3857 /* If this is the first time this node is executed (or when
3858 execution resumes after wait for the table IX lock), set an
3859 IX lock on the table and reset the possible select node. MySQL's
3860 partitioned table code may also call an insert within the same
3861 SQL statement AFTER it has used this table handle to do a search.
3862 This happens, for example, when a row update moves it to another
3863 partition. In that case, we have already set the IX lock on the
3864 table during the search operation, and there is no need to set
3865 it again here. But we must write trx->id to node->trx_id_buf. */
3866
3867 memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
3868 trx_write_trx_id(node->trx_id_buf, trx->id);
3869
3870 if (node->state == INS_NODE_SET_IX_LOCK) {
3871
3872 node->state = INS_NODE_ALLOC_ROW_ID;
3873
3874 /* It may be that the current session has not yet started
3875 its transaction, or it has been committed: */
3876
3877 if (trx->id == node->trx_id) {
3878 /* No need to do IX-locking */
3879
3880 goto same_trx;
3881 }
3882
3883 err = lock_table(0, node->table, LOCK_IX, thr);
3884
3885 DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3886 err = DB_LOCK_WAIT;);
3887
3888 if (err != DB_SUCCESS) {
3889
3890 goto error_handling;
3891 }
3892
3893 node->trx_id = trx->id;
3894 same_trx:
3895 if (node->ins_type == INS_SEARCHED) {
3896 /* Reset the cursor */
3897 sel_node->state = SEL_NODE_OPEN;
3898
3899 /* Fetch a row to insert */
3900
3901 thr->run_node = sel_node;
3902
3903 return(thr);
3904 }
3905 }
3906
3907 if ((node->ins_type == INS_SEARCHED)
3908 && (sel_node->state != SEL_NODE_FETCH)) {
3909
3910 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3911
3912 /* No more rows to insert */
3913 thr->run_node = parent;
3914
3915 return(thr);
3916 }
3917
3918 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3919
3920 err = row_ins(node, thr);
3921
3922 error_handling:
3923 trx->error_state = err;
3924
3925 if (err != DB_SUCCESS) {
3926 /* err == DB_LOCK_WAIT or SQL error detected */
3927 return(NULL);
3928 }
3929
3930 /* DO THE TRIGGER ACTIONS HERE */
3931
3932 if (node->ins_type == INS_SEARCHED) {
3933 /* Fetch a row to insert */
3934
3935 thr->run_node = sel_node;
3936 } else {
3937 thr->run_node = que_node_get_parent(node);
3938 }
3939
3940 return(thr);
3941 }
3942