1 /*****************************************************************************
2
3 Copyright (c) 1996, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "row0ins.h"
35
36 #ifdef UNIV_NONINL
37 #include "row0ins.ic"
38 #endif
39
40 #include "ha_prototypes.h"
41 #include "dict0dict.h"
42 #include "dict0boot.h"
43 #include "trx0rec.h"
44 #include "trx0undo.h"
45 #include "btr0btr.h"
46 #include "btr0cur.h"
47 #include "mach0data.h"
48 #include "que0que.h"
49 #include "row0upd.h"
50 #include "row0sel.h"
51 #include "row0row.h"
52 #include "row0log.h"
53 #include "rem0cmp.h"
54 #include "lock0lock.h"
55 #include "log0log.h"
56 #include "eval0eval.h"
57 #include "data0data.h"
58 #include "usr0sess.h"
59 #include "buf0lru.h"
60 #include "fts0fts.h"
61 #include "fts0types.h"
62 #include "m_string.h"
63
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST NOT hold any synchronization objects when performing the
check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
73
74 /*********************************************************************//**
75 Creates an insert node struct.
76 @return own: insert node struct */
77 UNIV_INTERN
78 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)79 ins_node_create(
80 /*============*/
81 ulint ins_type, /*!< in: INS_VALUES, ... */
82 dict_table_t* table, /*!< in: table where to insert */
83 mem_heap_t* heap) /*!< in: mem heap where created */
84 {
85 ins_node_t* node;
86
87 node = static_cast<ins_node_t*>(
88 mem_heap_alloc(heap, sizeof(ins_node_t)));
89
90 node->common.type = QUE_NODE_INSERT;
91
92 node->ins_type = ins_type;
93
94 node->state = INS_NODE_SET_IX_LOCK;
95 node->table = table;
96 node->index = NULL;
97 node->entry = NULL;
98
99 node->select = NULL;
100
101 node->trx_id = 0;
102
103 node->entry_sys_heap = mem_heap_create(128);
104
105 node->magic_n = INS_NODE_MAGIC_N;
106
107 return(node);
108 }
109
110 /***********************************************************//**
111 Creates an entry template for each index of a table. */
112 static
113 void
ins_node_create_entry_list(ins_node_t * node)114 ins_node_create_entry_list(
115 /*=======================*/
116 ins_node_t* node) /*!< in: row insert node */
117 {
118 dict_index_t* index;
119 dtuple_t* entry;
120
121 ut_ad(node->entry_sys_heap);
122
123 UT_LIST_INIT(node->entry_list);
124
125 /* We will include all indexes (include those corrupted
126 secondary indexes) in the entry list. Filteration of
127 these corrupted index will be done in row_ins() */
128
129 for (index = dict_table_get_first_index(node->table);
130 index != 0;
131 index = dict_table_get_next_index(index)) {
132
133 entry = row_build_index_entry(
134 node->row, NULL, index, node->entry_sys_heap);
135
136 UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
137 }
138 }
139
140 /*****************************************************************//**
141 Adds system field buffers to a row. */
142 static
143 void
row_ins_alloc_sys_fields(ins_node_t * node)144 row_ins_alloc_sys_fields(
145 /*=====================*/
146 ins_node_t* node) /*!< in: insert node */
147 {
148 dtuple_t* row;
149 dict_table_t* table;
150 mem_heap_t* heap;
151 const dict_col_t* col;
152 dfield_t* dfield;
153 byte* ptr;
154
155 row = node->row;
156 table = node->table;
157 heap = node->entry_sys_heap;
158
159 ut_ad(row && table && heap);
160 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
161
162 /* allocate buffer to hold the needed system created hidden columns. */
163 uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
164 ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
165
166 /* 1. Populate row-id */
167 col = dict_table_get_sys_col(table, DATA_ROW_ID);
168
169 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
170
171 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
172
173 node->row_id_buf = ptr;
174
175 ptr += DATA_ROW_ID_LEN;
176
177 /* 2. Populate trx id */
178 col = dict_table_get_sys_col(table, DATA_TRX_ID);
179
180 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
181
182 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
183
184 node->trx_id_buf = ptr;
185
186 ptr += DATA_TRX_ID_LEN;
187
188 /* 3. Populate roll ptr */
189
190 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
191
192 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
193
194 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
195 }
196
197 /*********************************************************************//**
198 Sets a new row to insert for an INS_DIRECT node. This function is only used
199 if we have constructed the row separately, which is a rare case; this
200 function is quite slow. */
201 UNIV_INTERN
202 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)203 ins_node_set_new_row(
204 /*=================*/
205 ins_node_t* node, /*!< in: insert node */
206 dtuple_t* row) /*!< in: new row (or first row) for the node */
207 {
208 node->state = INS_NODE_SET_IX_LOCK;
209 node->index = NULL;
210 node->entry = NULL;
211
212 node->row = row;
213
214 mem_heap_empty(node->entry_sys_heap);
215
216 /* Create templates for index entries */
217
218 ins_node_create_entry_list(node);
219
220 /* Allocate from entry_sys_heap buffers for sys fields */
221
222 row_ins_alloc_sys_fields(node);
223
224 /* As we allocated a new trx id buf, the trx id should be written
225 there again: */
226
227 node->trx_id = 0;
228 }
229
230 /*******************************************************************//**
231 Does an insert operation by updating a delete-marked existing record
232 in the index. This situation can occur if the delete-marked record is
233 kept in the index for consistent reads.
234 @return DB_SUCCESS or error code */
235 static MY_ATTRIBUTE((nonnull, warn_unused_result))
236 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)237 row_ins_sec_index_entry_by_modify(
238 /*==============================*/
239 ulint flags, /*!< in: undo logging and locking flags */
240 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
241 depending on whether mtr holds just a leaf
242 latch or also a tree latch */
243 btr_cur_t* cursor, /*!< in: B-tree cursor */
244 ulint** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
245 mem_heap_t* offsets_heap,
246 /*!< in/out: memory heap that can be emptied */
247 mem_heap_t* heap, /*!< in/out: memory heap */
248 const dtuple_t* entry, /*!< in: index entry to insert */
249 que_thr_t* thr, /*!< in: query thread */
250 mtr_t* mtr) /*!< in: mtr; must be committed before
251 latching any further pages */
252 {
253 big_rec_t* dummy_big_rec;
254 upd_t* update;
255 rec_t* rec;
256 dberr_t err;
257
258 rec = btr_cur_get_rec(cursor);
259
260 ut_ad(!dict_index_is_clust(cursor->index));
261 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
262 ut_ad(!entry->info_bits);
263
264 /* We know that in the alphabetical ordering, entry and rec are
265 identified. But in their binary form there may be differences if
266 there are char fields in them. Therefore we have to calculate the
267 difference. */
268
269 update = row_upd_build_sec_rec_difference_binary(
270 rec, cursor->index, *offsets, entry, heap);
271
272 if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
273 /* We should never insert in place of a record that
274 has not been delete-marked. The only exception is when
275 online CREATE INDEX copied the changes that we already
276 made to the clustered index, and completed the
277 secondary index creation before we got here. In this
278 case, the change would already be there. The CREATE
279 INDEX should be waiting for a MySQL meta-data lock
280 upgrade at least until this INSERT or UPDATE
281 returns. After that point, the TEMP_INDEX_PREFIX
282 would be dropped from the index name in
283 commit_inplace_alter_table(). */
284 ut_a(update->n_fields == 0);
285 ut_a(*cursor->index->name == TEMP_INDEX_PREFIX);
286 ut_ad(!dict_index_is_online_ddl(cursor->index));
287 return(DB_SUCCESS);
288 }
289
290 if (mode == BTR_MODIFY_LEAF) {
291 /* Try an optimistic updating of the record, keeping changes
292 within the page */
293
294 /* TODO: pass only *offsets */
295 err = btr_cur_optimistic_update(
296 flags | BTR_KEEP_SYS_FLAG, cursor,
297 offsets, &offsets_heap, update, 0, thr,
298 thr_get_trx(thr)->id, mtr);
299 switch (err) {
300 case DB_OVERFLOW:
301 case DB_UNDERFLOW:
302 case DB_ZIP_OVERFLOW:
303 err = DB_FAIL;
304 default:
305 break;
306 }
307 } else {
308 ut_a(mode == BTR_MODIFY_TREE);
309 if (buf_LRU_buf_pool_running_out()) {
310
311 return(DB_LOCK_TABLE_FULL);
312 }
313
314 err = btr_cur_pessimistic_update(
315 flags | BTR_KEEP_SYS_FLAG, cursor,
316 offsets, &offsets_heap,
317 heap, &dummy_big_rec, update, 0,
318 thr, thr_get_trx(thr)->id, mtr);
319 ut_ad(!dummy_big_rec);
320 }
321
322 return(err);
323 }
324
325 /*******************************************************************//**
326 Does an insert operation by delete unmarking and updating a delete marked
327 existing record in the index. This situation can occur if the delete marked
328 record is kept in the index for consistent reads.
329 @return DB_SUCCESS, DB_FAIL, or error code */
330 static MY_ATTRIBUTE((nonnull, warn_unused_result))
331 dberr_t
row_ins_clust_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)332 row_ins_clust_index_entry_by_modify(
333 /*================================*/
334 ulint flags, /*!< in: undo logging and locking flags */
335 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
336 depending on whether mtr holds just a leaf
337 latch or also a tree latch */
338 btr_cur_t* cursor, /*!< in: B-tree cursor */
339 ulint** offsets,/*!< out: offsets on cursor->page_cur.rec */
340 mem_heap_t** offsets_heap,
341 /*!< in/out: pointer to memory heap that can
342 be emptied, or NULL */
343 mem_heap_t* heap, /*!< in/out: memory heap */
344 big_rec_t** big_rec,/*!< out: possible big rec vector of fields
345 which have to be stored externally by the
346 caller */
347 const dtuple_t* entry, /*!< in: index entry to insert */
348 que_thr_t* thr, /*!< in: query thread */
349 mtr_t* mtr) /*!< in: mtr; must be committed before
350 latching any further pages */
351 {
352 const rec_t* rec;
353 const upd_t* update;
354 dberr_t err;
355
356 ut_ad(dict_index_is_clust(cursor->index));
357
358 *big_rec = NULL;
359
360 rec = btr_cur_get_rec(cursor);
361
362 ut_ad(rec_get_deleted_flag(rec,
363 dict_table_is_comp(cursor->index->table)));
364
365 /* Build an update vector containing all the fields to be modified;
366 NOTE that this vector may NOT contain system columns trx_id or
367 roll_ptr */
368
369 update = row_upd_build_difference_binary(
370 cursor->index, entry, rec, NULL, true,
371 thr_get_trx(thr), heap);
372 if (mode != BTR_MODIFY_TREE) {
373 ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
374
375 /* Try optimistic updating of the record, keeping changes
376 within the page */
377
378 err = btr_cur_optimistic_update(
379 flags, cursor, offsets, offsets_heap, update, 0, thr,
380 thr_get_trx(thr)->id, mtr);
381 switch (err) {
382 case DB_OVERFLOW:
383 case DB_UNDERFLOW:
384 case DB_ZIP_OVERFLOW:
385 err = DB_FAIL;
386 default:
387 break;
388 }
389 } else {
390 if (buf_LRU_buf_pool_running_out()) {
391
392 return(DB_LOCK_TABLE_FULL);
393
394 }
395 err = btr_cur_pessimistic_update(
396 flags | BTR_KEEP_POS_FLAG,
397 cursor, offsets, offsets_heap, heap,
398 big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
399 }
400
401 return(err);
402 }
403
404 /*********************************************************************//**
405 Returns TRUE if in a cascaded update/delete an ancestor node of node
406 updates (not DELETE, but UPDATE) table.
407 @return TRUE if an ancestor updates table */
408 static
409 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)410 row_ins_cascade_ancestor_updates_table(
411 /*===================================*/
412 que_node_t* node, /*!< in: node in a query graph */
413 dict_table_t* table) /*!< in: table */
414 {
415 que_node_t* parent;
416
417 for (parent = que_node_get_parent(node);
418 que_node_get_type(parent) == QUE_NODE_UPDATE;
419 parent = que_node_get_parent(parent)) {
420
421 upd_node_t* upd_node;
422
423 upd_node = static_cast<upd_node_t*>(parent);
424
425 if (upd_node->table == table && upd_node->is_delete == FALSE) {
426
427 return(TRUE);
428 }
429 }
430
431 return(FALSE);
432 }
433
434 /*********************************************************************//**
435 Returns the number of ancestor UPDATE or DELETE nodes of a
436 cascaded update/delete node.
437 @return number of ancestors */
438 static MY_ATTRIBUTE((nonnull, warn_unused_result))
439 ulint
row_ins_cascade_n_ancestors(que_node_t * node)440 row_ins_cascade_n_ancestors(
441 /*========================*/
442 que_node_t* node) /*!< in: node in a query graph */
443 {
444 que_node_t* parent;
445 ulint n_ancestors = 0;
446
447 for (parent = que_node_get_parent(node);
448 que_node_get_type(parent) == QUE_NODE_UPDATE;
449 parent = que_node_get_parent(parent)) {
450
451 n_ancestors++;
452 }
453
454 return(n_ancestors);
455 }
456
457 /******************************************************************//**
458 Calculates the update vector node->cascade->update for a child table in
459 a cascaded update.
460 @return number of fields in the calculated update vector; the value
461 can also be 0 if no foreign key fields changed; the returned value is
462 ULINT_UNDEFINED if the column type in the child table is too short to
463 fit the new value in the parent table: that means the update fails */
464 static MY_ATTRIBUTE((nonnull, warn_unused_result))
465 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)466 row_ins_cascade_calc_update_vec(
467 /*============================*/
468 upd_node_t* node, /*!< in: update node of the parent
469 table */
470 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
471 type is != 0 */
472 mem_heap_t* heap, /*!< in: memory heap to use as
473 temporary storage */
474 trx_t* trx, /*!< in: update transaction */
475 ibool* fts_col_affected)/*!< out: is FTS column affected */
476 {
477 upd_node_t* cascade = node->cascade_node;
478 dict_table_t* table = foreign->foreign_table;
479 dict_index_t* index = foreign->foreign_index;
480 upd_t* update;
481 dict_table_t* parent_table;
482 dict_index_t* parent_index;
483 upd_t* parent_update;
484 ulint n_fields_updated;
485 ulint parent_field_no;
486 ulint i;
487 ulint j;
488 ibool doc_id_updated = FALSE;
489 ulint doc_id_pos = 0;
490 doc_id_t new_doc_id = FTS_NULL_DOC_ID;
491
492 ut_a(node);
493 ut_a(foreign);
494 ut_a(cascade);
495 ut_a(table);
496 ut_a(index);
497
498 /* Calculate the appropriate update vector which will set the fields
499 in the child index record to the same value (possibly padded with
500 spaces if the column is a fixed length CHAR or FIXBINARY column) as
501 the referenced index record will get in the update. */
502
503 parent_table = node->table;
504 ut_a(parent_table == foreign->referenced_table);
505 parent_index = foreign->referenced_index;
506 parent_update = node->update;
507
508 update = cascade->update;
509
510 update->info_bits = 0;
511 update->n_fields = foreign->n_fields;
512
513 n_fields_updated = 0;
514
515 *fts_col_affected = FALSE;
516
517 if (table->fts) {
518 doc_id_pos = dict_table_get_nth_col_pos(
519 table, table->fts->doc_col);
520 }
521
522 for (i = 0; i < foreign->n_fields; i++) {
523
524 parent_field_no = dict_table_get_nth_col_pos(
525 parent_table,
526 dict_index_get_nth_col_no(parent_index, i));
527
528 for (j = 0; j < parent_update->n_fields; j++) {
529 const upd_field_t* parent_ufield
530 = &parent_update->fields[j];
531
532 if (parent_ufield->field_no == parent_field_no) {
533
534 ulint min_size;
535 const dict_col_t* col;
536 ulint ufield_len;
537 upd_field_t* ufield;
538
539 col = dict_index_get_nth_col(index, i);
540
541 /* A field in the parent index record is
542 updated. Let us make the update vector
543 field for the child table. */
544
545 ufield = update->fields + n_fields_updated;
546
547 ufield->field_no
548 = dict_table_get_nth_col_pos(
549 table, dict_col_get_no(col));
550
551 ufield->orig_len = 0;
552 ufield->exp = NULL;
553
554 ufield->new_val = parent_ufield->new_val;
555 ufield_len = dfield_get_len(&ufield->new_val);
556
557 /* Clear the "external storage" flag */
558 dfield_set_len(&ufield->new_val, ufield_len);
559
560 /* Do not allow a NOT NULL column to be
561 updated as NULL */
562
563 if (dfield_is_null(&ufield->new_val)
564 && (col->prtype & DATA_NOT_NULL)) {
565
566 return(ULINT_UNDEFINED);
567 }
568
569 /* If the new value would not fit in the
570 column, do not allow the update */
571
572 if (!dfield_is_null(&ufield->new_val)
573 && dtype_get_at_most_n_mbchars(
574 col->prtype, col->mbminmaxlen,
575 col->len,
576 ufield_len,
577 static_cast<char*>(
578 dfield_get_data(
579 &ufield->new_val)))
580 < ufield_len) {
581
582 return(ULINT_UNDEFINED);
583 }
584
585 /* If the parent column type has a different
586 length than the child column type, we may
587 need to pad with spaces the new value of the
588 child column */
589
590 min_size = dict_col_get_min_size(col);
591
592 /* Because UNIV_SQL_NULL (the marker
593 of SQL NULL values) exceeds all possible
594 values of min_size, the test below will
595 not hold for SQL NULL columns. */
596
597 if (min_size > ufield_len) {
598
599 byte* pad;
600 ulint pad_len;
601 byte* padded_data;
602 ulint mbminlen;
603
604 padded_data = static_cast<byte*>(
605 mem_heap_alloc(
606 heap, min_size));
607
608 pad = padded_data + ufield_len;
609 pad_len = min_size - ufield_len;
610
611 memcpy(padded_data,
612 dfield_get_data(&ufield
613 ->new_val),
614 ufield_len);
615
616 mbminlen = dict_col_get_mbminlen(col);
617
618 ut_ad(!(ufield_len % mbminlen));
619 ut_ad(!(min_size % mbminlen));
620
621 if (mbminlen == 1
622 && dtype_get_charset_coll(
623 col->prtype)
624 == DATA_MYSQL_BINARY_CHARSET_COLL) {
625 /* Do not pad BINARY columns */
626 return(ULINT_UNDEFINED);
627 }
628
629 row_mysql_pad_col(mbminlen,
630 pad, pad_len);
631 dfield_set_data(&ufield->new_val,
632 padded_data, min_size);
633 }
634
635 /* Check whether the current column has
636 FTS index on it */
637 if (table->fts
638 && dict_table_is_fts_column(
639 table->fts->indexes,
640 dict_col_get_no(col))
641 != ULINT_UNDEFINED) {
642 *fts_col_affected = TRUE;
643 }
644
645 /* If Doc ID is updated, check whether the
646 Doc ID is valid */
647 if (table->fts
648 && ufield->field_no == doc_id_pos) {
649 doc_id_t n_doc_id;
650
651 n_doc_id =
652 table->fts->cache->next_doc_id;
653
654 new_doc_id = fts_read_doc_id(
655 static_cast<const byte*>(
656 dfield_get_data(
657 &ufield->new_val)));
658
659 if (new_doc_id <= 0) {
660 fprintf(stderr,
661 "InnoDB: FTS Doc ID "
662 "must be larger than "
663 "0 \n");
664 return(ULINT_UNDEFINED);
665 }
666
667 if (new_doc_id < n_doc_id) {
668 fprintf(stderr,
669 "InnoDB: FTS Doc ID "
670 "must be larger than "
671 IB_ID_FMT" for table",
672 n_doc_id -1);
673
674 ut_print_name(stderr, trx,
675 TRUE,
676 table->name);
677
678 putc('\n', stderr);
679 return(ULINT_UNDEFINED);
680 }
681
682 *fts_col_affected = TRUE;
683 doc_id_updated = TRUE;
684 }
685
686 n_fields_updated++;
687 }
688 }
689 }
690
691 /* Generate a new Doc ID if FTS index columns get updated */
692 if (table->fts && *fts_col_affected) {
693 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
694 doc_id_t doc_id;
695 upd_field_t* ufield;
696
697 ut_ad(!doc_id_updated);
698 ufield = update->fields + n_fields_updated;
699 fts_get_next_doc_id(table, &trx->fts_next_doc_id);
700 doc_id = fts_update_doc_id(table, ufield,
701 &trx->fts_next_doc_id);
702 n_fields_updated++;
703 fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
704 } else {
705 if (doc_id_updated) {
706 ut_ad(new_doc_id);
707 fts_trx_add_op(trx, table, new_doc_id,
708 FTS_INSERT, NULL);
709 } else {
710 fprintf(stderr, "InnoDB: FTS Doc ID must be "
711 "updated along with FTS indexed "
712 "column for table ");
713 ut_print_name(stderr, trx, TRUE, table->name);
714 putc('\n', stderr);
715 return(ULINT_UNDEFINED);
716 }
717 }
718 }
719
720 update->n_fields = n_fields_updated;
721
722 return(n_fields_updated);
723 }
724
725 /*********************************************************************//**
726 Set detailed error message associated with foreign key errors for
727 the given transaction. */
728 static
729 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)730 row_ins_set_detailed(
731 /*=================*/
732 trx_t* trx, /*!< in: transaction */
733 dict_foreign_t* foreign) /*!< in: foreign key constraint */
734 {
735 ut_ad(!srv_read_only_mode);
736
737 mutex_enter(&srv_misc_tmpfile_mutex);
738 rewind(srv_misc_tmpfile);
739
740 if (os_file_set_eof(srv_misc_tmpfile)) {
741 ut_print_name(srv_misc_tmpfile, trx, TRUE,
742 foreign->foreign_table_name);
743 dict_print_info_on_foreign_key_in_create_format(
744 srv_misc_tmpfile, trx, foreign, FALSE);
745 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
746 } else {
747 trx_set_detailed_error(trx, "temp file operation failed");
748 }
749
750 mutex_exit(&srv_misc_tmpfile_mutex);
751 }
752
753 /*********************************************************************//**
754 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
755 and displays information about the given transaction.
756 The caller must release dict_foreign_err_mutex. */
757 static
758 void
row_ins_foreign_trx_print(trx_t * trx)759 row_ins_foreign_trx_print(
760 /*======================*/
761 trx_t* trx) /*!< in: transaction */
762 {
763 ulint n_rec_locks;
764 ulint n_trx_locks;
765 ulint heap_size;
766
767 if (srv_read_only_mode) {
768 return;
769 }
770
771 lock_mutex_enter();
772 n_rec_locks = lock_number_of_rows_locked(&trx->lock);
773 n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
774 heap_size = mem_heap_get_size(trx->lock.lock_heap);
775 lock_mutex_exit();
776
777 mutex_enter(&trx_sys->mutex);
778
779 mutex_enter(&dict_foreign_err_mutex);
780 rewind(dict_foreign_err_file);
781 ut_print_timestamp(dict_foreign_err_file);
782 fputs(" Transaction:\n", dict_foreign_err_file);
783
784 trx_print_low(dict_foreign_err_file, trx, 600,
785 n_rec_locks, n_trx_locks, heap_size);
786
787 mutex_exit(&trx_sys->mutex);
788
789 ut_ad(mutex_own(&dict_foreign_err_mutex));
790 }
791
792 /*********************************************************************//**
793 Reports a foreign key error associated with an update or a delete of a
794 parent table index entry. */
795 static
796 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)797 row_ins_foreign_report_err(
798 /*=======================*/
799 const char* errstr, /*!< in: error string from the viewpoint
800 of the parent table */
801 que_thr_t* thr, /*!< in: query thread whose run_node
802 is an update node */
803 dict_foreign_t* foreign, /*!< in: foreign key constraint */
804 const rec_t* rec, /*!< in: a matching index record in the
805 child table */
806 const dtuple_t* entry) /*!< in: index entry in the parent
807 table */
808 {
809 if (srv_read_only_mode) {
810 return;
811 }
812
813 FILE* ef = dict_foreign_err_file;
814 trx_t* trx = thr_get_trx(thr);
815
816 row_ins_set_detailed(trx, foreign);
817
818 row_ins_foreign_trx_print(trx);
819
820 fputs("Foreign key constraint fails for table ", ef);
821 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
822 fputs(":\n", ef);
823 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
824 TRUE);
825 putc('\n', ef);
826 fputs(errstr, ef);
827 fputs(" in parent table, in index ", ef);
828 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
829 if (entry) {
830 fputs(" tuple:\n", ef);
831 dtuple_print(ef, entry);
832 }
833 fputs("\nBut in child table ", ef);
834 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
835 fputs(", in index ", ef);
836 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
837 if (rec) {
838 fputs(", there is a record:\n", ef);
839 rec_print(ef, rec, foreign->foreign_index);
840 } else {
841 fputs(", the record is not available\n", ef);
842 }
843 putc('\n', ef);
844
845 mutex_exit(&dict_foreign_err_mutex);
846 }
847
848 /*********************************************************************//**
849 Reports a foreign key error to dict_foreign_err_file when we are trying
850 to add an index entry to a child table. Note that the adding may be the result
851 of an update, too. */
852 static
853 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)854 row_ins_foreign_report_add_err(
855 /*===========================*/
856 trx_t* trx, /*!< in: transaction */
857 dict_foreign_t* foreign, /*!< in: foreign key constraint */
858 const rec_t* rec, /*!< in: a record in the parent table:
859 it does not match entry because we
860 have an error! */
861 const dtuple_t* entry) /*!< in: index entry to insert in the
862 child table */
863 {
864 if (srv_read_only_mode) {
865 return;
866 }
867
868 FILE* ef = dict_foreign_err_file;
869
870 row_ins_set_detailed(trx, foreign);
871
872 row_ins_foreign_trx_print(trx);
873
874 fputs("Foreign key constraint fails for table ", ef);
875 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
876 fputs(":\n", ef);
877 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
878 TRUE);
879 fputs("\nTrying to add in child table, in index ", ef);
880 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
881 if (entry) {
882 fputs(" tuple:\n", ef);
883 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
884 It would be better to only display the user columns. */
885 dtuple_print(ef, entry);
886 }
887 fputs("\nBut in parent table ", ef);
888 ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
889 fputs(", in index ", ef);
890 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
891 fputs(",\nthe closest match we can find is record:\n", ef);
892 if (rec && page_rec_is_supremum(rec)) {
893 /* If the cursor ended on a supremum record, it is better
894 to report the previous record in the error message, so that
895 the user gets a more descriptive error message. */
896 rec = page_rec_get_prev_const(rec);
897 }
898
899 if (rec) {
900 rec_print(ef, rec, foreign->referenced_index);
901 }
902 putc('\n', ef);
903
904 mutex_exit(&dict_foreign_err_mutex);
905 }
906
907 /*********************************************************************//**
908 Invalidate the query cache for the given table. */
909 static
910 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)911 row_ins_invalidate_query_cache(
912 /*===========================*/
913 que_thr_t* thr, /*!< in: query thread whose run_node
914 is an update node */
915 const char* name) /*!< in: table name prefixed with
916 database name and a '/' character */
917 {
918 char* buf;
919 char* ptr;
920 ulint len = strlen(name) + 1;
921
922 buf = mem_strdupl(name, len);
923
924 ptr = strchr(buf, '/');
925 ut_a(ptr);
926 *ptr = '\0';
927
928 innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
929 mem_free(buf);
930 }
931
932 /*********************************************************************//**
933 Perform referential actions or checks when a parent row is deleted or updated
934 and the constraint had an ON DELETE or ON UPDATE condition which was not
935 RESTRICT.
936 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
937 static MY_ATTRIBUTE((nonnull, warn_unused_result))
938 dberr_t
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)939 row_ins_foreign_check_on_constraint(
940 /*================================*/
941 que_thr_t* thr, /*!< in: query thread whose run_node
942 is an update node */
943 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
944 type is != 0 */
945 btr_pcur_t* pcur, /*!< in: cursor placed on a matching
946 index record in the child table */
947 dtuple_t* entry, /*!< in: index entry in the parent
948 table */
949 mtr_t* mtr) /*!< in: mtr holding the latch of pcur
950 page */
951 {
952 upd_node_t* node;
953 upd_node_t* cascade;
954 dict_table_t* table = foreign->foreign_table;
955 dict_index_t* index;
956 dict_index_t* clust_index;
957 dtuple_t* ref;
958 mem_heap_t* upd_vec_heap = NULL;
959 const rec_t* rec;
960 const rec_t* clust_rec;
961 const buf_block_t* clust_block;
962 upd_t* update;
963 ulint n_to_update;
964 dberr_t err;
965 ulint i;
966 trx_t* trx;
967 mem_heap_t* tmp_heap = NULL;
968 doc_id_t doc_id = FTS_NULL_DOC_ID;
969 ibool fts_col_affacted = FALSE;
970
971 ut_a(thr);
972 ut_a(foreign);
973 ut_a(pcur);
974 ut_a(mtr);
975
976 trx = thr_get_trx(thr);
977
978 /* Since we are going to delete or update a row, we have to invalidate
979 the MySQL query cache for table. A deadlock of threads is not possible
980 here because the caller of this function does not hold any latches with
981 the sync0sync.h rank above the lock_sys_t::mutex. The query cache mutex
982 has a rank just above the lock_sys_t::mutex. */
983
984 row_ins_invalidate_query_cache(thr, table->name);
985
986 node = static_cast<upd_node_t*>(thr->run_node);
987
988 if (node->is_delete && 0 == (foreign->type
989 & (DICT_FOREIGN_ON_DELETE_CASCADE
990 | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
991
992 row_ins_foreign_report_err("Trying to delete",
993 thr, foreign,
994 btr_pcur_get_rec(pcur), entry);
995
996 return(DB_ROW_IS_REFERENCED);
997 }
998
999 if (!node->is_delete && 0 == (foreign->type
1000 & (DICT_FOREIGN_ON_UPDATE_CASCADE
1001 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
1002
1003 /* This is an UPDATE */
1004
1005 row_ins_foreign_report_err("Trying to update",
1006 thr, foreign,
1007 btr_pcur_get_rec(pcur), entry);
1008
1009 return(DB_ROW_IS_REFERENCED);
1010 }
1011
1012 if (node->cascade_node == NULL) {
1013 /* Extend our query graph by creating a child to current
1014 update node. The child is used in the cascade or set null
1015 operation. */
1016
1017 node->cascade_heap = mem_heap_create(128);
1018 node->cascade_node = row_create_update_node_for_mysql(
1019 table, node->cascade_heap);
1020 que_node_set_parent(node->cascade_node, node);
1021 }
1022
1023 /* Initialize cascade_node to do the operation we want. Note that we
1024 use the SAME cascade node to do all foreign key operations of the
1025 SQL DELETE: the table of the cascade node may change if there are
1026 several child tables to the table where the delete is done! */
1027
1028 cascade = node->cascade_node;
1029
1030 cascade->table = table;
1031
1032 cascade->foreign = foreign;
1033
1034 if (node->is_delete
1035 && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1036 cascade->is_delete = TRUE;
1037 } else {
1038 cascade->is_delete = FALSE;
1039
1040 if (foreign->n_fields > cascade->update_n_fields) {
1041 /* We have to make the update vector longer */
1042
1043 cascade->update = upd_create(foreign->n_fields,
1044 node->cascade_heap);
1045 cascade->update_n_fields = foreign->n_fields;
1046 }
1047 }
1048
1049 /* We do not allow cyclic cascaded updating (DELETE is allowed,
1050 but not UPDATE) of the same table, as this can lead to an infinite
1051 cycle. Check that we are not updating the same table which is
1052 already being modified in this cascade chain. We have to check
1053 this also because the modification of the indexes of a 'parent'
1054 table may still be incomplete, and we must avoid seeing the indexes
1055 of the parent table in an inconsistent state! */
1056
1057 if (!cascade->is_delete
1058 && row_ins_cascade_ancestor_updates_table(cascade, table)) {
1059
1060 /* We do not know if this would break foreign key
1061 constraints, but play safe and return an error */
1062
1063 err = DB_ROW_IS_REFERENCED;
1064
1065 row_ins_foreign_report_err(
1066 "Trying an update, possibly causing a cyclic"
1067 " cascaded update\n"
1068 "in the child table,", thr, foreign,
1069 btr_pcur_get_rec(pcur), entry);
1070
1071 goto nonstandard_exit_func;
1072 }
1073
1074 if (row_ins_cascade_n_ancestors(cascade) >= 15) {
1075 err = DB_ROW_IS_REFERENCED;
1076
1077 row_ins_foreign_report_err(
1078 "Trying a too deep cascaded delete or update\n",
1079 thr, foreign, btr_pcur_get_rec(pcur), entry);
1080
1081 goto nonstandard_exit_func;
1082 }
1083
1084 index = btr_pcur_get_btr_cur(pcur)->index;
1085
1086 ut_a(index == foreign->foreign_index);
1087
1088 rec = btr_pcur_get_rec(pcur);
1089
1090 tmp_heap = mem_heap_create(256);
1091
1092 if (dict_index_is_clust(index)) {
1093 /* pcur is already positioned in the clustered index of
1094 the child table */
1095
1096 clust_index = index;
1097 clust_rec = rec;
1098 clust_block = btr_pcur_get_block(pcur);
1099 } else {
1100 /* We have to look for the record in the clustered index
1101 in the child table */
1102
1103 clust_index = dict_table_get_first_index(table);
1104
1105 ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1106 tmp_heap);
1107 btr_pcur_open_with_no_init(clust_index, ref,
1108 PAGE_CUR_LE, BTR_SEARCH_LEAF,
1109 cascade->pcur, 0, mtr);
1110
1111 clust_rec = btr_pcur_get_rec(cascade->pcur);
1112 clust_block = btr_pcur_get_block(cascade->pcur);
1113
1114 if (!page_rec_is_user_rec(clust_rec)
1115 || btr_pcur_get_low_match(cascade->pcur)
1116 < dict_index_get_n_unique(clust_index)) {
1117
1118 fputs("InnoDB: error in cascade of a foreign key op\n"
1119 "InnoDB: ", stderr);
1120 dict_index_name_print(stderr, trx, index);
1121
1122 fputs("\n"
1123 "InnoDB: record ", stderr);
1124 rec_print(stderr, rec, index);
1125 fputs("\n"
1126 "InnoDB: clustered record ", stderr);
1127 rec_print(stderr, clust_rec, clust_index);
1128 fputs("\n"
1129 "InnoDB: Submit a detailed bug report to"
1130 " http://bugs.mysql.com\n", stderr);
1131 ut_ad(0);
1132 err = DB_SUCCESS;
1133
1134 goto nonstandard_exit_func;
1135 }
1136 }
1137
1138 /* Set an X-lock on the row to delete or update in the child table */
1139
1140 err = lock_table(0, table, LOCK_IX, thr);
1141
1142 if (err == DB_SUCCESS) {
1143 /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1144 we already have a normal shared lock on the appropriate
1145 gap if the search criterion was not unique */
1146
1147 err = lock_clust_rec_read_check_and_lock_alt(
1148 0, clust_block, clust_rec, clust_index,
1149 LOCK_X, LOCK_REC_NOT_GAP, thr);
1150 }
1151
1152 if (err != DB_SUCCESS) {
1153
1154 goto nonstandard_exit_func;
1155 }
1156
1157 if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1158 /* This can happen if there is a circular reference of
1159 rows such that cascading delete comes to delete a row
1160 already in the process of being delete marked */
1161 err = DB_SUCCESS;
1162
1163 goto nonstandard_exit_func;
1164 }
1165
1166 if (table->fts) {
1167 doc_id = fts_get_doc_id_from_rec(table, clust_rec, tmp_heap);
1168 }
1169
1170 if (node->is_delete
1171 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1172 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1173
1174 /* Build the appropriate update vector which sets
1175 foreign->n_fields first fields in rec to SQL NULL */
1176
1177 update = cascade->update;
1178
1179 update->info_bits = 0;
1180 update->n_fields = foreign->n_fields;
1181 UNIV_MEM_INVALID(update->fields,
1182 update->n_fields * sizeof *update->fields);
1183
1184 for (i = 0; i < foreign->n_fields; i++) {
1185 upd_field_t* ufield = &update->fields[i];
1186
1187 ufield->field_no = dict_table_get_nth_col_pos(
1188 table,
1189 dict_index_get_nth_col_no(index, i));
1190 ufield->orig_len = 0;
1191 ufield->exp = NULL;
1192 dfield_set_null(&ufield->new_val);
1193
1194 if (table->fts && dict_table_is_fts_column(
1195 table->fts->indexes,
1196 dict_index_get_nth_col_no(index, i))
1197 != ULINT_UNDEFINED) {
1198 fts_col_affacted = TRUE;
1199 }
1200 }
1201
1202 if (fts_col_affacted) {
1203 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1204 }
1205 } else if (table->fts && cascade->is_delete) {
1206 /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1207 for (i = 0; i < foreign->n_fields; i++) {
1208 if (table->fts && dict_table_is_fts_column(
1209 table->fts->indexes,
1210 dict_index_get_nth_col_no(index, i))
1211 != ULINT_UNDEFINED) {
1212 fts_col_affacted = TRUE;
1213 }
1214 }
1215
1216 if (fts_col_affacted) {
1217 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1218 }
1219 }
1220
1221 if (!node->is_delete
1222 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1223
1224 /* Build the appropriate update vector which sets changing
1225 foreign->n_fields first fields in rec to new values */
1226
1227 upd_vec_heap = mem_heap_create(256);
1228
1229 n_to_update = row_ins_cascade_calc_update_vec(
1230 node, foreign, upd_vec_heap, trx, &fts_col_affacted);
1231
1232 if (n_to_update == ULINT_UNDEFINED) {
1233 err = DB_ROW_IS_REFERENCED;
1234
1235 row_ins_foreign_report_err(
1236 "Trying a cascaded update where the"
1237 " updated value in the child\n"
1238 "table would not fit in the length"
1239 " of the column, or the value would\n"
1240 "be NULL and the column is"
1241 " declared as not NULL in the child table,",
1242 thr, foreign, btr_pcur_get_rec(pcur), entry);
1243
1244 goto nonstandard_exit_func;
1245 }
1246
1247 if (cascade->update->n_fields == 0) {
1248
1249 /* The update does not change any columns referred
1250 to in this foreign key constraint: no need to do
1251 anything */
1252
1253 err = DB_SUCCESS;
1254
1255 goto nonstandard_exit_func;
1256 }
1257
1258 /* Mark the old Doc ID as deleted */
1259 if (fts_col_affacted) {
1260 ut_ad(table->fts);
1261 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1262 }
1263 }
1264
1265 /* Store pcur position and initialize or store the cascade node
1266 pcur stored position */
1267
1268 btr_pcur_store_position(pcur, mtr);
1269
1270 if (index == clust_index) {
1271 btr_pcur_copy_stored_position(cascade->pcur, pcur);
1272 } else {
1273 btr_pcur_store_position(cascade->pcur, mtr);
1274 }
1275
1276 mtr_commit(mtr);
1277
1278 ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1279
1280 cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1281
1282 err = row_update_cascade_for_mysql(thr, cascade,
1283 foreign->foreign_table);
1284
1285 if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
1286 fprintf(stderr,
1287 "InnoDB: error: table %s has the counter 0"
1288 " though there is\n"
1289 "InnoDB: a FOREIGN KEY check running on it.\n",
1290 foreign->foreign_table->name);
1291 }
1292
1293 /* Release the data dictionary latch for a while, so that we do not
1294 starve other threads from doing CREATE TABLE etc. if we have a huge
1295 cascaded operation running. The counter n_foreign_key_checks_running
1296 will prevent other users from dropping or ALTERing the table when we
1297 release the latch. */
1298
1299 row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1300
1301 DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1302
1303 row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1304
1305 mtr_start(mtr);
1306
1307 /* Restore pcur position */
1308
1309 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1310
1311 if (tmp_heap) {
1312 mem_heap_free(tmp_heap);
1313 }
1314
1315 if (upd_vec_heap) {
1316 mem_heap_free(upd_vec_heap);
1317 }
1318
1319 return(err);
1320
1321 nonstandard_exit_func:
1322 if (tmp_heap) {
1323 mem_heap_free(tmp_heap);
1324 }
1325
1326 if (upd_vec_heap) {
1327 mem_heap_free(upd_vec_heap);
1328 }
1329
1330 btr_pcur_store_position(pcur, mtr);
1331
1332 mtr_commit(mtr);
1333 mtr_start(mtr);
1334
1335 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1336
1337 return(err);
1338 }
1339
1340 /*********************************************************************//**
1341 Sets a shared lock on a record. Used in locking possible duplicate key
1342 records and also in checking foreign key constraints.
1343 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1344 static
1345 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1346 row_ins_set_shared_rec_lock(
1347 /*========================*/
1348 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1349 LOCK_REC_NOT_GAP type lock */
1350 const buf_block_t* block, /*!< in: buffer block of rec */
1351 const rec_t* rec, /*!< in: record */
1352 dict_index_t* index, /*!< in: index */
1353 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1354 que_thr_t* thr) /*!< in: query thread */
1355 {
1356 dberr_t err;
1357
1358 ut_ad(rec_offs_validate(rec, index, offsets));
1359
1360 if (dict_index_is_clust(index)) {
1361 err = lock_clust_rec_read_check_and_lock(
1362 0, block, rec, index, offsets, LOCK_S, type, thr);
1363 } else {
1364 err = lock_sec_rec_read_check_and_lock(
1365 0, block, rec, index, offsets, LOCK_S, type, thr);
1366 }
1367
1368 return(err);
1369 }
1370
1371 /*********************************************************************//**
1372 Sets a exclusive lock on a record. Used in locking possible duplicate key
1373 records
1374 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1375 static
1376 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1377 row_ins_set_exclusive_rec_lock(
1378 /*===========================*/
1379 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1380 LOCK_REC_NOT_GAP type lock */
1381 const buf_block_t* block, /*!< in: buffer block of rec */
1382 const rec_t* rec, /*!< in: record */
1383 dict_index_t* index, /*!< in: index */
1384 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1385 que_thr_t* thr) /*!< in: query thread */
1386 {
1387 dberr_t err;
1388
1389 ut_ad(rec_offs_validate(rec, index, offsets));
1390
1391 if (dict_index_is_clust(index)) {
1392 err = lock_clust_rec_read_check_and_lock(
1393 0, block, rec, index, offsets, LOCK_X, type, thr);
1394 } else {
1395 err = lock_sec_rec_read_check_and_lock(
1396 0, block, rec, index, offsets, LOCK_X, type, thr);
1397 }
1398
1399 return(err);
1400 }
1401
1402 /***************************************************************//**
1403 Checks if foreign key constraint fails for an index entry. Sets shared locks
1404 which lock either the success or the failure of the constraint. NOTE that
1405 the caller must have a shared latch on dict_operation_lock.
1406 @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1407 UNIV_INTERN
1408 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1409 row_ins_check_foreign_constraint(
1410 /*=============================*/
1411 ibool check_ref,/*!< in: TRUE if we want to check that
1412 the referenced table is ok, FALSE if we
1413 want to check the foreign key table */
1414 dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
1415 tables mentioned in it must be in the
1416 dictionary cache if they exist at all */
1417 dict_table_t* table, /*!< in: if check_ref is TRUE, then the foreign
1418 table, else the referenced table */
1419 dtuple_t* entry, /*!< in: index entry for index */
1420 que_thr_t* thr) /*!< in: query thread */
1421 {
1422 dberr_t err;
1423 upd_node_t* upd_node;
1424 dict_table_t* check_table;
1425 dict_index_t* check_index;
1426 ulint n_fields_cmp;
1427 btr_pcur_t pcur;
1428 int cmp;
1429 ulint i;
1430 mtr_t mtr;
1431 trx_t* trx = thr_get_trx(thr);
1432 mem_heap_t* heap = NULL;
1433 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1434 ulint* offsets = offsets_;
1435 rec_offs_init(offsets_);
1436
1437 run_again:
1438 #ifdef UNIV_SYNC_DEBUG
1439 ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
1440 #endif /* UNIV_SYNC_DEBUG */
1441
1442 err = DB_SUCCESS;
1443
1444 if (trx->check_foreigns == FALSE) {
1445 /* The user has suppressed foreign key checks currently for
1446 this session */
1447 goto exit_func;
1448 }
1449
1450 /* If any of the foreign key fields in entry is SQL NULL, we
1451 suppress the foreign key check: this is compatible with Oracle,
1452 for example */
1453
1454 for (i = 0; i < foreign->n_fields; i++) {
1455 if (UNIV_SQL_NULL == dfield_get_len(
1456 dtuple_get_nth_field(entry, i))) {
1457
1458 goto exit_func;
1459 }
1460 }
1461
1462 if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1463 upd_node = static_cast<upd_node_t*>(thr->run_node);
1464
1465 if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1466 /* If a cascaded update is done as defined by a
1467 foreign key constraint, do not check that
1468 constraint for the child row. In ON UPDATE CASCADE
1469 the update of the parent row is only half done when
1470 we come here: if we would check the constraint here
1471 for the child row it would fail.
1472
1473 A QUESTION remains: if in the child table there are
1474 several constraints which refer to the same parent
1475 table, we should merge all updates to the child as
1476 one update? And the updates can be contradictory!
1477 Currently we just perform the update associated
1478 with each foreign key constraint, one after
1479 another, and the user has problems predicting in
1480 which order they are performed. */
1481
1482 goto exit_func;
1483 }
1484 }
1485
1486 if (check_ref) {
1487 check_table = foreign->referenced_table;
1488 check_index = foreign->referenced_index;
1489 } else {
1490 check_table = foreign->foreign_table;
1491 check_index = foreign->foreign_index;
1492 }
1493
1494 if (check_table == NULL
1495 || check_table->ibd_file_missing
1496 || check_index == NULL) {
1497
1498 if (!srv_read_only_mode && check_ref) {
1499 FILE* ef = dict_foreign_err_file;
1500
1501 row_ins_set_detailed(trx, foreign);
1502
1503 row_ins_foreign_trx_print(trx);
1504
1505 fputs("Foreign key constraint fails for table ", ef);
1506 ut_print_name(ef, trx, TRUE,
1507 foreign->foreign_table_name);
1508 fputs(":\n", ef);
1509 dict_print_info_on_foreign_key_in_create_format(
1510 ef, trx, foreign, TRUE);
1511 fputs("\nTrying to add to index ", ef);
1512 ut_print_name(ef, trx, FALSE,
1513 foreign->foreign_index->name);
1514 fputs(" tuple:\n", ef);
1515 dtuple_print(ef, entry);
1516 fputs("\nBut the parent table ", ef);
1517 ut_print_name(ef, trx, TRUE,
1518 foreign->referenced_table_name);
1519 fputs("\nor its .ibd file does"
1520 " not currently exist!\n", ef);
1521 mutex_exit(&dict_foreign_err_mutex);
1522
1523 err = DB_NO_REFERENCED_ROW;
1524 }
1525
1526 goto exit_func;
1527 }
1528
1529 if (check_table != table) {
1530 /* We already have a LOCK_IX on table, but not necessarily
1531 on check_table */
1532
1533 err = lock_table(0, check_table, LOCK_IS, thr);
1534
1535 if (err != DB_SUCCESS) {
1536
1537 goto do_possible_lock_wait;
1538 }
1539 }
1540
1541 mtr_start(&mtr);
1542
1543 /* Store old value on n_fields_cmp */
1544
1545 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1546
1547 dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1548
1549 btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1550 BTR_SEARCH_LEAF, &pcur, &mtr);
1551
1552 /* Scan index records and check if there is a matching record */
1553
1554 do {
1555 const rec_t* rec = btr_pcur_get_rec(&pcur);
1556 const buf_block_t* block = btr_pcur_get_block(&pcur);
1557
1558 if (page_rec_is_infimum(rec)) {
1559
1560 continue;
1561 }
1562
1563 offsets = rec_get_offsets(rec, check_index,
1564 offsets, ULINT_UNDEFINED, &heap);
1565
1566 if (page_rec_is_supremum(rec)) {
1567
1568 err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1569 rec, check_index,
1570 offsets, thr);
1571 switch (err) {
1572 case DB_SUCCESS_LOCKED_REC:
1573 case DB_SUCCESS:
1574 continue;
1575 default:
1576 goto end_scan;
1577 }
1578 }
1579
1580 cmp = cmp_dtuple_rec(entry, rec, offsets);
1581
1582 if (cmp == 0) {
1583 if (rec_get_deleted_flag(rec,
1584 rec_offs_comp(offsets))) {
1585 err = row_ins_set_shared_rec_lock(
1586 LOCK_ORDINARY, block,
1587 rec, check_index, offsets, thr);
1588 switch (err) {
1589 case DB_SUCCESS_LOCKED_REC:
1590 case DB_SUCCESS:
1591 break;
1592 default:
1593 goto end_scan;
1594 }
1595 } else {
1596 /* Found a matching record. Lock only
1597 a record because we can allow inserts
1598 into gaps */
1599
1600 err = row_ins_set_shared_rec_lock(
1601 LOCK_REC_NOT_GAP, block,
1602 rec, check_index, offsets, thr);
1603
1604 switch (err) {
1605 case DB_SUCCESS_LOCKED_REC:
1606 case DB_SUCCESS:
1607 break;
1608 default:
1609 goto end_scan;
1610 }
1611
1612 if (check_ref) {
1613 err = DB_SUCCESS;
1614
1615 goto end_scan;
1616 } else if (foreign->type != 0) {
1617 /* There is an ON UPDATE or ON DELETE
1618 condition: check them in a separate
1619 function */
1620
1621 err = row_ins_foreign_check_on_constraint(
1622 thr, foreign, &pcur, entry,
1623 &mtr);
1624 if (err != DB_SUCCESS) {
1625 /* Since reporting a plain
1626 "duplicate key" error
1627 message to the user in
1628 cases where a long CASCADE
1629 operation would lead to a
1630 duplicate key in some
1631 other table is very
1632 confusing, map duplicate
1633 key errors resulting from
1634 FK constraints to a
1635 separate error code. */
1636
1637 if (err == DB_DUPLICATE_KEY) {
1638 err = DB_FOREIGN_DUPLICATE_KEY;
1639 }
1640
1641 goto end_scan;
1642 }
1643
1644 /* row_ins_foreign_check_on_constraint
1645 may have repositioned pcur on a
1646 different block */
1647 block = btr_pcur_get_block(&pcur);
1648 } else {
1649 row_ins_foreign_report_err(
1650 "Trying to delete or update",
1651 thr, foreign, rec, entry);
1652
1653 err = DB_ROW_IS_REFERENCED;
1654 goto end_scan;
1655 }
1656 }
1657 } else {
1658 ut_a(cmp < 0);
1659
1660 err = row_ins_set_shared_rec_lock(
1661 LOCK_GAP, block,
1662 rec, check_index, offsets, thr);
1663
1664 switch (err) {
1665 case DB_SUCCESS_LOCKED_REC:
1666 case DB_SUCCESS:
1667 if (check_ref) {
1668 err = DB_NO_REFERENCED_ROW;
1669 row_ins_foreign_report_add_err(
1670 trx, foreign, rec, entry);
1671 } else {
1672 err = DB_SUCCESS;
1673 }
1674 default:
1675 break;
1676 }
1677
1678 goto end_scan;
1679 }
1680 } while (btr_pcur_move_to_next(&pcur, &mtr));
1681
1682 if (check_ref) {
1683 row_ins_foreign_report_add_err(
1684 trx, foreign, btr_pcur_get_rec(&pcur), entry);
1685 err = DB_NO_REFERENCED_ROW;
1686 } else {
1687 err = DB_SUCCESS;
1688 }
1689
1690 end_scan:
1691 btr_pcur_close(&pcur);
1692
1693 mtr_commit(&mtr);
1694
1695 /* Restore old value */
1696 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1697
1698 do_possible_lock_wait:
1699 if (err == DB_LOCK_WAIT) {
1700 bool verified = false;
1701
1702 trx->error_state = err;
1703
1704 que_thr_stop_for_mysql(thr);
1705
1706 lock_wait_suspend_thread(thr);
1707
1708 if (check_table->to_be_dropped) {
1709 /* The table is being dropped. We shall timeout
1710 this operation */
1711 err = DB_LOCK_WAIT_TIMEOUT;
1712 goto exit_func;
1713 }
1714
1715 /* We had temporarily released dict_operation_lock in
1716 above lock sleep wait, now we have the lock again, and
1717 we will need to re-check whether the foreign key has been
1718 dropped. We only need to verify if the table is referenced
1719 table case (check_ref == 0), since MDL lock will prevent
1720 concurrent DDL and DML on the same table */
1721 if (!check_ref) {
1722 for (dict_foreign_set::iterator it
1723 = table->referenced_set.begin();
1724 it != table->referenced_set.end();
1725 ++it) {
1726 if (*it == foreign) {
1727 verified = true;
1728 break;
1729 }
1730 }
1731 } else {
1732 verified = true;
1733 }
1734
1735 if (!verified) {
1736 err = DB_DICT_CHANGED;
1737 } else if (trx->error_state == DB_SUCCESS) {
1738 goto run_again;
1739 } else {
1740 err = trx->error_state;
1741 }
1742 }
1743
1744 exit_func:
1745 if (UNIV_LIKELY_NULL(heap)) {
1746 mem_heap_free(heap);
1747 }
1748 return(err);
1749 }
1750
1751 /***************************************************************//**
1752 Checks if foreign key constraints fail for an index entry. If index
1753 is not mentioned in any constraint, this function does nothing,
1754 Otherwise does searches to the indexes of referenced tables and
1755 sets shared locks which lock either the success or the failure of
1756 a constraint.
1757 @return DB_SUCCESS or error code */
1758 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1759 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1760 row_ins_check_foreign_constraints(
1761 /*==============================*/
1762 dict_table_t* table, /*!< in: table */
1763 dict_index_t* index, /*!< in: index */
1764 dtuple_t* entry, /*!< in: index entry for index */
1765 que_thr_t* thr) /*!< in: query thread */
1766 {
1767 dict_foreign_t* foreign;
1768 dberr_t err;
1769 trx_t* trx;
1770 ibool got_s_lock = FALSE;
1771
1772 trx = thr_get_trx(thr);
1773
1774 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1775 "foreign_constraint_check_for_ins");
1776
1777 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1778 it != table->foreign_set.end();
1779 ++it) {
1780
1781 foreign = *it;
1782
1783 if (foreign->foreign_index == index) {
1784 dict_table_t* ref_table = NULL;
1785 dict_table_t* foreign_table = foreign->foreign_table;
1786 dict_table_t* referenced_table
1787 = foreign->referenced_table;
1788
1789 if (referenced_table == NULL) {
1790
1791 ref_table = dict_table_open_on_name(
1792 foreign->referenced_table_name_lookup,
1793 FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1794 }
1795
1796 if (0 == trx->dict_operation_lock_mode) {
1797 got_s_lock = TRUE;
1798
1799 row_mysql_freeze_data_dictionary(trx);
1800 }
1801
1802 if (referenced_table) {
1803 os_inc_counter(dict_sys->mutex,
1804 foreign_table
1805 ->n_foreign_key_checks_running);
1806 }
1807
1808 /* NOTE that if the thread ends up waiting for a lock
1809 we will release dict_operation_lock temporarily!
1810 But the counter on the table protects the referenced
1811 table from being dropped while the check is running. */
1812
1813 err = row_ins_check_foreign_constraint(
1814 TRUE, foreign, table, entry, thr);
1815
1816 DBUG_EXECUTE_IF("row_ins_dict_change_err",
1817 err = DB_DICT_CHANGED;);
1818
1819 if (referenced_table) {
1820 os_dec_counter(dict_sys->mutex,
1821 foreign_table
1822 ->n_foreign_key_checks_running);
1823 }
1824
1825 if (got_s_lock) {
1826 row_mysql_unfreeze_data_dictionary(trx);
1827 }
1828
1829 if (ref_table != NULL) {
1830 dict_table_close(ref_table, FALSE, FALSE);
1831 }
1832
1833 if (err != DB_SUCCESS) {
1834
1835 return(err);
1836 }
1837 }
1838 }
1839
1840 return(DB_SUCCESS);
1841 }
1842
1843 /***************************************************************//**
1844 Checks if a unique key violation to rec would occur at the index entry
1845 insert.
1846 @return TRUE if error */
1847 static
1848 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1849 row_ins_dupl_error_with_rec(
1850 /*========================*/
1851 const rec_t* rec, /*!< in: user record; NOTE that we assume
1852 that the caller already has a record lock on
1853 the record! */
1854 const dtuple_t* entry, /*!< in: entry to insert */
1855 dict_index_t* index, /*!< in: index */
1856 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
1857 {
1858 ulint matched_fields;
1859 ulint matched_bytes;
1860 ulint n_unique;
1861 ulint i;
1862
1863 ut_ad(rec_offs_validate(rec, index, offsets));
1864
1865 n_unique = dict_index_get_n_unique(index);
1866
1867 matched_fields = 0;
1868 matched_bytes = 0;
1869
1870 cmp_dtuple_rec_with_match(entry, rec, offsets,
1871 &matched_fields, &matched_bytes);
1872
1873 if (matched_fields < n_unique) {
1874
1875 return(FALSE);
1876 }
1877
1878 /* In a unique secondary index we allow equal key values if they
1879 contain SQL NULLs */
1880
1881 if (!dict_index_is_clust(index)) {
1882
1883 for (i = 0; i < n_unique; i++) {
1884 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1885
1886 return(FALSE);
1887 }
1888 }
1889 }
1890
1891 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1892 }
1893
1894 /***************************************************************//**
1895 Scans a unique non-clustered index at a given index entry to determine
1896 whether a uniqueness violation has occurred for the key value of the entry.
1897 Set shared locks on possible duplicate records.
1898 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1899 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1900 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)1901 row_ins_scan_sec_index_for_duplicate(
1902 /*=================================*/
1903 ulint flags, /*!< in: undo logging and locking flags */
1904 dict_index_t* index, /*!< in: non-clustered unique index */
1905 dtuple_t* entry, /*!< in: index entry */
1906 que_thr_t* thr, /*!< in: query thread */
1907 bool s_latch,/*!< in: whether index->lock is being held */
1908 mtr_t* mtr, /*!< in/out: mini-transaction */
1909 mem_heap_t* offsets_heap)
1910 /*!< in/out: memory heap that can be emptied */
1911 {
1912 ulint n_unique;
1913 int cmp;
1914 ulint n_fields_cmp;
1915 btr_pcur_t pcur;
1916 dberr_t err = DB_SUCCESS;
1917 ulint allow_duplicates;
1918 ulint* offsets = NULL;
1919
1920 #ifdef UNIV_SYNC_DEBUG
1921 ut_ad(s_latch == rw_lock_own(&index->lock, RW_LOCK_SHARED));
1922 #endif /* UNIV_SYNC_DEBUG */
1923
1924 n_unique = dict_index_get_n_unique(index);
1925
1926 /* If the secondary index is unique, but one of the fields in the
1927 n_unique first fields is NULL, a unique key violation cannot occur,
1928 since we define NULL != NULL in this case */
1929
1930 for (ulint i = 0; i < n_unique; i++) {
1931 if (UNIV_SQL_NULL == dfield_get_len(
1932 dtuple_get_nth_field(entry, i))) {
1933
1934 return(DB_SUCCESS);
1935 }
1936 }
1937
1938 /* Store old value on n_fields_cmp */
1939
1940 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1941
1942 dtuple_set_n_fields_cmp(entry, n_unique);
1943
1944 btr_pcur_open(index, entry, PAGE_CUR_GE,
1945 s_latch
1946 ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
1947 : BTR_SEARCH_LEAF,
1948 &pcur, mtr);
1949
1950 allow_duplicates = thr_get_trx(thr)->duplicates;
1951
1952 /* Scan index records and check if there is a duplicate */
1953
1954 do {
1955 const rec_t* rec = btr_pcur_get_rec(&pcur);
1956 const buf_block_t* block = btr_pcur_get_block(&pcur);
1957 const ulint lock_type = LOCK_ORDINARY;
1958
1959 if (page_rec_is_infimum(rec)) {
1960
1961 continue;
1962 }
1963
1964 offsets = rec_get_offsets(rec, index, offsets,
1965 ULINT_UNDEFINED, &offsets_heap);
1966
1967 if (flags & BTR_NO_LOCKING_FLAG) {
1968 /* Set no locks when applying log
1969 in online table rebuild. */
1970 } else if (allow_duplicates) {
1971
1972 /* If the SQL-query will update or replace
1973 duplicate key we will take X-lock for
1974 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1975 INSERT ON DUPLICATE KEY UPDATE). */
1976
1977 err = row_ins_set_exclusive_rec_lock(
1978 lock_type, block, rec, index, offsets, thr);
1979 } else {
1980
1981 err = row_ins_set_shared_rec_lock(
1982 lock_type, block, rec, index, offsets, thr);
1983 }
1984
1985 switch (err) {
1986 case DB_SUCCESS_LOCKED_REC:
1987 err = DB_SUCCESS;
1988 case DB_SUCCESS:
1989 break;
1990 default:
1991 goto end_scan;
1992 }
1993
1994 if (page_rec_is_supremum(rec)) {
1995
1996 continue;
1997 }
1998
1999 cmp = cmp_dtuple_rec(entry, rec, offsets);
2000
2001 if (cmp == 0) {
2002 if (row_ins_dupl_error_with_rec(rec, entry,
2003 index, offsets)) {
2004 err = DB_DUPLICATE_KEY;
2005
2006 thr_get_trx(thr)->error_info = index;
2007
2008 /* If the duplicate is on hidden FTS_DOC_ID,
2009 state so in the error log */
2010 if (DICT_TF2_FLAG_IS_SET(
2011 index->table,
2012 DICT_TF2_FTS_HAS_DOC_ID)
2013 && strcmp(index->name,
2014 FTS_DOC_ID_INDEX_NAME) == 0) {
2015 ib_logf(IB_LOG_LEVEL_ERROR,
2016 "Duplicate FTS_DOC_ID value"
2017 " on table %s",
2018 index->table->name);
2019 }
2020
2021 goto end_scan;
2022 }
2023 } else {
2024 ut_a(cmp < 0);
2025 goto end_scan;
2026 }
2027 } while (btr_pcur_move_to_next(&pcur, mtr));
2028
2029 end_scan:
2030 /* Restore old value */
2031 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2032
2033 return(err);
2034 }
2035
2036 /** Checks for a duplicate when the table is being rebuilt online.
2037 @retval DB_SUCCESS when no duplicate is detected
2038 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2039 a newer version of entry (the entry should not be inserted)
2040 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2041 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2042 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2043 row_ins_duplicate_online(
2044 /*=====================*/
2045 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2046 const dtuple_t* entry, /*!< in: entry that is being inserted */
2047 const rec_t* rec, /*!< in: clustered index record */
2048 ulint* offsets)/*!< in/out: rec_get_offsets(rec) */
2049 {
2050 ulint fields = 0;
2051 ulint bytes = 0;
2052
2053 /* During rebuild, there should not be any delete-marked rows
2054 in the new table. */
2055 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2056 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2057
2058 /* Compare the PRIMARY KEY fields and the
2059 DB_TRX_ID, DB_ROLL_PTR. */
2060 cmp_dtuple_rec_with_match_low(
2061 entry, rec, offsets, n_uniq + 2, &fields, &bytes);
2062
2063 if (fields < n_uniq) {
2064 /* Not a duplicate. */
2065 return(DB_SUCCESS);
2066 }
2067
2068 if (fields == n_uniq + 2) {
2069 /* rec is an exact match of entry. */
2070 ut_ad(bytes == 0);
2071 return(DB_SUCCESS_LOCKED_REC);
2072 }
2073
2074 return(DB_DUPLICATE_KEY);
2075 }
2076
2077 /** Checks for a duplicate when the table is being rebuilt online.
2078 @retval DB_SUCCESS when no duplicate is detected
2079 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2080 a newer version of entry (the entry should not be inserted)
2081 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2082 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2083 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2084 row_ins_duplicate_error_in_clust_online(
2085 /*====================================*/
2086 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2087 const dtuple_t* entry, /*!< in: entry that is being inserted */
2088 const btr_cur_t*cursor, /*!< in: cursor on insert position */
2089 ulint** offsets,/*!< in/out: rec_get_offsets(rec) */
2090 mem_heap_t** heap) /*!< in/out: heap for offsets */
2091 {
2092 dberr_t err = DB_SUCCESS;
2093 const rec_t* rec = btr_cur_get_rec(cursor);
2094
2095 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2096 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2097 ULINT_UNDEFINED, heap);
2098 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2099 if (err != DB_SUCCESS) {
2100 return(err);
2101 }
2102 }
2103
2104 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2105
2106 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2107 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2108 ULINT_UNDEFINED, heap);
2109 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2110 }
2111
2112 return(err);
2113 }
2114
2115 /***************************************************************//**
2116 Checks if a unique key violation error would occur at an index entry
2117 insert. Sets shared locks on possible duplicate records. Works only
2118 for a clustered index!
2119 @retval DB_SUCCESS if no error
2120 @retval DB_DUPLICATE_KEY if error,
2121 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2122 record
2123 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2124 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2125 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2126 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2127 row_ins_duplicate_error_in_clust(
2128 /*=============================*/
2129 ulint flags, /*!< in: undo logging and locking flags */
2130 btr_cur_t* cursor, /*!< in: B-tree cursor */
2131 const dtuple_t* entry, /*!< in: entry to insert */
2132 que_thr_t* thr, /*!< in: query thread */
2133 mtr_t* mtr) /*!< in: mtr */
2134 {
2135 dberr_t err;
2136 rec_t* rec;
2137 ulint n_unique;
2138 trx_t* trx = thr_get_trx(thr);
2139 mem_heap_t*heap = NULL;
2140 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2141 ulint* offsets = offsets_;
2142 rec_offs_init(offsets_);
2143
2144 UT_NOT_USED(mtr);
2145
2146 ut_ad(dict_index_is_clust(cursor->index));
2147
2148 /* NOTE: For unique non-clustered indexes there may be any number
2149 of delete marked records with the same value for the non-clustered
2150 index key (remember multiversioning), and which differ only in
2151 the row refererence part of the index record, containing the
2152 clustered index key fields. For such a secondary index record,
2153 to avoid race condition, we must FIRST do the insertion and after
2154 that check that the uniqueness condition is not breached! */
2155
2156 /* NOTE: A problem is that in the B-tree node pointers on an
2157 upper level may match more to the entry than the actual existing
2158 user records on the leaf level. So, even if low_match would suggest
2159 that a duplicate key violation may occur, this may not be the case. */
2160
2161 n_unique = dict_index_get_n_unique(cursor->index);
2162
2163 if (cursor->low_match >= n_unique) {
2164
2165 rec = btr_cur_get_rec(cursor);
2166
2167 if (!page_rec_is_infimum(rec)) {
2168 offsets = rec_get_offsets(rec, cursor->index, offsets,
2169 ULINT_UNDEFINED, &heap);
2170
2171 /* We set a lock on the possible duplicate: this
2172 is needed in logical logging of MySQL to make
2173 sure that in roll-forward we get the same duplicate
2174 errors as in original execution */
2175
2176 if (trx->duplicates) {
2177
2178 /* If the SQL-query will update or replace
2179 duplicate key we will take X-lock for
2180 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2181 INSERT ON DUPLICATE KEY UPDATE). */
2182
2183 err = row_ins_set_exclusive_rec_lock(
2184 LOCK_REC_NOT_GAP,
2185 btr_cur_get_block(cursor),
2186 rec, cursor->index, offsets, thr);
2187 } else {
2188
2189 err = row_ins_set_shared_rec_lock(
2190 LOCK_REC_NOT_GAP,
2191 btr_cur_get_block(cursor), rec,
2192 cursor->index, offsets, thr);
2193 }
2194
2195 switch (err) {
2196 case DB_SUCCESS_LOCKED_REC:
2197 case DB_SUCCESS:
2198 break;
2199 default:
2200 goto func_exit;
2201 }
2202
2203 if (row_ins_dupl_error_with_rec(
2204 rec, entry, cursor->index, offsets)) {
2205 duplicate:
2206 trx->error_info = cursor->index;
2207 err = DB_DUPLICATE_KEY;
2208 goto func_exit;
2209 }
2210 }
2211 }
2212
2213 if (cursor->up_match >= n_unique) {
2214
2215 rec = page_rec_get_next(btr_cur_get_rec(cursor));
2216
2217 if (!page_rec_is_supremum(rec)) {
2218 offsets = rec_get_offsets(rec, cursor->index, offsets,
2219 ULINT_UNDEFINED, &heap);
2220
2221 if (trx->duplicates) {
2222
2223 /* If the SQL-query will update or replace
2224 duplicate key we will take X-lock for
2225 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2226 INSERT ON DUPLICATE KEY UPDATE). */
2227
2228 err = row_ins_set_exclusive_rec_lock(
2229 LOCK_REC_NOT_GAP,
2230 btr_cur_get_block(cursor),
2231 rec, cursor->index, offsets, thr);
2232 } else {
2233
2234 err = row_ins_set_shared_rec_lock(
2235 LOCK_REC_NOT_GAP,
2236 btr_cur_get_block(cursor),
2237 rec, cursor->index, offsets, thr);
2238 }
2239
2240 switch (err) {
2241 case DB_SUCCESS_LOCKED_REC:
2242 case DB_SUCCESS:
2243 break;
2244 default:
2245 goto func_exit;
2246 }
2247
2248 if (row_ins_dupl_error_with_rec(
2249 rec, entry, cursor->index, offsets)) {
2250 goto duplicate;
2251 }
2252 }
2253
2254 /* This should never happen */
2255 ut_error;
2256 }
2257
2258 err = DB_SUCCESS;
2259 func_exit:
2260 if (UNIV_LIKELY_NULL(heap)) {
2261 mem_heap_free(heap);
2262 }
2263 return(err);
2264 }
2265
2266 /***************************************************************//**
2267 Checks if an index entry has long enough common prefix with an
2268 existing record so that the intended insert of the entry must be
2269 changed to a modify of the existing record. In the case of a clustered
2270 index, the prefix must be n_unique fields long. In the case of a
2271 secondary index, all fields must be equal. InnoDB never updates
2272 secondary index records in place, other than clearing or setting the
2273 delete-mark flag. We could be able to update the non-unique fields
2274 of a unique secondary index record by checking the cursor->up_match,
2275 but we do not do so, because it could have some locking implications.
2276 @return TRUE if the existing record should be updated; FALSE if not */
2277 UNIV_INLINE
2278 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2279 row_ins_must_modify_rec(
2280 /*====================*/
2281 const btr_cur_t* cursor) /*!< in: B-tree cursor */
2282 {
2283 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2284 Because node pointers on upper levels of the B-tree may match more
2285 to entry than to actual user records on the leaf level, we
2286 have to check if the candidate record is actually a user record.
2287 A clustered index node pointer contains index->n_unique first fields,
2288 and a secondary index node pointer contains all index fields. */
2289
2290 return(cursor->low_match
2291 >= dict_index_get_n_unique_in_tree(cursor->index)
2292 && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2293 }
2294
2295 /***************************************************************//**
2296 Tries to insert an entry into a clustered index, ignoring foreign key
2297 constraints. If a record with the same unique key is found, the other
2298 record is necessarily marked deleted by a committed transaction, or a
2299 unique key violation error occurs. The delete marked record is then
2300 updated to an existing record, and we must write an undo log record on
2301 the delete marked record.
2302 @retval DB_SUCCESS on success
2303 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2304 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2305 @return error code */
2306 UNIV_INTERN
2307 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2308 row_ins_clust_index_entry_low(
2309 /*==========================*/
2310 ulint flags, /*!< in: undo logging and locking flags */
2311 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2312 depending on whether we wish optimistic or
2313 pessimistic descent down the index tree */
2314 dict_index_t* index, /*!< in: clustered index */
2315 ulint n_uniq, /*!< in: 0 or index->n_uniq */
2316 dtuple_t* entry, /*!< in/out: index entry to insert */
2317 ulint n_ext, /*!< in: number of externally stored columns */
2318 que_thr_t* thr) /*!< in: query thread */
2319 {
2320 btr_cur_t cursor;
2321 ulint* offsets = NULL;
2322 dberr_t err;
2323 big_rec_t* big_rec = NULL;
2324 mtr_t mtr;
2325 mem_heap_t* offsets_heap = NULL;
2326
2327 ut_ad(dict_index_is_clust(index));
2328 ut_ad(!dict_index_is_unique(index)
2329 || n_uniq == dict_index_get_n_unique(index));
2330 ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2331
2332 mtr_start(&mtr);
2333
2334 if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2335 mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
2336 mtr_s_lock(dict_index_get_lock(index), &mtr);
2337 }
2338
2339 cursor.thr = thr;
2340
2341 /* Note that we use PAGE_CUR_LE as the search mode, because then
2342 the function will return in both low_match and up_match of the
2343 cursor sensible values */
2344
2345 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, mode,
2346 &cursor, 0, __FILE__, __LINE__, &mtr);
2347
2348 #ifdef UNIV_DEBUG
2349 {
2350 page_t* page = btr_cur_get_page(&cursor);
2351 rec_t* first_rec = page_rec_get_next(
2352 page_get_infimum_rec(page));
2353
2354 ut_ad(page_rec_is_supremum(first_rec)
2355 || rec_get_n_fields(first_rec, index)
2356 == dtuple_get_n_fields(entry));
2357 }
2358 #endif
2359
2360 if (n_uniq && (cursor.up_match >= n_uniq
2361 || cursor.low_match >= n_uniq)) {
2362
2363 if (flags
2364 == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2365 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2366 /* Set no locks when applying log
2367 in online table rebuild. Only check for duplicates. */
2368 err = row_ins_duplicate_error_in_clust_online(
2369 n_uniq, entry, &cursor,
2370 &offsets, &offsets_heap);
2371
2372 switch (err) {
2373 case DB_SUCCESS:
2374 break;
2375 default:
2376 ut_ad(0);
2377 /* fall through */
2378 case DB_SUCCESS_LOCKED_REC:
2379 case DB_DUPLICATE_KEY:
2380 thr_get_trx(thr)->error_info = cursor.index;
2381 }
2382 } else {
2383 /* Note that the following may return also
2384 DB_LOCK_WAIT */
2385
2386 err = row_ins_duplicate_error_in_clust(
2387 flags, &cursor, entry, thr, &mtr);
2388 }
2389
2390 if (err != DB_SUCCESS) {
2391 err_exit:
2392 mtr_commit(&mtr);
2393 goto func_exit;
2394 }
2395 }
2396
2397 if (row_ins_must_modify_rec(&cursor)) {
2398 /* There is already an index entry with a long enough common
2399 prefix, we must convert the insert into a modify of an
2400 existing record */
2401 mem_heap_t* entry_heap = mem_heap_create(1024);
2402
2403 err = row_ins_clust_index_entry_by_modify(
2404 flags, mode, &cursor, &offsets, &offsets_heap,
2405 entry_heap, &big_rec, entry, thr, &mtr);
2406
2407 rec_t* rec = btr_cur_get_rec(&cursor);
2408
2409 if (big_rec) {
2410 ut_a(err == DB_SUCCESS);
2411 /* Write out the externally stored
2412 columns while still x-latching
2413 index->lock and block->lock. Allocate
2414 pages for big_rec in the mtr that
2415 modified the B-tree, but be sure to skip
2416 any pages that were freed in mtr. We will
2417 write out the big_rec pages before
2418 committing the B-tree mini-transaction. If
2419 the system crashes so that crash recovery
2420 will not replay the mtr_commit(&mtr), the
2421 big_rec pages will be left orphaned until
2422 the pages are allocated for something else.
2423
2424 TODO: If the allocation extends the
2425 tablespace, it will not be redo
2426 logged, in either mini-transaction.
2427 Tablespace extension should be
2428 redo-logged in the big_rec
2429 mini-transaction, so that recovery
2430 will not fail when the big_rec was
2431 written to the extended portion of the
2432 file, in case the file was somehow
2433 truncated in the crash. */
2434
2435 DEBUG_SYNC_C_IF_THD(
2436 thr_get_trx(thr)->mysql_thd,
2437 "before_row_ins_upd_extern");
2438 err = btr_store_big_rec_extern_fields(
2439 index, btr_cur_get_block(&cursor),
2440 rec, offsets, big_rec, &mtr,
2441 BTR_STORE_INSERT_UPDATE);
2442 DEBUG_SYNC_C_IF_THD(
2443 thr_get_trx(thr)->mysql_thd,
2444 "after_row_ins_upd_extern");
2445 /* If writing big_rec fails (for
2446 example, because of DB_OUT_OF_FILE_SPACE),
2447 the record will be corrupted. Even if
2448 we did not update any externally
2449 stored columns, our update could cause
2450 the record to grow so that a
2451 non-updated column was selected for
2452 external storage. This non-update
2453 would not have been written to the
2454 undo log, and thus the record cannot
2455 be rolled back.
2456
2457 However, because we have not executed
2458 mtr_commit(mtr) yet, the update will
2459 not be replayed in crash recovery, and
2460 the following assertion failure will
2461 effectively "roll back" the operation. */
2462 ut_a(err == DB_SUCCESS);
2463 dtuple_big_rec_free(big_rec);
2464 }
2465
2466 if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2467 row_log_table_insert(rec, index, offsets);
2468 }
2469
2470 mtr_commit(&mtr);
2471 mem_heap_free(entry_heap);
2472 } else {
2473 rec_t* insert_rec;
2474
2475 if (mode != BTR_MODIFY_TREE) {
2476 ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2477 == BTR_MODIFY_LEAF);
2478 err = btr_cur_optimistic_insert(
2479 flags, &cursor, &offsets, &offsets_heap,
2480 entry, &insert_rec, &big_rec,
2481 n_ext, thr, &mtr);
2482 } else {
2483 if (buf_LRU_buf_pool_running_out()) {
2484
2485 err = DB_LOCK_TABLE_FULL;
2486 goto err_exit;
2487 }
2488
2489 err = btr_cur_optimistic_insert(
2490 flags, &cursor,
2491 &offsets, &offsets_heap,
2492 entry, &insert_rec, &big_rec,
2493 n_ext, thr, &mtr);
2494
2495 if (err == DB_FAIL) {
2496 err = btr_cur_pessimistic_insert(
2497 flags, &cursor,
2498 &offsets, &offsets_heap,
2499 entry, &insert_rec, &big_rec,
2500 n_ext, thr, &mtr);
2501 }
2502 }
2503
2504 if (UNIV_LIKELY_NULL(big_rec)) {
2505 mtr_commit(&mtr);
2506
2507 /* Online table rebuild could read (and
2508 ignore) the incomplete record at this point.
2509 If online rebuild is in progress, the
2510 row_ins_index_entry_big_rec() will write log. */
2511
2512 DBUG_EXECUTE_IF(
2513 "row_ins_extern_checkpoint",
2514 log_make_checkpoint_at(
2515 LSN_MAX, TRUE););
2516 err = row_ins_index_entry_big_rec(
2517 entry, big_rec, offsets, &offsets_heap, index,
2518 thr_get_trx(thr)->mysql_thd,
2519 __FILE__, __LINE__);
2520 dtuple_convert_back_big_rec(index, entry, big_rec);
2521 } else {
2522 if (err == DB_SUCCESS
2523 && dict_index_is_online_ddl(index)) {
2524 row_log_table_insert(
2525 insert_rec, index, offsets);
2526 }
2527
2528 mtr_commit(&mtr);
2529 }
2530 }
2531
2532 func_exit:
2533 if (offsets_heap) {
2534 mem_heap_free(offsets_heap);
2535 }
2536
2537 return(err);
2538 }
2539
2540 /***************************************************************//**
2541 Starts a mini-transaction and checks if the index will be dropped.
2542 @return true if the index is to be dropped */
2543 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2544 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2545 row_ins_sec_mtr_start_and_check_if_aborted(
2546 /*=======================================*/
2547 mtr_t* mtr, /*!< out: mini-transaction */
2548 dict_index_t* index, /*!< in/out: secondary index */
2549 bool check, /*!< in: whether to check */
2550 ulint search_mode)
2551 /*!< in: flags */
2552 {
2553 ut_ad(!dict_index_is_clust(index));
2554
2555 mtr_start(mtr);
2556
2557 if (!check) {
2558 return(false);
2559 }
2560
2561 if (search_mode & BTR_ALREADY_S_LATCHED) {
2562 mtr_s_lock(dict_index_get_lock(index), mtr);
2563 } else {
2564 mtr_x_lock(dict_index_get_lock(index), mtr);
2565 }
2566
2567 switch (index->online_status) {
2568 case ONLINE_INDEX_ABORTED:
2569 case ONLINE_INDEX_ABORTED_DROPPED:
2570 ut_ad(*index->name == TEMP_INDEX_PREFIX);
2571 return(true);
2572 case ONLINE_INDEX_COMPLETE:
2573 return(false);
2574 case ONLINE_INDEX_CREATION:
2575 break;
2576 }
2577
2578 ut_error;
2579 return(true);
2580 }
2581
2582 /***************************************************************//**
2583 Tries to insert an entry into a secondary index. If a record with exactly the
2584 same fields is found, the other record is necessarily marked deleted.
2585 It is then unmarked. Otherwise, the entry is just inserted to the index.
2586 @retval DB_SUCCESS on success
2587 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2588 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2589 @return error code */
2590 UNIV_INTERN
2591 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr)2592 row_ins_sec_index_entry_low(
2593 /*========================*/
2594 ulint flags, /*!< in: undo logging and locking flags */
2595 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2596 depending on whether we wish optimistic or
2597 pessimistic descent down the index tree */
2598 dict_index_t* index, /*!< in: secondary index */
2599 mem_heap_t* offsets_heap,
2600 /*!< in/out: memory heap that can be emptied */
2601 mem_heap_t* heap, /*!< in/out: memory heap */
2602 dtuple_t* entry, /*!< in/out: index entry to insert */
2603 trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
2604 row_log_table_apply(), or 0 */
2605 que_thr_t* thr) /*!< in: query thread */
2606 {
2607 btr_cur_t cursor;
2608 ulint search_mode = mode | BTR_INSERT;
2609 dberr_t err = DB_SUCCESS;
2610 ulint n_unique;
2611 mtr_t mtr;
2612 ulint* offsets = NULL;
2613
2614 ut_ad(!dict_index_is_clust(index));
2615 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2616
2617 cursor.thr = thr;
2618 ut_ad(thr_get_trx(thr)->id);
2619 mtr_start(&mtr);
2620
2621 /* Ensure that we acquire index->lock when inserting into an
2622 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2623 could still be subject to rollback_inplace_alter_table().
2624 This prevents a concurrent change of index->online_status.
2625 The memory object cannot be freed as long as we have an open
2626 reference to the table, or index->table->n_ref_count > 0. */
2627 const bool check = *index->name == TEMP_INDEX_PREFIX;
2628 if (check) {
2629 DEBUG_SYNC_C("row_ins_sec_index_enter");
2630 if (mode == BTR_MODIFY_LEAF) {
2631 search_mode |= BTR_ALREADY_S_LATCHED;
2632 mtr_s_lock(dict_index_get_lock(index), &mtr);
2633 } else {
2634 mtr_x_lock(dict_index_get_lock(index), &mtr);
2635 }
2636
2637 if (row_log_online_op_try(
2638 index, entry, thr_get_trx(thr)->id)) {
2639 goto func_exit;
2640 }
2641 }
2642
2643 /* Note that we use PAGE_CUR_LE as the search mode, because then
2644 the function will return in both low_match and up_match of the
2645 cursor sensible values */
2646
2647 if (!thr_get_trx(thr)->check_unique_secondary) {
2648 search_mode |= BTR_IGNORE_SEC_UNIQUE;
2649 }
2650
2651 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2652 search_mode,
2653 &cursor, 0, __FILE__, __LINE__, &mtr);
2654
2655 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2656 /* The insert was buffered during the search: we are done */
2657 goto func_exit;
2658 }
2659
2660 #ifdef UNIV_DEBUG
2661 {
2662 page_t* page = btr_cur_get_page(&cursor);
2663 rec_t* first_rec = page_rec_get_next(
2664 page_get_infimum_rec(page));
2665
2666 ut_ad(page_rec_is_supremum(first_rec)
2667 || rec_get_n_fields(first_rec, index)
2668 == dtuple_get_n_fields(entry));
2669 }
2670 #endif
2671
2672 n_unique = dict_index_get_n_unique(index);
2673
2674 if (dict_index_is_unique(index)
2675 && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
2676 mtr_commit(&mtr);
2677
2678 DEBUG_SYNC_C("row_ins_sec_index_unique");
2679
2680 if (row_ins_sec_mtr_start_and_check_if_aborted(
2681 &mtr, index, check, search_mode)) {
2682 goto func_exit;
2683 }
2684
2685 err = row_ins_scan_sec_index_for_duplicate(
2686 flags, index, entry, thr, check, &mtr, offsets_heap);
2687
2688 mtr_commit(&mtr);
2689
2690 switch (err) {
2691 case DB_SUCCESS:
2692 break;
2693 case DB_DUPLICATE_KEY:
2694 if (*index->name == TEMP_INDEX_PREFIX) {
2695 ut_ad(!thr_get_trx(thr)
2696 ->dict_operation_lock_mode);
2697 mutex_enter(&dict_sys->mutex);
2698 dict_set_corrupted_index_cache_only(
2699 index, index->table);
2700 mutex_exit(&dict_sys->mutex);
2701 /* Do not return any error to the
2702 caller. The duplicate will be reported
2703 by ALTER TABLE or CREATE UNIQUE INDEX.
2704 Unfortunately we cannot report the
2705 duplicate key value to the DDL thread,
2706 because the altered_table object is
2707 private to its call stack. */
2708 err = DB_SUCCESS;
2709 }
2710 /* fall through */
2711 default:
2712 return(err);
2713 }
2714
2715 if (row_ins_sec_mtr_start_and_check_if_aborted(
2716 &mtr, index, check, search_mode)) {
2717 goto func_exit;
2718 }
2719
2720 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
2721
2722 /* We did not find a duplicate and we have now
2723 locked with s-locks the necessary records to
2724 prevent any insertion of a duplicate by another
2725 transaction. Let us now reposition the cursor and
2726 continue the insertion. */
2727
2728 btr_cur_search_to_nth_level(
2729 index, 0, entry, PAGE_CUR_LE,
2730 search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE),
2731 &cursor, 0, __FILE__, __LINE__, &mtr);
2732 }
2733
2734 if (row_ins_must_modify_rec(&cursor)) {
2735 /* There is already an index entry with a long enough common
2736 prefix, we must convert the insert into a modify of an
2737 existing record */
2738 offsets = rec_get_offsets(
2739 btr_cur_get_rec(&cursor), index, offsets,
2740 ULINT_UNDEFINED, &offsets_heap);
2741
2742 err = row_ins_sec_index_entry_by_modify(
2743 flags, mode, &cursor, &offsets,
2744 offsets_heap, heap, entry, thr, &mtr);
2745 } else {
2746 rec_t* insert_rec;
2747 big_rec_t* big_rec;
2748
2749 if (mode == BTR_MODIFY_LEAF) {
2750 err = btr_cur_optimistic_insert(
2751 flags, &cursor, &offsets, &offsets_heap,
2752 entry, &insert_rec,
2753 &big_rec, 0, thr, &mtr);
2754 } else {
2755 ut_ad(mode == BTR_MODIFY_TREE);
2756 if (buf_LRU_buf_pool_running_out()) {
2757
2758 err = DB_LOCK_TABLE_FULL;
2759 goto func_exit;
2760 }
2761
2762 err = btr_cur_optimistic_insert(
2763 flags, &cursor,
2764 &offsets, &offsets_heap,
2765 entry, &insert_rec,
2766 &big_rec, 0, thr, &mtr);
2767 if (err == DB_FAIL) {
2768 err = btr_cur_pessimistic_insert(
2769 flags, &cursor,
2770 &offsets, &offsets_heap,
2771 entry, &insert_rec,
2772 &big_rec, 0, thr, &mtr);
2773 }
2774 }
2775
2776 if (err == DB_SUCCESS && trx_id) {
2777 page_update_max_trx_id(
2778 btr_cur_get_block(&cursor),
2779 btr_cur_get_page_zip(&cursor),
2780 trx_id, &mtr);
2781 }
2782
2783 ut_ad(!big_rec);
2784 }
2785
2786 func_exit:
2787 mtr_commit(&mtr);
2788 return(err);
2789 }
2790
2791 /***************************************************************//**
2792 Tries to insert the externally stored fields (off-page columns)
2793 of a clustered index entry.
2794 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
2795 UNIV_INTERN
2796 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)2797 row_ins_index_entry_big_rec_func(
2798 /*=============================*/
2799 const dtuple_t* entry, /*!< in/out: index entry to insert */
2800 const big_rec_t* big_rec,/*!< in: externally stored fields */
2801 ulint* offsets,/*!< in/out: rec offsets */
2802 mem_heap_t** heap, /*!< in/out: memory heap */
2803 dict_index_t* index, /*!< in: index */
2804 const char* file, /*!< in: file name of caller */
2805 #ifndef DBUG_OFF
2806 const void* thd, /*!< in: connection, or NULL */
2807 #endif /* DBUG_OFF */
2808 ulint line) /*!< in: line number of caller */
2809 {
2810 mtr_t mtr;
2811 btr_cur_t cursor;
2812 rec_t* rec;
2813 dberr_t error;
2814
2815 ut_ad(dict_index_is_clust(index));
2816
2817 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2818
2819 mtr_start(&mtr);
2820 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2821 BTR_MODIFY_TREE, &cursor, 0,
2822 file, line, &mtr);
2823 rec = btr_cur_get_rec(&cursor);
2824 offsets = rec_get_offsets(rec, index, offsets,
2825 ULINT_UNDEFINED, heap);
2826
2827 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2828 error = btr_store_big_rec_extern_fields(
2829 index, btr_cur_get_block(&cursor),
2830 rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2831 DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2832
2833 if (error == DB_SUCCESS
2834 && dict_index_is_online_ddl(index)) {
2835 row_log_table_insert(rec, index, offsets);
2836 }
2837
2838 mtr_commit(&mtr);
2839
2840 return(error);
2841 }
2842
2843 /***************************************************************//**
2844 Inserts an entry into a clustered index. Tries first optimistic,
2845 then pessimistic descent down the tree. If the entry matches enough
2846 to a delete marked record, performs the insert by updating or delete
2847 unmarking the delete marked record.
2848 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2849 UNIV_INTERN
2850 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext)2851 row_ins_clust_index_entry(
2852 /*======================*/
2853 dict_index_t* index, /*!< in: clustered index */
2854 dtuple_t* entry, /*!< in/out: index entry to insert */
2855 que_thr_t* thr, /*!< in: query thread */
2856 ulint n_ext) /*!< in: number of externally stored columns */
2857 {
2858 dberr_t err;
2859 ulint n_uniq;
2860
2861 if (!index->table->foreign_set.empty()) {
2862 err = row_ins_check_foreign_constraints(
2863 index->table, index, entry, thr);
2864 if (err != DB_SUCCESS) {
2865
2866 return(err);
2867 }
2868 }
2869
2870 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
2871
2872 /* Try first optimistic descent to the B-tree */
2873
2874 log_free_check();
2875
2876 err = row_ins_clust_index_entry_low(
2877 0, BTR_MODIFY_LEAF, index, n_uniq, entry, n_ext, thr);
2878
2879 #ifdef UNIV_DEBUG
2880 /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
2881 Once it is fixed, remove the 'ifdef', 'if' and this comment. */
2882 if (!thr_get_trx(thr)->ddl) {
2883 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
2884 "after_row_ins_clust_index_entry_leaf");
2885 }
2886 #endif /* UNIV_DEBUG */
2887
2888 if (err != DB_FAIL) {
2889 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2890 return(err);
2891 }
2892
2893 /* Try then pessimistic descent to the B-tree */
2894
2895 log_free_check();
2896
2897 return(row_ins_clust_index_entry_low(
2898 0, BTR_MODIFY_TREE, index, n_uniq, entry, n_ext, thr));
2899 }
2900
2901 /***************************************************************//**
2902 Inserts an entry into a secondary index. Tries first optimistic,
2903 then pessimistic descent down the tree. If the entry matches enough
2904 to a delete marked record, performs the insert by updating or delete
2905 unmarking the delete marked record.
2906 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2907 UNIV_INTERN
2908 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2909 row_ins_sec_index_entry(
2910 /*====================*/
2911 dict_index_t* index, /*!< in: secondary index */
2912 dtuple_t* entry, /*!< in/out: index entry to insert */
2913 que_thr_t* thr) /*!< in: query thread */
2914 {
2915 dberr_t err;
2916 mem_heap_t* offsets_heap;
2917 mem_heap_t* heap;
2918
2919 if (!index->table->foreign_set.empty()) {
2920 err = row_ins_check_foreign_constraints(index->table, index,
2921 entry, thr);
2922 if (err != DB_SUCCESS) {
2923
2924 return(err);
2925 }
2926 }
2927
2928 ut_ad(thr_get_trx(thr)->id);
2929
2930 offsets_heap = mem_heap_create(1024);
2931 heap = mem_heap_create(1024);
2932
2933 /* Try first optimistic descent to the B-tree */
2934
2935 log_free_check();
2936
2937 err = row_ins_sec_index_entry_low(
2938 0, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry, 0, thr);
2939 if (err == DB_FAIL) {
2940 mem_heap_empty(heap);
2941
2942 /* Try then pessimistic descent to the B-tree */
2943
2944 log_free_check();
2945
2946 err = row_ins_sec_index_entry_low(
2947 0, BTR_MODIFY_TREE, index,
2948 offsets_heap, heap, entry, 0, thr);
2949 }
2950
2951 mem_heap_free(heap);
2952 mem_heap_free(offsets_heap);
2953 return(err);
2954 }
2955
2956 /***************************************************************//**
2957 Inserts an index entry to index. Tries first optimistic, then pessimistic
2958 descent down the tree. If the entry matches enough to a delete marked record,
2959 performs the insert by updating or delete unmarking the delete marked
2960 record.
2961 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2962 static
2963 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2964 row_ins_index_entry(
2965 /*================*/
2966 dict_index_t* index, /*!< in: index */
2967 dtuple_t* entry, /*!< in/out: index entry to insert */
2968 que_thr_t* thr) /*!< in: query thread */
2969 {
2970 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
2971 DBUG_SET("-d,row_ins_index_entry_timeout");
2972 return(DB_LOCK_WAIT);});
2973
2974 if (dict_index_is_clust(index)) {
2975 return(row_ins_clust_index_entry(index, entry, thr, 0));
2976 } else {
2977 return(row_ins_sec_index_entry(index, entry, thr));
2978 }
2979 }
2980
2981 /***********************************************************//**
2982 Sets the values of the dtuple fields in entry from the values of appropriate
2983 columns in row. */
2984 static MY_ATTRIBUTE((nonnull))
2985 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)2986 row_ins_index_entry_set_vals(
2987 /*=========================*/
2988 dict_index_t* index, /*!< in: index */
2989 dtuple_t* entry, /*!< in: index entry to make */
2990 const dtuple_t* row) /*!< in: row */
2991 {
2992 ulint n_fields;
2993 ulint i;
2994
2995 n_fields = dtuple_get_n_fields(entry);
2996
2997 for (i = 0; i < n_fields; i++) {
2998 dict_field_t* ind_field;
2999 dfield_t* field;
3000 const dfield_t* row_field;
3001 ulint len;
3002
3003 field = dtuple_get_nth_field(entry, i);
3004 ind_field = dict_index_get_nth_field(index, i);
3005 row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3006 len = dfield_get_len(row_field);
3007
3008 /* Check column prefix indexes */
3009 if (ind_field->prefix_len > 0
3010 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3011
3012 const dict_col_t* col
3013 = dict_field_get_col(ind_field);
3014
3015 len = dtype_get_at_most_n_mbchars(
3016 col->prtype, col->mbminmaxlen,
3017 ind_field->prefix_len,
3018 len,
3019 static_cast<const char*>(
3020 dfield_get_data(row_field)));
3021
3022 ut_ad(!dfield_is_ext(row_field));
3023 }
3024
3025 dfield_set_data(field, dfield_get_data(row_field), len);
3026 if (dfield_is_ext(row_field)) {
3027 ut_ad(dict_index_is_clust(index));
3028 dfield_set_ext(field);
3029 }
3030 }
3031 }
3032
3033 /***********************************************************//**
3034 Inserts a single index entry to the table.
3035 @return DB_SUCCESS if operation successfully completed, else error
3036 code or DB_LOCK_WAIT */
3037 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3038 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3039 row_ins_index_entry_step(
3040 /*=====================*/
3041 ins_node_t* node, /*!< in: row insert node */
3042 que_thr_t* thr) /*!< in: query thread */
3043 {
3044 dberr_t err;
3045
3046 ut_ad(dtuple_check_typed(node->row));
3047
3048 row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3049
3050 ut_ad(dtuple_check_typed(node->entry));
3051
3052 err = row_ins_index_entry(node->index, node->entry, thr);
3053
3054 #ifdef UNIV_DEBUG
3055 /* Work around Bug#14626800 ASSERTION FAILURE IN DEBUG_SYNC().
3056 Once it is fixed, remove the 'ifdef', 'if' and this comment. */
3057 if (!thr_get_trx(thr)->ddl) {
3058 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3059 "after_row_ins_index_entry_step");
3060 }
3061 #endif /* UNIV_DEBUG */
3062
3063 return(err);
3064 }
3065
3066 /***********************************************************//**
3067 Allocates a row id for row and inits the node->index field. */
3068 UNIV_INLINE
3069 void
row_ins_alloc_row_id_step(ins_node_t * node)3070 row_ins_alloc_row_id_step(
3071 /*======================*/
3072 ins_node_t* node) /*!< in: row insert node */
3073 {
3074 row_id_t row_id;
3075
3076 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3077
3078 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3079
3080 /* No row id is stored if the clustered index is unique */
3081
3082 return;
3083 }
3084
3085 /* Fill in row id value to row */
3086
3087 row_id = dict_sys_get_new_row_id();
3088
3089 dict_sys_write_row_id(node->row_id_buf, row_id);
3090 }
3091
3092 /***********************************************************//**
3093 Gets a row to insert from the values list. */
3094 UNIV_INLINE
3095 void
row_ins_get_row_from_values(ins_node_t * node)3096 row_ins_get_row_from_values(
3097 /*========================*/
3098 ins_node_t* node) /*!< in: row insert node */
3099 {
3100 que_node_t* list_node;
3101 dfield_t* dfield;
3102 dtuple_t* row;
3103 ulint i;
3104
3105 /* The field values are copied in the buffers of the select node and
3106 it is safe to use them until we fetch from select again: therefore
3107 we can just copy the pointers */
3108
3109 row = node->row;
3110
3111 i = 0;
3112 list_node = node->values_list;
3113
3114 while (list_node) {
3115 eval_exp(list_node);
3116
3117 dfield = dtuple_get_nth_field(row, i);
3118 dfield_copy_data(dfield, que_node_get_val(list_node));
3119
3120 i++;
3121 list_node = que_node_get_next(list_node);
3122 }
3123 }
3124
3125 /***********************************************************//**
3126 Gets a row to insert from the select list. */
3127 UNIV_INLINE
3128 void
row_ins_get_row_from_select(ins_node_t * node)3129 row_ins_get_row_from_select(
3130 /*========================*/
3131 ins_node_t* node) /*!< in: row insert node */
3132 {
3133 que_node_t* list_node;
3134 dfield_t* dfield;
3135 dtuple_t* row;
3136 ulint i;
3137
3138 /* The field values are copied in the buffers of the select node and
3139 it is safe to use them until we fetch from select again: therefore
3140 we can just copy the pointers */
3141
3142 row = node->row;
3143
3144 i = 0;
3145 list_node = node->select->select_list;
3146
3147 while (list_node) {
3148 dfield = dtuple_get_nth_field(row, i);
3149 dfield_copy_data(dfield, que_node_get_val(list_node));
3150
3151 i++;
3152 list_node = que_node_get_next(list_node);
3153 }
3154 }
3155
3156 /***********************************************************//**
3157 Inserts a row to a table.
3158 @return DB_SUCCESS if operation successfully completed, else error
3159 code or DB_LOCK_WAIT */
3160 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3161 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3162 row_ins(
3163 /*====*/
3164 ins_node_t* node, /*!< in: row insert node */
3165 que_thr_t* thr) /*!< in: query thread */
3166 {
3167 dberr_t err;
3168
3169 if (node->state == INS_NODE_ALLOC_ROW_ID) {
3170
3171 row_ins_alloc_row_id_step(node);
3172
3173 node->index = dict_table_get_first_index(node->table);
3174 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3175
3176 if (node->ins_type == INS_SEARCHED) {
3177
3178 row_ins_get_row_from_select(node);
3179
3180 } else if (node->ins_type == INS_VALUES) {
3181
3182 row_ins_get_row_from_values(node);
3183 }
3184
3185 node->state = INS_NODE_INSERT_ENTRIES;
3186 }
3187
3188 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3189
3190 while (node->index != NULL) {
3191 if (node->index->type != DICT_FTS) {
3192 err = row_ins_index_entry_step(node, thr);
3193
3194 if (err != DB_SUCCESS) {
3195
3196 return(err);
3197 }
3198 }
3199
3200 node->index = dict_table_get_next_index(node->index);
3201 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3202
3203 DBUG_EXECUTE_IF(
3204 "row_ins_skip_sec",
3205 node->index = NULL; node->entry = NULL; break;);
3206
3207 /* Skip corrupted secondary index and its entry */
3208 while (node->index && dict_index_is_corrupted(node->index)) {
3209
3210 node->index = dict_table_get_next_index(node->index);
3211 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3212 }
3213 }
3214
3215 ut_ad(node->entry == NULL);
3216
3217 node->state = INS_NODE_ALLOC_ROW_ID;
3218
3219 return(DB_SUCCESS);
3220 }
3221
3222 /***********************************************************//**
3223 Inserts a row to a table. This is a high-level function used in SQL execution
3224 graphs.
3225 @return query thread to run next or NULL */
3226 UNIV_INTERN
3227 que_thr_t*
row_ins_step(que_thr_t * thr)3228 row_ins_step(
3229 /*=========*/
3230 que_thr_t* thr) /*!< in: query thread */
3231 {
3232 ins_node_t* node;
3233 que_node_t* parent;
3234 sel_node_t* sel_node;
3235 trx_t* trx;
3236 dberr_t err;
3237
3238 ut_ad(thr);
3239
3240 trx = thr_get_trx(thr);
3241
3242 trx_start_if_not_started_xa(trx);
3243
3244 node = static_cast<ins_node_t*>(thr->run_node);
3245
3246 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3247
3248 parent = que_node_get_parent(node);
3249 sel_node = node->select;
3250
3251 if (thr->prev_node == parent) {
3252 node->state = INS_NODE_SET_IX_LOCK;
3253 }
3254
3255 /* If this is the first time this node is executed (or when
3256 execution resumes after wait for the table IX lock), set an
3257 IX lock on the table and reset the possible select node. MySQL's
3258 partitioned table code may also call an insert within the same
3259 SQL statement AFTER it has used this table handle to do a search.
3260 This happens, for example, when a row update moves it to another
3261 partition. In that case, we have already set the IX lock on the
3262 table during the search operation, and there is no need to set
3263 it again here. But we must write trx->id to node->trx_id_buf. */
3264
3265 trx_write_trx_id(node->trx_id_buf, trx->id);
3266
3267 if (node->state == INS_NODE_SET_IX_LOCK) {
3268
3269 node->state = INS_NODE_ALLOC_ROW_ID;
3270
3271 /* It may be that the current session has not yet started
3272 its transaction, or it has been committed: */
3273
3274 if (trx->id == node->trx_id) {
3275 /* No need to do IX-locking */
3276
3277 goto same_trx;
3278 }
3279
3280 err = lock_table(0, node->table, LOCK_IX, thr);
3281
3282 DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3283 err = DB_LOCK_WAIT;);
3284
3285 if (err != DB_SUCCESS) {
3286
3287 goto error_handling;
3288 }
3289
3290 node->trx_id = trx->id;
3291 same_trx:
3292 if (node->ins_type == INS_SEARCHED) {
3293 /* Reset the cursor */
3294 sel_node->state = SEL_NODE_OPEN;
3295
3296 /* Fetch a row to insert */
3297
3298 thr->run_node = sel_node;
3299
3300 return(thr);
3301 }
3302 }
3303
3304 if ((node->ins_type == INS_SEARCHED)
3305 && (sel_node->state != SEL_NODE_FETCH)) {
3306
3307 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3308
3309 /* No more rows to insert */
3310 thr->run_node = parent;
3311
3312 return(thr);
3313 }
3314
3315 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3316
3317 err = row_ins(node, thr);
3318
3319 error_handling:
3320 trx->error_state = err;
3321
3322 if (err != DB_SUCCESS) {
3323 /* err == DB_LOCK_WAIT or SQL error detected */
3324 return(NULL);
3325 }
3326
3327 /* DO THE TRIGGER ACTIONS HERE */
3328
3329 if (node->ins_type == INS_SEARCHED) {
3330 /* Fetch a row to insert */
3331
3332 thr->run_node = sel_node;
3333 } else {
3334 thr->run_node = que_node_get_parent(node);
3335 }
3336
3337 return(thr);
3338 }
3339