1 /*****************************************************************************
2
3 Copyright (c) 1996, 2018, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ins.cc
29 Insert into a table
30
31 Created 4/20/1996 Heikki Tuuri
32 *******************************************************/
33
34 #include "ha_prototypes.h"
35
36 #include "row0ins.h"
37
38 #ifdef UNIV_NONINL
39 #include "row0ins.ic"
40 #endif
41
42 #include "dict0dict.h"
43 #include "dict0boot.h"
44 #include "trx0rec.h"
45 #include "trx0undo.h"
46 #include "btr0btr.h"
47 #include "btr0cur.h"
48 #include "mach0data.h"
49 #include "que0que.h"
50 #include "row0upd.h"
51 #include "row0sel.h"
52 #include "row0row.h"
53 #include "row0log.h"
54 #include "rem0cmp.h"
55 #include "lock0lock.h"
56 #include "log0log.h"
57 #include "eval0eval.h"
58 #include "data0data.h"
59 #include "usr0sess.h"
60 #include "buf0lru.h"
61 #include "fts0fts.h"
62 #include "fts0types.h"
63 #include "m_string.h"
64 #include "gis0geo.h"
65
66 /*************************************************************************
67 IMPORTANT NOTE: Any operation that generates redo MUST check that there
68 is enough space in the redo log before starting that operation. This is
69 done by calling log_free_check(). The reason for checking the
70 availability of the redo log space before the start of the operation is
71 that we MUST not hold any synchronization objects when performing the
72 check.
73 If you make a change in this module make sure that no codepath is
74 introduced where a call to log_free_check() is bypassed. */
75
76 /*********************************************************************//**
77 Creates an insert node struct.
78 @return own: insert node struct */
79 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)80 ins_node_create(
81 /*============*/
82 ulint ins_type, /*!< in: INS_VALUES, ... */
83 dict_table_t* table, /*!< in: table where to insert */
84 mem_heap_t* heap) /*!< in: mem heap where created */
85 {
86 ins_node_t* node;
87
88 node = static_cast<ins_node_t*>(
89 mem_heap_alloc(heap, sizeof(ins_node_t)));
90
91 node->common.type = QUE_NODE_INSERT;
92
93 node->ins_type = ins_type;
94
95 node->state = INS_NODE_SET_IX_LOCK;
96 node->table = table;
97 node->index = NULL;
98 node->entry = NULL;
99
100 node->select = NULL;
101
102 node->trx_id = 0;
103
104 node->entry_sys_heap = mem_heap_create(128);
105
106 node->magic_n = INS_NODE_MAGIC_N;
107
108 return(node);
109 }
110
111 /***********************************************************//**
112 Creates an entry template for each index of a table. */
113 static
114 void
ins_node_create_entry_list(ins_node_t * node)115 ins_node_create_entry_list(
116 /*=======================*/
117 ins_node_t* node) /*!< in: row insert node */
118 {
119 dict_index_t* index;
120 dtuple_t* entry;
121
122 ut_ad(node->entry_sys_heap);
123
124 UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
125
126 /* We will include all indexes (include those corrupted
127 secondary indexes) in the entry list. Filteration of
128 these corrupted index will be done in row_ins() */
129
130 for (index = dict_table_get_first_index(node->table);
131 index != 0;
132 index = dict_table_get_next_index(index)) {
133
134 entry = row_build_index_entry_low(
135 node->row, NULL, index, node->entry_sys_heap,
136 ROW_BUILD_FOR_INSERT);
137
138 UT_LIST_ADD_LAST(node->entry_list, entry);
139 }
140 }
141
142 /*****************************************************************//**
143 Adds system field buffers to a row. */
144 static
145 void
row_ins_alloc_sys_fields(ins_node_t * node)146 row_ins_alloc_sys_fields(
147 /*=====================*/
148 ins_node_t* node) /*!< in: insert node */
149 {
150 dtuple_t* row;
151 dict_table_t* table;
152 mem_heap_t* heap;
153 const dict_col_t* col;
154 dfield_t* dfield;
155 byte* ptr;
156
157 row = node->row;
158 table = node->table;
159 heap = node->entry_sys_heap;
160
161 ut_ad(row && table && heap);
162 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
163
164 /* allocate buffer to hold the needed system created hidden columns. */
165 uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
166 if (!dict_table_is_intrinsic(table)) {
167 len += DATA_ROLL_PTR_LEN;
168 }
169 ptr = static_cast<byte*>(mem_heap_zalloc(heap, len));
170
171 /* 1. Populate row-id */
172 col = dict_table_get_sys_col(table, DATA_ROW_ID);
173
174 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
175
176 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
177
178 node->row_id_buf = ptr;
179
180 ptr += DATA_ROW_ID_LEN;
181
182 /* 2. Populate trx id */
183 col = dict_table_get_sys_col(table, DATA_TRX_ID);
184
185 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186
187 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
188
189 node->trx_id_buf = ptr;
190
191 ptr += DATA_TRX_ID_LEN;
192
193 if (!dict_table_is_intrinsic(table)) {
194 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
195
196 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
197
198 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
199 }
200 }
201
202 /*********************************************************************//**
203 Sets a new row to insert for an INS_DIRECT node. This function is only used
204 if we have constructed the row separately, which is a rare case; this
205 function is quite slow. */
206 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)207 ins_node_set_new_row(
208 /*=================*/
209 ins_node_t* node, /*!< in: insert node */
210 dtuple_t* row) /*!< in: new row (or first row) for the node */
211 {
212 node->state = INS_NODE_SET_IX_LOCK;
213 node->index = NULL;
214 node->entry = NULL;
215
216 node->row = row;
217
218 mem_heap_empty(node->entry_sys_heap);
219
220 /* Create templates for index entries */
221
222 ins_node_create_entry_list(node);
223
224 /* Allocate from entry_sys_heap buffers for sys fields */
225
226 row_ins_alloc_sys_fields(node);
227
228 /* As we allocated a new trx id buf, the trx id should be written
229 there again: */
230
231 node->trx_id = 0;
232 }
233
234 /*******************************************************************//**
235 Does an insert operation by updating a delete-marked existing record
236 in the index. This situation can occur if the delete-marked record is
237 kept in the index for consistent reads.
238 @return DB_SUCCESS or error code */
239 static MY_ATTRIBUTE((warn_unused_result))
240 dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)241 row_ins_sec_index_entry_by_modify(
242 /*==============================*/
243 ulint flags, /*!< in: undo logging and locking flags */
244 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
245 depending on whether mtr holds just a leaf
246 latch or also a tree latch */
247 btr_cur_t* cursor, /*!< in: B-tree cursor */
248 ulint** offsets,/*!< in/out: offsets on cursor->page_cur.rec */
249 mem_heap_t* offsets_heap,
250 /*!< in/out: memory heap that can be emptied */
251 mem_heap_t* heap, /*!< in/out: memory heap */
252 const dtuple_t* entry, /*!< in: index entry to insert */
253 que_thr_t* thr, /*!< in: query thread */
254 mtr_t* mtr) /*!< in: mtr; must be committed before
255 latching any further pages */
256 {
257 big_rec_t* dummy_big_rec;
258 upd_t* update;
259 rec_t* rec;
260 dberr_t err;
261
262 rec = btr_cur_get_rec(cursor);
263
264 ut_ad(!dict_index_is_clust(cursor->index));
265 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
266 ut_ad(!entry->info_bits);
267
268 /* We know that in the alphabetical ordering, entry and rec are
269 identified. But in their binary form there may be differences if
270 there are char fields in them. Therefore we have to calculate the
271 difference. */
272
273 update = row_upd_build_sec_rec_difference_binary(
274 rec, cursor->index, *offsets, entry, heap);
275
276 if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
277 /* We should never insert in place of a record that
278 has not been delete-marked. The only exception is when
279 online CREATE INDEX copied the changes that we already
280 made to the clustered index, and completed the
281 secondary index creation before we got here. In this
282 case, the change would already be there. The CREATE
283 INDEX should be waiting for a MySQL meta-data lock
284 upgrade at least until this INSERT or UPDATE
285 returns. After that point, set_committed(true)
286 would be invoked in commit_inplace_alter_table(). */
287 ut_a(update->n_fields == 0);
288 ut_a(!cursor->index->is_committed());
289 ut_ad(!dict_index_is_online_ddl(cursor->index));
290 return(DB_SUCCESS);
291 }
292
293 if (mode == BTR_MODIFY_LEAF) {
294 /* Try an optimistic updating of the record, keeping changes
295 within the page */
296
297 /* TODO: pass only *offsets */
298 err = btr_cur_optimistic_update(
299 flags | BTR_KEEP_SYS_FLAG, cursor,
300 offsets, &offsets_heap, update, 0, thr,
301 thr_get_trx(thr)->id, mtr);
302 switch (err) {
303 case DB_OVERFLOW:
304 case DB_UNDERFLOW:
305 case DB_ZIP_OVERFLOW:
306 err = DB_FAIL;
307 default:
308 break;
309 }
310 } else {
311 ut_a(mode == BTR_MODIFY_TREE);
312 if (buf_LRU_buf_pool_running_out()) {
313
314 return(DB_LOCK_TABLE_FULL);
315 }
316
317 err = btr_cur_pessimistic_update(
318 flags | BTR_KEEP_SYS_FLAG, cursor,
319 offsets, &offsets_heap,
320 heap, &dummy_big_rec, update, 0,
321 thr, thr_get_trx(thr)->id, mtr);
322 ut_ad(!dummy_big_rec);
323 }
324
325 return(err);
326 }
327
328 /*******************************************************************//**
329 Does an insert operation by delete unmarking and updating a delete marked
330 existing record in the index. This situation can occur if the delete marked
331 record is kept in the index for consistent reads.
332 @return DB_SUCCESS, DB_FAIL, or error code */
333 static MY_ATTRIBUTE((warn_unused_result))
334 dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)335 row_ins_clust_index_entry_by_modify(
336 /*================================*/
337 btr_pcur_t* pcur, /*!< in/out: a persistent cursor pointing
338 to the clust_rec that is being modified. */
339 ulint flags, /*!< in: undo logging and locking flags */
340 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
341 depending on whether mtr holds just a leaf
342 latch or also a tree latch */
343 ulint** offsets,/*!< out: offsets on cursor->page_cur.rec */
344 mem_heap_t** offsets_heap,
345 /*!< in/out: pointer to memory heap that can
346 be emptied, or NULL */
347 mem_heap_t* heap, /*!< in/out: memory heap */
348 const dtuple_t* entry, /*!< in: index entry to insert */
349 que_thr_t* thr, /*!< in: query thread */
350 mtr_t* mtr) /*!< in: mtr; must be committed before
351 latching any further pages */
352 {
353 const rec_t* rec;
354 upd_t* update;
355 dberr_t err = DB_SUCCESS;
356 btr_cur_t* cursor = btr_pcur_get_btr_cur(pcur);
357 TABLE* mysql_table = NULL;
358 ut_ad(dict_index_is_clust(cursor->index));
359
360 rec = btr_cur_get_rec(cursor);
361
362 ut_ad(rec_get_deleted_flag(rec,
363 dict_table_is_comp(cursor->index->table)));
364
365 /* Build an update vector containing all the fields to be modified;
366 NOTE that this vector may NOT contain system columns trx_id or
367 roll_ptr */
368 if (thr->prebuilt != NULL) {
369 mysql_table = thr->prebuilt->m_mysql_table;
370 ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
371 }
372
373 update = row_upd_build_difference_binary(
374 cursor->index, entry, rec, NULL, true,
375 thr_get_trx(thr), heap, mysql_table, &err);
376 if (err != DB_SUCCESS) {
377 return(err);
378 }
379
380 if (mode != BTR_MODIFY_TREE) {
381 ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
382
383 /* Try optimistic updating of the record, keeping changes
384 within the page */
385
386 err = btr_cur_optimistic_update(
387 flags, cursor, offsets, offsets_heap, update, 0, thr,
388 thr_get_trx(thr)->id, mtr);
389 switch (err) {
390 case DB_OVERFLOW:
391 case DB_UNDERFLOW:
392 case DB_ZIP_OVERFLOW:
393 err = DB_FAIL;
394 default:
395 break;
396 }
397 } else {
398 if (buf_LRU_buf_pool_running_out()) {
399
400 return(DB_LOCK_TABLE_FULL);
401
402 }
403
404 big_rec_t* big_rec = NULL;
405
406 err = btr_cur_pessimistic_update(
407 flags | BTR_KEEP_POS_FLAG,
408 cursor, offsets, offsets_heap, heap,
409 &big_rec, update, 0, thr, thr_get_trx(thr)->id, mtr);
410
411 if (big_rec) {
412 ut_a(err == DB_SUCCESS);
413
414 DEBUG_SYNC_C("before_row_ins_upd_extern");
415 err = btr_store_big_rec_extern_fields(
416 pcur, update, *offsets, big_rec, mtr,
417 BTR_STORE_INSERT_UPDATE);
418 DEBUG_SYNC_C("after_row_ins_upd_extern");
419 dtuple_big_rec_free(big_rec);
420 }
421 }
422
423 return(err);
424 }
425
426 /*********************************************************************//**
427 Returns TRUE if in a cascaded update/delete an ancestor node of node
428 updates (not DELETE, but UPDATE) table.
429 @return TRUE if an ancestor updates table */
430 static
431 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)432 row_ins_cascade_ancestor_updates_table(
433 /*===================================*/
434 que_node_t* node, /*!< in: node in a query graph */
435 dict_table_t* table) /*!< in: table */
436 {
437 que_node_t* parent;
438
439 for (parent = que_node_get_parent(node);
440 que_node_get_type(parent) == QUE_NODE_UPDATE;
441 parent = que_node_get_parent(parent)) {
442
443 upd_node_t* upd_node;
444
445 upd_node = static_cast<upd_node_t*>(parent);
446
447 if (upd_node->table == table && upd_node->is_delete == FALSE) {
448
449 return(TRUE);
450 }
451 }
452
453 return(FALSE);
454 }
455
456 /*********************************************************************//**
457 Returns the number of ancestor UPDATE or DELETE nodes of a
458 cascaded update/delete node.
459 @return number of ancestors */
460 static MY_ATTRIBUTE((warn_unused_result))
461 ulint
row_ins_cascade_n_ancestors(que_node_t * node)462 row_ins_cascade_n_ancestors(
463 /*========================*/
464 que_node_t* node) /*!< in: node in a query graph */
465 {
466 que_node_t* parent;
467 ulint n_ancestors = 0;
468
469 for (parent = que_node_get_parent(node);
470 que_node_get_type(parent) == QUE_NODE_UPDATE;
471 parent = que_node_get_parent(parent)) {
472
473 n_ancestors++;
474 }
475
476 return(n_ancestors);
477 }
478
479 /******************************************************************//**
480 Calculates the update vector node->cascade->update for a child table in
481 a cascaded update.
482 @return number of fields in the calculated update vector; the value
483 can also be 0 if no foreign key fields changed; the returned value is
484 ULINT_UNDEFINED if the column type in the child table is too short to
485 fit the new value in the parent table: that means the update fails */
486 static MY_ATTRIBUTE((warn_unused_result))
487 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)488 row_ins_cascade_calc_update_vec(
489 /*============================*/
490 upd_node_t* node, /*!< in: update node of the parent
491 table */
492 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
493 type is != 0 */
494 mem_heap_t* heap, /*!< in: memory heap to use as
495 temporary storage */
496 trx_t* trx, /*!< in: update transaction */
497 ibool* fts_col_affected)
498 /*!< out: is FTS column affected */
499 {
500 upd_node_t* cascade = node->cascade_node;
501 dict_table_t* table = foreign->foreign_table;
502 dict_index_t* index = foreign->foreign_index;
503 upd_t* update;
504 dict_table_t* parent_table;
505 dict_index_t* parent_index;
506 upd_t* parent_update;
507 ulint n_fields_updated;
508 ulint parent_field_no;
509 ulint i;
510 ulint j;
511 ibool doc_id_updated = FALSE;
512 ulint doc_id_pos = 0;
513 doc_id_t new_doc_id = FTS_NULL_DOC_ID;
514
515 ut_a(node);
516 ut_a(foreign);
517 ut_a(cascade);
518 ut_a(table);
519 ut_a(index);
520
521 /* Calculate the appropriate update vector which will set the fields
522 in the child index record to the same value (possibly padded with
523 spaces if the column is a fixed length CHAR or FIXBINARY column) as
524 the referenced index record will get in the update. */
525
526 parent_table = node->table;
527 ut_a(parent_table == foreign->referenced_table);
528 parent_index = foreign->referenced_index;
529 parent_update = node->update;
530
531 update = cascade->update;
532
533 update->info_bits = 0;
534
535 n_fields_updated = 0;
536
537 *fts_col_affected = FALSE;
538
539 if (table->fts) {
540 doc_id_pos = dict_table_get_nth_col_pos(
541 table, table->fts->doc_col);
542 }
543
544 for (i = 0; i < foreign->n_fields; i++) {
545
546 parent_field_no = dict_table_get_nth_col_pos(
547 parent_table,
548 dict_index_get_nth_col_no(parent_index, i));
549
550 for (j = 0; j < parent_update->n_fields; j++) {
551 const upd_field_t* parent_ufield
552 = &parent_update->fields[j];
553
554 if (parent_ufield->field_no == parent_field_no) {
555
556 ulint min_size;
557 const dict_col_t* col;
558 ulint ufield_len;
559 upd_field_t* ufield;
560
561 col = dict_index_get_nth_col(index, i);
562
563 /* A field in the parent index record is
564 updated. Let us make the update vector
565 field for the child table. */
566
567 ufield = update->fields + n_fields_updated;
568
569 ufield->field_no
570 = dict_table_get_nth_col_pos(
571 table, dict_col_get_no(col));
572
573 ufield->orig_len = 0;
574 ufield->exp = NULL;
575
576 ufield->new_val = parent_ufield->new_val;
577 ufield_len = dfield_get_len(&ufield->new_val);
578
579 /* Clear the "external storage" flag */
580 dfield_set_len(&ufield->new_val, ufield_len);
581
582 /* Do not allow a NOT NULL column to be
583 updated as NULL */
584
585 if (dfield_is_null(&ufield->new_val)
586 && (col->prtype & DATA_NOT_NULL)) {
587
588 return(ULINT_UNDEFINED);
589 }
590
591 /* If the new value would not fit in the
592 column, do not allow the update */
593
594 if (!dfield_is_null(&ufield->new_val)
595 && dtype_get_at_most_n_mbchars(
596 col->prtype, col->mbminmaxlen,
597 col->len,
598 ufield_len,
599 static_cast<char*>(
600 dfield_get_data(
601 &ufield->new_val)))
602 < ufield_len) {
603
604 return(ULINT_UNDEFINED);
605 }
606
607 /* If the parent column type has a different
608 length than the child column type, we may
609 need to pad with spaces the new value of the
610 child column */
611
612 min_size = dict_col_get_min_size(col);
613
614 /* Because UNIV_SQL_NULL (the marker
615 of SQL NULL values) exceeds all possible
616 values of min_size, the test below will
617 not hold for SQL NULL columns. */
618
619 if (min_size > ufield_len) {
620
621 byte* pad;
622 ulint pad_len;
623 byte* padded_data;
624 ulint mbminlen;
625
626 padded_data = static_cast<byte*>(
627 mem_heap_alloc(
628 heap, min_size));
629
630 pad = padded_data + ufield_len;
631 pad_len = min_size - ufield_len;
632
633 memcpy(padded_data,
634 dfield_get_data(&ufield
635 ->new_val),
636 ufield_len);
637
638 mbminlen = dict_col_get_mbminlen(col);
639
640 ut_ad(!(ufield_len % mbminlen));
641 ut_ad(!(min_size % mbminlen));
642
643 if (mbminlen == 1
644 && dtype_get_charset_coll(
645 col->prtype)
646 == DATA_MYSQL_BINARY_CHARSET_COLL) {
647 /* Do not pad BINARY columns */
648 return(ULINT_UNDEFINED);
649 }
650
651 row_mysql_pad_col(mbminlen,
652 pad, pad_len);
653 dfield_set_data(&ufield->new_val,
654 padded_data, min_size);
655 }
656
657 /* Check whether the current column has
658 FTS index on it */
659 if (table->fts
660 && dict_table_is_fts_column(
661 table->fts->indexes,
662 dict_col_get_no(col),
663 dict_col_is_virtual(col))
664 != ULINT_UNDEFINED) {
665 *fts_col_affected = TRUE;
666 }
667
668 /* If Doc ID is updated, check whether the
669 Doc ID is valid */
670 if (table->fts
671 && ufield->field_no == doc_id_pos) {
672 doc_id_t n_doc_id;
673
674 n_doc_id =
675 table->fts->cache->next_doc_id;
676
677 new_doc_id = fts_read_doc_id(
678 static_cast<const byte*>(
679 dfield_get_data(
680 &ufield->new_val)));
681
682 if (new_doc_id <= 0) {
683 ib::error() << "FTS Doc ID"
684 " must be larger than"
685 " 0";
686 return(ULINT_UNDEFINED);
687 }
688
689 if (new_doc_id < n_doc_id) {
690 ib::error() << "FTS Doc ID"
691 " must be larger than "
692 << n_doc_id - 1
693 << " for table "
694 << table->name;
695
696 return(ULINT_UNDEFINED);
697 }
698
699 *fts_col_affected = TRUE;
700 doc_id_updated = TRUE;
701 }
702
703 n_fields_updated++;
704 }
705 }
706 }
707
708 /* Generate a new Doc ID if FTS index columns get updated */
709 if (table->fts && *fts_col_affected) {
710 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
711 doc_id_t doc_id;
712 doc_id_t* next_doc_id;
713 upd_field_t* ufield;
714
715 next_doc_id = static_cast<doc_id_t*>(mem_heap_alloc(
716 heap, sizeof(doc_id_t)));
717
718 ut_ad(!doc_id_updated);
719 ufield = update->fields + n_fields_updated;
720 fts_get_next_doc_id(table, next_doc_id);
721 doc_id = fts_update_doc_id(table, ufield, next_doc_id);
722 n_fields_updated++;
723 fts_trx_add_op(trx, table, doc_id, FTS_INSERT, NULL);
724 } else {
725 if (doc_id_updated) {
726 ut_ad(new_doc_id);
727 fts_trx_add_op(trx, table, new_doc_id,
728 FTS_INSERT, NULL);
729 } else {
730 ib::error() << "FTS Doc ID must be updated"
731 " along with FTS indexed column for"
732 " table " << table->name;
733 return(ULINT_UNDEFINED);
734 }
735 }
736 }
737
738 update->n_fields = n_fields_updated;
739
740 return(n_fields_updated);
741 }
742
743 /*********************************************************************//**
744 Set detailed error message associated with foreign key errors for
745 the given transaction. */
746 static
747 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)748 row_ins_set_detailed(
749 /*=================*/
750 trx_t* trx, /*!< in: transaction */
751 dict_foreign_t* foreign) /*!< in: foreign key constraint */
752 {
753 ut_ad(!srv_read_only_mode);
754
755 mutex_enter(&srv_misc_tmpfile_mutex);
756 rewind(srv_misc_tmpfile);
757
758 if (os_file_set_eof(srv_misc_tmpfile)) {
759 ut_print_name(srv_misc_tmpfile, trx,
760 foreign->foreign_table_name);
761 dict_print_info_on_foreign_key_in_create_format(
762 srv_misc_tmpfile, trx, foreign, FALSE);
763 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
764 } else {
765 trx_set_detailed_error(trx, "temp file operation failed");
766 }
767
768 mutex_exit(&srv_misc_tmpfile_mutex);
769 }
770
771 /*********************************************************************//**
772 Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
773 and displays information about the given transaction.
774 The caller must release dict_foreign_err_mutex. */
775 static
776 void
row_ins_foreign_trx_print(trx_t * trx)777 row_ins_foreign_trx_print(
778 /*======================*/
779 trx_t* trx) /*!< in: transaction */
780 {
781 ulint n_rec_locks;
782 ulint n_trx_locks;
783 ulint heap_size;
784
785 if (srv_read_only_mode) {
786 return;
787 }
788
789 lock_mutex_enter();
790 n_rec_locks = lock_number_of_rows_locked(&trx->lock);
791 n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
792 heap_size = mem_heap_get_size(trx->lock.lock_heap);
793 lock_mutex_exit();
794
795 trx_sys_mutex_enter();
796
797 mutex_enter(&dict_foreign_err_mutex);
798 rewind(dict_foreign_err_file);
799 ut_print_timestamp(dict_foreign_err_file);
800 fputs(" Transaction:\n", dict_foreign_err_file);
801
802 trx_print_low(dict_foreign_err_file, trx, 600,
803 n_rec_locks, n_trx_locks, heap_size);
804
805 trx_sys_mutex_exit();
806
807 ut_ad(mutex_own(&dict_foreign_err_mutex));
808 }
809
810 /*********************************************************************//**
811 Reports a foreign key error associated with an update or a delete of a
812 parent table index entry. */
813 static
814 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)815 row_ins_foreign_report_err(
816 /*=======================*/
817 const char* errstr, /*!< in: error string from the viewpoint
818 of the parent table */
819 que_thr_t* thr, /*!< in: query thread whose run_node
820 is an update node */
821 dict_foreign_t* foreign, /*!< in: foreign key constraint */
822 const rec_t* rec, /*!< in: a matching index record in the
823 child table */
824 const dtuple_t* entry) /*!< in: index entry in the parent
825 table */
826 {
827 if (srv_read_only_mode) {
828 return;
829 }
830
831 FILE* ef = dict_foreign_err_file;
832 trx_t* trx = thr_get_trx(thr);
833
834 row_ins_set_detailed(trx, foreign);
835
836 row_ins_foreign_trx_print(trx);
837
838 fputs("Foreign key constraint fails for table ", ef);
839 ut_print_name(ef, trx, foreign->foreign_table_name);
840 fputs(":\n", ef);
841 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
842 TRUE);
843 putc('\n', ef);
844 fputs(errstr, ef);
845 fprintf(ef, " in parent table, in index %s",
846 foreign->referenced_index->name());
847 if (entry) {
848 fputs(" tuple:\n", ef);
849 dtuple_print(ef, entry);
850 }
851 fputs("\nBut in child table ", ef);
852 ut_print_name(ef, trx, foreign->foreign_table_name);
853 fprintf(ef, ", in index %s", foreign->foreign_index->name());
854 if (rec) {
855 fputs(", there is a record:\n", ef);
856 rec_print(ef, rec, foreign->foreign_index);
857 } else {
858 fputs(", the record is not available\n", ef);
859 }
860 putc('\n', ef);
861
862 mutex_exit(&dict_foreign_err_mutex);
863 }
864
865 /*********************************************************************//**
866 Reports a foreign key error to dict_foreign_err_file when we are trying
867 to add an index entry to a child table. Note that the adding may be the result
868 of an update, too. */
869 static
870 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)871 row_ins_foreign_report_add_err(
872 /*===========================*/
873 trx_t* trx, /*!< in: transaction */
874 dict_foreign_t* foreign, /*!< in: foreign key constraint */
875 const rec_t* rec, /*!< in: a record in the parent table:
876 it does not match entry because we
877 have an error! */
878 const dtuple_t* entry) /*!< in: index entry to insert in the
879 child table */
880 {
881 if (srv_read_only_mode) {
882 return;
883 }
884
885 FILE* ef = dict_foreign_err_file;
886
887 row_ins_set_detailed(trx, foreign);
888
889 row_ins_foreign_trx_print(trx);
890
891 fputs("Foreign key constraint fails for table ", ef);
892 ut_print_name(ef, trx, foreign->foreign_table_name);
893 fputs(":\n", ef);
894 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
895 TRUE);
896 fprintf(ef, "\nTrying to add in child table, in index %s",
897 foreign->foreign_index->name());
898 if (entry) {
899 fputs(" tuple:\n", ef);
900 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
901 It would be better to only display the user columns. */
902 dtuple_print(ef, entry);
903 }
904 fputs("\nBut in parent table ", ef);
905 ut_print_name(ef, trx, foreign->referenced_table_name);
906 fprintf(ef, ", in index %s,\n"
907 "the closest match we can find is record:\n",
908 foreign->referenced_index->name());
909 if (rec && page_rec_is_supremum(rec)) {
910 /* If the cursor ended on a supremum record, it is better
911 to report the previous record in the error message, so that
912 the user gets a more descriptive error message. */
913 rec = page_rec_get_prev_const(rec);
914 }
915
916 if (rec) {
917 rec_print(ef, rec, foreign->referenced_index);
918 }
919 putc('\n', ef);
920
921 mutex_exit(&dict_foreign_err_mutex);
922 }
923
924 /*********************************************************************//**
925 Invalidate the query cache for the given table. */
926 static
927 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)928 row_ins_invalidate_query_cache(
929 /*===========================*/
930 que_thr_t* thr, /*!< in: query thread whose run_node
931 is an update node */
932 const char* name) /*!< in: table name prefixed with
933 database name and a '/' character */
934 {
935 ulint len = strlen(name) + 1;
936 innobase_invalidate_query_cache(thr_get_trx(thr), name, len);
937 }
938
939 /** Fill virtual column information in cascade node for the child table.
940 @param[out] cascade child update node
941 @param[in] rec clustered rec of child table
942 @param[in] index clustered index of child table
943 @param[in] node parent update node
944 @param[in] foreign foreign key information
945 @param[out] err error code. */
946 static
947 void
row_ins_foreign_fill_virtual(upd_node_t * cascade,const rec_t * rec,dict_index_t * index,upd_node_t * node,dict_foreign_t * foreign,dberr_t * err)948 row_ins_foreign_fill_virtual(
949 upd_node_t* cascade,
950 const rec_t* rec,
951 dict_index_t* index,
952 upd_node_t* node,
953 dict_foreign_t* foreign,
954 dberr_t* err)
955 {
956 row_ext_t* ext;
957 THD* thd = current_thd;
958 ulint offsets_[REC_OFFS_NORMAL_SIZE];
959 rec_offs_init(offsets_);
960 const ulint* offsets =
961 rec_get_offsets(rec, index, offsets_,
962 ULINT_UNDEFINED, &cascade->heap);
963 mem_heap_t* v_heap = NULL;
964 upd_t* update = cascade->update;
965 ulint n_v_fld = index->table->n_v_def;
966 ulint n_diff;
967 upd_field_t* upd_field;
968 dict_vcol_set* v_cols = foreign->v_cols;
969
970 update->old_vrow = row_build(
971 ROW_COPY_POINTERS, index, rec,
972 offsets, index->table, NULL, NULL,
973 &ext, cascade->heap);
974
975 n_diff = update->n_fields;
976
977 update->n_fields += n_v_fld;
978
979 if (index->table->vc_templ == NULL) {
980 /** This can occur when there is a cascading
981 delete or update after restart. */
982 innobase_init_vc_templ(index->table);
983 }
984
985 for (ulint i = 0; i < n_v_fld; i++) {
986
987 dict_v_col_t* col = dict_table_get_nth_v_col(
988 index->table, i);
989
990 dict_vcol_set::iterator it = v_cols->find(col);
991
992 if (it == v_cols->end()) {
993 continue;
994 }
995
996 dfield_t* vfield = innobase_get_computed_value(
997 update->old_vrow, col, index,
998 &v_heap, update->heap, NULL, thd, NULL,
999 NULL, NULL, NULL);
1000
1001 if (vfield == NULL) {
1002 *err = DB_COMPUTE_VALUE_FAILED;
1003 goto func_exit;
1004 }
1005
1006 upd_field = upd_get_nth_field(update, n_diff);
1007
1008 upd_field->old_v_val = static_cast<dfield_t*>(
1009 mem_heap_alloc(cascade->heap,
1010 sizeof *upd_field->old_v_val));
1011
1012 dfield_copy(upd_field->old_v_val, vfield);
1013
1014 upd_field_set_v_field_no(upd_field, i, index);
1015
1016 if (node->is_delete
1017 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1018 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1019
1020 dfield_set_null(&upd_field->new_val);
1021 }
1022
1023 if (!node->is_delete
1024 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1025
1026 dfield_t* new_vfield = innobase_get_computed_value(
1027 update->old_vrow, col, index,
1028 &v_heap, update->heap, NULL, thd,
1029 NULL, NULL, node->update, foreign);
1030
1031 if (new_vfield == NULL) {
1032 *err = DB_COMPUTE_VALUE_FAILED;
1033 goto func_exit;
1034 }
1035
1036 dfield_copy(&(upd_field->new_val), new_vfield);
1037 }
1038
1039 n_diff++;
1040 }
1041
1042 update->n_fields = n_diff;
1043 *err = DB_SUCCESS;
1044
1045 func_exit:
1046 if (v_heap) {
1047 mem_heap_free(v_heap);
1048 }
1049 }
1050
1051 /*********************************************************************//**
1052 Perform referential actions or checks when a parent row is deleted or updated
1053 and the constraint had an ON DELETE or ON UPDATE condition which was not
1054 RESTRICT.
1055 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
1056 static MY_ATTRIBUTE((warn_unused_result))
1057 dberr_t
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)1058 row_ins_foreign_check_on_constraint(
1059 /*================================*/
1060 que_thr_t* thr, /*!< in: query thread whose run_node
1061 is an update node */
1062 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
1063 type is != 0 */
1064 btr_pcur_t* pcur, /*!< in: cursor placed on a matching
1065 index record in the child table */
1066 dtuple_t* entry, /*!< in: index entry in the parent
1067 table */
1068 mtr_t* mtr) /*!< in: mtr holding the latch of pcur
1069 page */
1070 {
1071 upd_node_t* node;
1072 upd_node_t* cascade;
1073 dict_table_t* table = foreign->foreign_table;
1074 dict_index_t* index;
1075 dict_index_t* clust_index;
1076 dtuple_t* ref;
1077 const rec_t* rec;
1078 const rec_t* clust_rec;
1079 const buf_block_t* clust_block;
1080 upd_t* update;
1081 ulint n_to_update;
1082 dberr_t err;
1083 ulint i;
1084 trx_t* trx;
1085 mem_heap_t* tmp_heap = NULL;
1086 doc_id_t doc_id = FTS_NULL_DOC_ID;
1087 ibool fts_col_affacted = FALSE;
1088
1089 DBUG_ENTER("row_ins_foreign_check_on_constraint");
1090 ut_a(thr);
1091 ut_a(foreign);
1092 ut_a(pcur);
1093 ut_a(mtr);
1094
1095 trx = thr_get_trx(thr);
1096
1097 /* Since we are going to delete or update a row, we have to invalidate
1098 the MySQL query cache for table. A deadlock of threads is not possible
1099 here because the caller of this function does not hold any latches with
1100 the mutex rank above the lock_sys_t::mutex. The query cache mutex
1101 has a rank just above the lock_sys_t::mutex. */
1102
1103 row_ins_invalidate_query_cache(thr, table->name.m_name);
1104
1105 node = static_cast<upd_node_t*>(thr->run_node);
1106
1107 if (node->is_delete && 0 == (foreign->type
1108 & (DICT_FOREIGN_ON_DELETE_CASCADE
1109 | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
1110
1111 row_ins_foreign_report_err("Trying to delete",
1112 thr, foreign,
1113 btr_pcur_get_rec(pcur), entry);
1114
1115 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1116 }
1117
1118 if (!node->is_delete && 0 == (foreign->type
1119 & (DICT_FOREIGN_ON_UPDATE_CASCADE
1120 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
1121
1122 /* This is an UPDATE */
1123
1124 row_ins_foreign_report_err("Trying to update",
1125 thr, foreign,
1126 btr_pcur_get_rec(pcur), entry);
1127
1128 DBUG_RETURN(DB_ROW_IS_REFERENCED);
1129 }
1130
1131 if (node->cascade_node == NULL) {
1132 node->cascade_heap = mem_heap_create(128);
1133 node->cascade_node = row_create_update_node_for_mysql(
1134 table, node->cascade_heap);
1135 que_node_set_parent(node->cascade_node, node);
1136
1137 }
1138 cascade = node->cascade_node;
1139 cascade->table = table;
1140 cascade->foreign = foreign;
1141
1142 if (node->is_delete
1143 && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1144 cascade->is_delete = TRUE;
1145 } else {
1146 cascade->is_delete = FALSE;
1147
1148 if (foreign->n_fields > cascade->update_n_fields) {
1149 /* We have to make the update vector longer */
1150
1151 cascade->update = upd_create(foreign->n_fields,
1152 node->cascade_heap);
1153 cascade->update_n_fields = foreign->n_fields;
1154 }
1155 }
1156
1157 /* We do not allow cyclic cascaded updating (DELETE is allowed,
1158 but not UPDATE) of the same table, as this can lead to an infinite
1159 cycle. Check that we are not updating the same table which is
1160 already being modified in this cascade chain. We have to check
1161 this also because the modification of the indexes of a 'parent'
1162 table may still be incomplete, and we must avoid seeing the indexes
1163 of the parent table in an inconsistent state! */
1164
1165 if (!cascade->is_delete
1166 && row_ins_cascade_ancestor_updates_table(cascade, table)) {
1167
1168 /* We do not know if this would break foreign key
1169 constraints, but play safe and return an error */
1170
1171 err = DB_ROW_IS_REFERENCED;
1172
1173 row_ins_foreign_report_err(
1174 "Trying an update, possibly causing a cyclic"
1175 " cascaded update\n"
1176 "in the child table,", thr, foreign,
1177 btr_pcur_get_rec(pcur), entry);
1178
1179 goto nonstandard_exit_func;
1180 }
1181
1182 if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
1183 err = DB_FOREIGN_EXCEED_MAX_CASCADE;
1184
1185 row_ins_foreign_report_err(
1186 "Trying a too deep cascaded delete or update\n",
1187 thr, foreign, btr_pcur_get_rec(pcur), entry);
1188
1189 goto nonstandard_exit_func;
1190 }
1191
1192 index = btr_pcur_get_btr_cur(pcur)->index;
1193
1194 ut_a(index == foreign->foreign_index);
1195
1196 rec = btr_pcur_get_rec(pcur);
1197
1198 tmp_heap = mem_heap_create(256);
1199
1200 if (dict_index_is_clust(index)) {
1201 /* pcur is already positioned in the clustered index of
1202 the child table */
1203
1204 clust_index = index;
1205 clust_rec = rec;
1206 clust_block = btr_pcur_get_block(pcur);
1207 } else {
1208 /* We have to look for the record in the clustered index
1209 in the child table */
1210
1211 clust_index = dict_table_get_first_index(table);
1212
1213 ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
1214 tmp_heap);
1215 btr_pcur_open_with_no_init(clust_index, ref,
1216 PAGE_CUR_LE, BTR_SEARCH_LEAF,
1217 cascade->pcur, 0, mtr);
1218
1219 clust_rec = btr_pcur_get_rec(cascade->pcur);
1220 clust_block = btr_pcur_get_block(cascade->pcur);
1221
1222 if (!page_rec_is_user_rec(clust_rec)
1223 || btr_pcur_get_low_match(cascade->pcur)
1224 < dict_index_get_n_unique(clust_index)) {
1225
1226 ib::error() << "In cascade of a foreign key op index "
1227 << index->name
1228 << " of table " << index->table->name;
1229
1230 fputs("InnoDB: record ", stderr);
1231 rec_print(stderr, rec, index);
1232 fputs("\n"
1233 "InnoDB: clustered record ", stderr);
1234 rec_print(stderr, clust_rec, clust_index);
1235 fputs("\n"
1236 "InnoDB: Submit a detailed bug report to"
1237 " http://bugs.mysql.com\n", stderr);
1238 ut_ad(0);
1239 err = DB_SUCCESS;
1240
1241 goto nonstandard_exit_func;
1242 }
1243 }
1244
1245 /* Set an X-lock on the row to delete or update in the child table */
1246
1247 err = lock_table(0, table, LOCK_IX, thr);
1248
1249 if (err == DB_SUCCESS) {
1250 /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1251 we already have a normal shared lock on the appropriate
1252 gap if the search criterion was not unique */
1253
1254 err = lock_clust_rec_read_check_and_lock_alt(
1255 0, clust_block, clust_rec, clust_index,
1256 LOCK_X, LOCK_REC_NOT_GAP, thr);
1257 }
1258
1259 if (err != DB_SUCCESS) {
1260
1261 goto nonstandard_exit_func;
1262 }
1263
1264 if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1265 /* This can happen if there is a circular reference of
1266 rows such that cascading delete comes to delete a row
1267 already in the process of being delete marked */
1268 err = DB_SUCCESS;
1269
1270 goto nonstandard_exit_func;
1271 }
1272
1273
1274 if (table->fts) {
1275 doc_id = fts_get_doc_id_from_rec(table, clust_rec,
1276 clust_index, tmp_heap);
1277 }
1278
1279 if (cascade->is_delete && foreign->v_cols != NULL
1280 && foreign->v_cols->size() > 0
1281 && table->vc_templ == NULL) {
1282 innobase_init_vc_templ(table);
1283 }
1284 if (node->is_delete
1285 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1286 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1287 /* Build the appropriate update vector which sets
1288 foreign->n_fields first fields in rec to SQL NULL */
1289
1290 update = cascade->update;
1291
1292 update->info_bits = 0;
1293 update->n_fields = foreign->n_fields;
1294 UNIV_MEM_INVALID(update->fields,
1295 update->n_fields * sizeof *update->fields);
1296
1297 for (i = 0; i < foreign->n_fields; i++) {
1298 upd_field_t* ufield = &update->fields[i];
1299 ulint col_no = dict_index_get_nth_col_no(
1300 index, i);
1301
1302 ufield->field_no = dict_table_get_nth_col_pos(
1303 table, col_no);
1304 dict_col_t* col = dict_table_get_nth_col(
1305 table, col_no);
1306 dict_col_copy_type(col, dfield_get_type(&ufield->new_val));
1307
1308 ufield->orig_len = 0;
1309 ufield->exp = NULL;
1310 dfield_set_null(&ufield->new_val);
1311
1312 if (table->fts && dict_table_is_fts_column(
1313 table->fts->indexes,
1314 dict_index_get_nth_col_no(index, i),
1315 dict_col_is_virtual(
1316 dict_index_get_nth_col(index, i)))
1317 != ULINT_UNDEFINED) {
1318 fts_col_affacted = TRUE;
1319 }
1320 }
1321
1322 if (fts_col_affacted) {
1323 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1324 }
1325
1326 if (foreign->v_cols != NULL
1327 && foreign->v_cols->size() > 0) {
1328 row_ins_foreign_fill_virtual(
1329 cascade, clust_rec, clust_index,
1330 node, foreign, &err);
1331
1332 if (err != DB_SUCCESS) {
1333 goto nonstandard_exit_func;
1334 }
1335 }
1336
1337 } else if (table->fts && cascade->is_delete) {
1338 /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1339 for (i = 0; i < foreign->n_fields; i++) {
1340 if (table->fts && dict_table_is_fts_column(
1341 table->fts->indexes,
1342 dict_index_get_nth_col_no(index, i),
1343 dict_col_is_virtual(
1344 dict_index_get_nth_col(index, i)))
1345 != ULINT_UNDEFINED) {
1346 fts_col_affacted = TRUE;
1347 }
1348 }
1349
1350 if (fts_col_affacted) {
1351 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1352 }
1353 }
1354
1355 if (!node->is_delete
1356 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1357
1358 /* Build the appropriate update vector which sets changing
1359 foreign->n_fields first fields in rec to new values */
1360
1361 n_to_update = row_ins_cascade_calc_update_vec(
1362 node, foreign, tmp_heap,
1363 trx, &fts_col_affacted);
1364
1365
1366 if (foreign->v_cols != NULL
1367 && foreign->v_cols->size() > 0) {
1368 row_ins_foreign_fill_virtual(
1369 cascade, clust_rec, clust_index,
1370 node, foreign, &err);
1371
1372 if (err != DB_SUCCESS) {
1373 goto nonstandard_exit_func;
1374 }
1375 }
1376
1377 if (n_to_update == ULINT_UNDEFINED) {
1378 err = DB_ROW_IS_REFERENCED;
1379
1380 row_ins_foreign_report_err(
1381 "Trying a cascaded update where the"
1382 " updated value in the child\n"
1383 "table would not fit in the length"
1384 " of the column, or the value would\n"
1385 "be NULL and the column is"
1386 " declared as not NULL in the child table,",
1387 thr, foreign, btr_pcur_get_rec(pcur), entry);
1388
1389 goto nonstandard_exit_func;
1390 }
1391
1392 if (cascade->update->n_fields == 0) {
1393
1394 /* The update does not change any columns referred
1395 to in this foreign key constraint: no need to do
1396 anything */
1397
1398 err = DB_SUCCESS;
1399
1400 goto nonstandard_exit_func;
1401 }
1402
1403 /* Mark the old Doc ID as deleted */
1404 if (fts_col_affacted) {
1405 ut_ad(table->fts);
1406 fts_trx_add_op(trx, table, doc_id, FTS_DELETE, NULL);
1407 }
1408 }
1409
1410 /* Store pcur position and initialize or store the cascade node
1411 pcur stored position */
1412
1413 btr_pcur_store_position(pcur, mtr);
1414
1415 if (index == clust_index) {
1416 btr_pcur_copy_stored_position(cascade->pcur, pcur);
1417 } else {
1418 btr_pcur_store_position(cascade->pcur, mtr);
1419 }
1420
1421 mtr_commit(mtr);
1422
1423 ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1424
1425 cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1426
1427 err = row_update_cascade_for_mysql(thr, cascade,
1428 foreign->foreign_table);
1429
1430 /* Release the data dictionary latch for a while, so that we do not
1431 starve other threads from doing CREATE TABLE etc. if we have a huge
1432 cascaded operation running. The counter n_foreign_key_checks_running
1433 will prevent other users from dropping or ALTERing the table when we
1434 release the latch. */
1435
1436 row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1437
1438 DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1439
1440 row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1441
1442 mtr_start(mtr);
1443
1444 /* Restore pcur position */
1445
1446 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1447
1448 if (tmp_heap) {
1449 mem_heap_free(tmp_heap);
1450 }
1451
1452 DBUG_RETURN(err);
1453
1454 nonstandard_exit_func:
1455
1456 if (tmp_heap) {
1457 mem_heap_free(tmp_heap);
1458 }
1459
1460 btr_pcur_store_position(pcur, mtr);
1461
1462 mtr_commit(mtr);
1463 mtr_start(mtr);
1464
1465 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1466
1467 DBUG_RETURN(err);
1468 }
1469
1470 /*********************************************************************//**
1471 Sets a shared lock on a record. Used in locking possible duplicate key
1472 records and also in checking foreign key constraints.
1473 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1474 static
1475 dberr_t
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1476 row_ins_set_shared_rec_lock(
1477 /*========================*/
1478 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1479 LOCK_REC_NOT_GAP type lock */
1480 const buf_block_t* block, /*!< in: buffer block of rec */
1481 const rec_t* rec, /*!< in: record */
1482 dict_index_t* index, /*!< in: index */
1483 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1484 que_thr_t* thr) /*!< in: query thread */
1485 {
1486 dberr_t err;
1487
1488 ut_ad(rec_offs_validate(rec, index, offsets));
1489
1490 if (dict_index_is_clust(index)) {
1491 err = lock_clust_rec_read_check_and_lock(
1492 0, block, rec, index, offsets, LOCK_S, type, thr);
1493 } else {
1494 err = lock_sec_rec_read_check_and_lock(
1495 0, block, rec, index, offsets, LOCK_S, type, thr);
1496 }
1497
1498 return(err);
1499 }
1500
1501 /*********************************************************************//**
1502 Sets a exclusive lock on a record. Used in locking possible duplicate key
1503 records
1504 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1505 static
1506 dberr_t
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1507 row_ins_set_exclusive_rec_lock(
1508 /*===========================*/
1509 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1510 LOCK_REC_NOT_GAP type lock */
1511 const buf_block_t* block, /*!< in: buffer block of rec */
1512 const rec_t* rec, /*!< in: record */
1513 dict_index_t* index, /*!< in: index */
1514 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1515 que_thr_t* thr) /*!< in: query thread */
1516 {
1517 dberr_t err;
1518
1519 ut_ad(rec_offs_validate(rec, index, offsets));
1520
1521 if (dict_index_is_clust(index)) {
1522 err = lock_clust_rec_read_check_and_lock(
1523 0, block, rec, index, offsets, LOCK_X, type, thr);
1524 } else {
1525 err = lock_sec_rec_read_check_and_lock(
1526 0, block, rec, index, offsets, LOCK_X, type, thr);
1527 }
1528
1529 return(err);
1530 }
1531
1532 /* Decrement a counter in the destructor. */
1533 class ib_dec_in_dtor {
1534 public:
ib_dec_in_dtor(ulint & c)1535 ib_dec_in_dtor(ulint& c): counter(c) {}
~ib_dec_in_dtor()1536 ~ib_dec_in_dtor() {
1537 os_atomic_decrement_ulint(&counter, 1);
1538 }
1539 private:
1540 ulint& counter;
1541 };
1542
1543 /***************************************************************//**
1544 Checks if foreign key constraint fails for an index entry. Sets shared locks
1545 which lock either the success or the failure of the constraint. NOTE that
1546 the caller must have a shared latch on dict_operation_lock.
1547 @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
1548 dberr_t
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1549 row_ins_check_foreign_constraint(
1550 /*=============================*/
1551 ibool check_ref,/*!< in: TRUE if we want to check that
1552 the referenced table is ok, FALSE if we
1553 want to check the foreign key table */
1554 dict_foreign_t* foreign,/*!< in: foreign constraint; NOTE that the
1555 tables mentioned in it must be in the
1556 dictionary cache if they exist at all */
1557 dict_table_t* table, /*!< in: if check_ref is TRUE, then the foreign
1558 table, else the referenced table */
1559 dtuple_t* entry, /*!< in: index entry for index */
1560 que_thr_t* thr) /*!< in: query thread */
1561 {
1562 dberr_t err;
1563 upd_node_t* upd_node;
1564 dict_table_t* check_table;
1565 dict_index_t* check_index;
1566 ulint n_fields_cmp;
1567 btr_pcur_t pcur;
1568 int cmp;
1569 mtr_t mtr;
1570 trx_t* trx = thr_get_trx(thr);
1571 mem_heap_t* heap = NULL;
1572 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1573 ulint* offsets = offsets_;
1574
1575 bool skip_gap_lock;
1576
1577 skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED);
1578
1579 DBUG_ENTER("row_ins_check_foreign_constraint");
1580
1581 rec_offs_init(offsets_);
1582
1583 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_S));
1584
1585 err = DB_SUCCESS;
1586
1587 if (trx->check_foreigns == FALSE) {
1588 /* The user has suppressed foreign key checks currently for
1589 this session */
1590 goto exit_func;
1591 }
1592
1593 /* If any of the foreign key fields in entry is SQL NULL, we
1594 suppress the foreign key check: this is compatible with Oracle,
1595 for example */
1596 for (ulint i = 0; i < foreign->n_fields; i++) {
1597 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1598 goto exit_func;
1599 }
1600 }
1601
1602 if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1603 upd_node = static_cast<upd_node_t*>(thr->run_node);
1604
1605 if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1606 /* If a cascaded update is done as defined by a
1607 foreign key constraint, do not check that
1608 constraint for the child row. In ON UPDATE CASCADE
1609 the update of the parent row is only half done when
1610 we come here: if we would check the constraint here
1611 for the child row it would fail.
1612
1613 A QUESTION remains: if in the child table there are
1614 several constraints which refer to the same parent
1615 table, we should merge all updates to the child as
1616 one update? And the updates can be contradictory!
1617 Currently we just perform the update associated
1618 with each foreign key constraint, one after
1619 another, and the user has problems predicting in
1620 which order they are performed. */
1621
1622 goto exit_func;
1623 }
1624 }
1625
1626 if (check_ref) {
1627 check_table = foreign->referenced_table;
1628 check_index = foreign->referenced_index;
1629 } else {
1630 check_table = foreign->foreign_table;
1631 check_index = foreign->foreign_index;
1632 }
1633
1634 if (check_table == NULL
1635 || check_table->ibd_file_missing
1636 || check_index == NULL
1637 || fil_space_is_being_truncated(check_table->space)) {
1638
1639 if (!srv_read_only_mode && check_ref) {
1640 FILE* ef = dict_foreign_err_file;
1641
1642 row_ins_set_detailed(trx, foreign);
1643
1644 row_ins_foreign_trx_print(trx);
1645
1646 fputs("Foreign key constraint fails for table ", ef);
1647 ut_print_name(ef, trx,
1648 foreign->foreign_table_name);
1649 fputs(":\n", ef);
1650 dict_print_info_on_foreign_key_in_create_format(
1651 ef, trx, foreign, TRUE);
1652 fprintf(ef, "\nTrying to add to index %s tuple:\n",
1653 foreign->foreign_index->name());
1654 dtuple_print(ef, entry);
1655 fputs("\nBut the parent table ", ef);
1656 ut_print_name(ef, trx,
1657 foreign->referenced_table_name);
1658 fputs("\nor its .ibd file does"
1659 " not currently exist!, or"
1660 " is undergoing truncate!\n", ef);
1661 mutex_exit(&dict_foreign_err_mutex);
1662
1663 err = DB_NO_REFERENCED_ROW;
1664 }
1665
1666 goto exit_func;
1667 }
1668
1669 if (check_table != table) {
1670 /* We already have a LOCK_IX on table, but not necessarily
1671 on check_table */
1672
1673 err = lock_table(0, check_table, LOCK_IS, thr);
1674
1675 if (err != DB_SUCCESS) {
1676
1677 goto do_possible_lock_wait;
1678 }
1679 }
1680
1681 mtr_start(&mtr);
1682
1683 /* Store old value on n_fields_cmp */
1684
1685 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1686
1687 dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1688
1689 btr_pcur_open(check_index, entry, PAGE_CUR_GE,
1690 BTR_SEARCH_LEAF, &pcur, &mtr);
1691
1692 /* Scan index records and check if there is a matching record */
1693
1694 do {
1695 const rec_t* rec = btr_pcur_get_rec(&pcur);
1696 const buf_block_t* block = btr_pcur_get_block(&pcur);
1697
1698 if (page_rec_is_infimum(rec)) {
1699
1700 continue;
1701 }
1702
1703 offsets = rec_get_offsets(rec, check_index,
1704 offsets, ULINT_UNDEFINED, &heap);
1705
1706 if (page_rec_is_supremum(rec)) {
1707
1708 if (skip_gap_lock) {
1709
1710 continue;
1711 }
1712
1713 err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
1714 rec, check_index,
1715 offsets, thr);
1716 switch (err) {
1717 case DB_SUCCESS_LOCKED_REC:
1718 case DB_SUCCESS:
1719 continue;
1720 default:
1721 goto end_scan;
1722 }
1723 }
1724
1725 cmp = cmp_dtuple_rec(entry, rec, offsets);
1726
1727 if (cmp == 0) {
1728
1729 ulint lock_type;
1730
1731 lock_type = skip_gap_lock
1732 ? LOCK_REC_NOT_GAP
1733 : LOCK_ORDINARY;
1734
1735 if (rec_get_deleted_flag(rec,
1736 rec_offs_comp(offsets))) {
1737 err = row_ins_set_shared_rec_lock(
1738 lock_type, block,
1739 rec, check_index, offsets, thr);
1740 switch (err) {
1741 case DB_SUCCESS_LOCKED_REC:
1742 case DB_SUCCESS:
1743 break;
1744 default:
1745 goto end_scan;
1746 }
1747 } else {
1748 /* Found a matching record. Lock only
1749 a record because we can allow inserts
1750 into gaps */
1751
1752 err = row_ins_set_shared_rec_lock(
1753 LOCK_REC_NOT_GAP, block,
1754 rec, check_index, offsets, thr);
1755
1756 switch (err) {
1757 case DB_SUCCESS_LOCKED_REC:
1758 case DB_SUCCESS:
1759 break;
1760 default:
1761 goto end_scan;
1762 }
1763
1764 if (check_ref) {
1765 err = DB_SUCCESS;
1766
1767 goto end_scan;
1768 } else if (foreign->type != 0) {
1769 /* There is an ON UPDATE or ON DELETE
1770 condition: check them in a separate
1771 function */
1772
1773 err = row_ins_foreign_check_on_constraint(
1774 thr, foreign, &pcur, entry,
1775 &mtr);
1776 if (err != DB_SUCCESS) {
1777 /* Since reporting a plain
1778 "duplicate key" error
1779 message to the user in
1780 cases where a long CASCADE
1781 operation would lead to a
1782 duplicate key in some
1783 other table is very
1784 confusing, map duplicate
1785 key errors resulting from
1786 FK constraints to a
1787 separate error code. */
1788
1789 if (err == DB_DUPLICATE_KEY) {
1790 err = DB_FOREIGN_DUPLICATE_KEY;
1791 }
1792
1793 goto end_scan;
1794 }
1795
1796 /* row_ins_foreign_check_on_constraint
1797 may have repositioned pcur on a
1798 different block */
1799 block = btr_pcur_get_block(&pcur);
1800 } else {
1801 row_ins_foreign_report_err(
1802 "Trying to delete or update",
1803 thr, foreign, rec, entry);
1804
1805 err = DB_ROW_IS_REFERENCED;
1806 goto end_scan;
1807 }
1808 }
1809 } else {
1810 ut_a(cmp < 0);
1811
1812 err = DB_SUCCESS;
1813
1814 if (!skip_gap_lock) {
1815 err = row_ins_set_shared_rec_lock(
1816 LOCK_GAP, block,
1817 rec, check_index, offsets, thr);
1818 }
1819
1820 switch (err) {
1821 case DB_SUCCESS_LOCKED_REC:
1822 case DB_SUCCESS:
1823 if (check_ref) {
1824 err = DB_NO_REFERENCED_ROW;
1825 row_ins_foreign_report_add_err(
1826 trx, foreign, rec, entry);
1827 } else {
1828 err = DB_SUCCESS;
1829 }
1830 default:
1831 break;
1832 }
1833
1834 goto end_scan;
1835 }
1836 } while (btr_pcur_move_to_next(&pcur, &mtr));
1837
1838 if (check_ref) {
1839 row_ins_foreign_report_add_err(
1840 trx, foreign, btr_pcur_get_rec(&pcur), entry);
1841 err = DB_NO_REFERENCED_ROW;
1842 } else {
1843 err = DB_SUCCESS;
1844 }
1845
1846 end_scan:
1847 btr_pcur_close(&pcur);
1848
1849 mtr_commit(&mtr);
1850
1851 /* Restore old value */
1852 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1853
1854 do_possible_lock_wait:
1855 if (err == DB_LOCK_WAIT) {
1856 /* An object that will correctly decrement the FK check counter
1857 when it goes out of this scope. */
1858 ib_dec_in_dtor dec(check_table->n_foreign_key_checks_running);
1859
1860 trx->error_state = err;
1861
1862 que_thr_stop_for_mysql(thr);
1863
1864 thr->lock_state = QUE_THR_LOCK_ROW;
1865
1866 /* To avoid check_table being dropped, increment counter */
1867 os_atomic_increment_ulint(
1868 &check_table->n_foreign_key_checks_running, 1);
1869
1870 trx_kill_blocking(trx);
1871
1872 lock_wait_suspend_thread(thr);
1873
1874 thr->lock_state = QUE_THR_LOCK_NOLOCK;
1875
1876 if(trx->error_state != DB_SUCCESS) {
1877 err = trx->error_state;
1878 goto exit_func;
1879 }
1880
1881 DBUG_PRINT("to_be_dropped",
1882 ("table: %s", check_table->name.m_name));
1883 if (check_table->to_be_dropped) {
1884 /* The table is being dropped. We shall timeout
1885 this operation */
1886 err = DB_LOCK_WAIT_TIMEOUT;
1887
1888 goto exit_func;
1889 }
1890 }
1891
1892
1893 exit_func:
1894 if (heap != NULL) {
1895 mem_heap_free(heap);
1896 }
1897
1898 DEBUG_SYNC_C("finished_scanning_index");
1899 DBUG_RETURN(err);
1900 }
1901
1902 /***************************************************************//**
1903 Checks if foreign key constraints fail for an index entry. If index
1904 is not mentioned in any constraint, this function does nothing,
1905 Otherwise does searches to the indexes of referenced tables and
1906 sets shared locks which lock either the success or the failure of
1907 a constraint.
1908 @return DB_SUCCESS or error code */
1909 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1910 dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1911 row_ins_check_foreign_constraints(
1912 /*==============================*/
1913 dict_table_t* table, /*!< in: table */
1914 dict_index_t* index, /*!< in: index */
1915 dtuple_t* entry, /*!< in: index entry for index */
1916 que_thr_t* thr) /*!< in: query thread */
1917 {
1918 dict_foreign_t* foreign;
1919 dberr_t err;
1920 trx_t* trx;
1921 ibool got_s_lock = FALSE;
1922
1923 trx = thr_get_trx(thr);
1924
1925 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1926 "foreign_constraint_check_for_ins");
1927
1928 for (dict_foreign_set::iterator it = table->foreign_set.begin();
1929 it != table->foreign_set.end();
1930 ++it) {
1931
1932 foreign = *it;
1933
1934 if (foreign->foreign_index == index) {
1935 dict_table_t* ref_table = NULL;
1936 dict_table_t* foreign_table = foreign->foreign_table;
1937 dict_table_t* referenced_table
1938 = foreign->referenced_table;
1939
1940 if (referenced_table == NULL) {
1941
1942 ref_table = dict_table_open_on_name(
1943 foreign->referenced_table_name_lookup,
1944 FALSE, FALSE, DICT_ERR_IGNORE_NONE);
1945 }
1946
1947 if (0 == trx->dict_operation_lock_mode) {
1948 got_s_lock = TRUE;
1949
1950 row_mysql_freeze_data_dictionary(trx);
1951 }
1952
1953 if (referenced_table) {
1954 os_atomic_increment_ulint(
1955 &foreign_table->n_foreign_key_checks_running, 1);
1956 }
1957
1958 /* NOTE that if the thread ends up waiting for a lock
1959 we will release dict_operation_lock temporarily!
1960 But the counter on the table protects the referenced
1961 table from being dropped while the check is running. */
1962
1963 err = row_ins_check_foreign_constraint(
1964 TRUE, foreign, table, entry, thr);
1965
1966 if (referenced_table) {
1967 os_atomic_decrement_ulint(
1968 &foreign_table->n_foreign_key_checks_running, 1);
1969 }
1970
1971 if (got_s_lock) {
1972 row_mysql_unfreeze_data_dictionary(trx);
1973 }
1974
1975 if (ref_table != NULL) {
1976 dict_table_close(ref_table, FALSE, FALSE);
1977 }
1978
1979 if (err != DB_SUCCESS) {
1980
1981 return(err);
1982 }
1983 }
1984 }
1985
1986 return(DB_SUCCESS);
1987 }
1988
1989 /***************************************************************//**
1990 Checks if a unique key violation to rec would occur at the index entry
1991 insert.
1992 @return TRUE if error */
1993 static
1994 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1995 row_ins_dupl_error_with_rec(
1996 /*========================*/
1997 const rec_t* rec, /*!< in: user record; NOTE that we assume
1998 that the caller already has a record lock on
1999 the record! */
2000 const dtuple_t* entry, /*!< in: entry to insert */
2001 dict_index_t* index, /*!< in: index */
2002 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
2003 {
2004 ulint matched_fields;
2005 ulint n_unique;
2006 ulint i;
2007
2008 ut_ad(rec_offs_validate(rec, index, offsets));
2009
2010 n_unique = dict_index_get_n_unique(index);
2011
2012 matched_fields = 0;
2013
2014 cmp_dtuple_rec_with_match(entry, rec, offsets, &matched_fields);
2015
2016 if (matched_fields < n_unique) {
2017
2018 return(FALSE);
2019 }
2020
2021 /* In a unique secondary index we allow equal key values if they
2022 contain SQL NULLs */
2023
2024 if (!dict_index_is_clust(index) && !index->nulls_equal) {
2025
2026 for (i = 0; i < n_unique; i++) {
2027 if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
2028
2029 return(FALSE);
2030 }
2031 }
2032 }
2033
2034 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2035 }
2036
2037 /***************************************************************//**
2038 Scans a unique non-clustered index at a given index entry to determine
2039 whether a uniqueness violation has occurred for the key value of the entry.
2040 Set shared locks on possible duplicate records.
2041 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
2042 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2043 dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)2044 row_ins_scan_sec_index_for_duplicate(
2045 /*=================================*/
2046 ulint flags, /*!< in: undo logging and locking flags */
2047 dict_index_t* index, /*!< in: non-clustered unique index */
2048 dtuple_t* entry, /*!< in: index entry */
2049 que_thr_t* thr, /*!< in: query thread */
2050 bool s_latch,/*!< in: whether index->lock is being held */
2051 mtr_t* mtr, /*!< in/out: mini-transaction */
2052 mem_heap_t* offsets_heap)
2053 /*!< in/out: memory heap that can be emptied */
2054 {
2055 ulint n_unique;
2056 int cmp;
2057 ulint n_fields_cmp;
2058 btr_pcur_t pcur;
2059 dberr_t err = DB_SUCCESS;
2060 ulint allow_duplicates;
2061 ulint* offsets = NULL;
2062 DBUG_ENTER("row_ins_scan_sec_index_for_duplicate");
2063
2064
2065 ut_ad(s_latch == rw_lock_own_flagged(
2066 &index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
2067
2068 n_unique = dict_index_get_n_unique(index);
2069
2070 /* If the secondary index is unique, but one of the fields in the
2071 n_unique first fields is NULL, a unique key violation cannot occur,
2072 since we define NULL != NULL in this case */
2073
2074 if (!index->nulls_equal) {
2075 for (ulint i = 0; i < n_unique; i++) {
2076 if (UNIV_SQL_NULL == dfield_get_len(
2077 dtuple_get_nth_field(entry, i))) {
2078
2079 DBUG_RETURN(DB_SUCCESS);
2080 }
2081 }
2082 }
2083
2084 /* Store old value on n_fields_cmp */
2085
2086 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
2087
2088 dtuple_set_n_fields_cmp(entry, n_unique);
2089
2090 btr_pcur_open(index, entry, PAGE_CUR_GE,
2091 s_latch
2092 ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED
2093 : BTR_SEARCH_LEAF,
2094 &pcur, mtr);
2095
2096 allow_duplicates = thr_get_trx(thr)->duplicates;
2097
2098 /* Scan index records and check if there is a duplicate */
2099
2100 do {
2101 const rec_t* rec = btr_pcur_get_rec(&pcur);
2102 const buf_block_t* block = btr_pcur_get_block(&pcur);
2103 const ulint lock_type = LOCK_ORDINARY;
2104
2105 if (page_rec_is_infimum(rec)) {
2106
2107 continue;
2108 }
2109
2110 offsets = rec_get_offsets(rec, index, offsets,
2111 ULINT_UNDEFINED, &offsets_heap);
2112
2113 if (flags & BTR_NO_LOCKING_FLAG) {
2114 /* Set no locks when applying log
2115 in online table rebuild. */
2116 } else if (allow_duplicates) {
2117
2118 /* If the SQL-query will update or replace
2119 duplicate key we will take X-lock for
2120 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2121 INSERT ON DUPLICATE KEY UPDATE). */
2122
2123 err = row_ins_set_exclusive_rec_lock(
2124 lock_type, block, rec, index, offsets, thr);
2125 } else {
2126
2127 err = row_ins_set_shared_rec_lock(
2128 lock_type, block, rec, index, offsets, thr);
2129 }
2130
2131 switch (err) {
2132 case DB_SUCCESS_LOCKED_REC:
2133 err = DB_SUCCESS;
2134 case DB_SUCCESS:
2135 break;
2136 default:
2137 goto end_scan;
2138 }
2139
2140 if (page_rec_is_supremum(rec)) {
2141
2142 continue;
2143 }
2144
2145 cmp = cmp_dtuple_rec(entry, rec, offsets);
2146
2147 if (cmp == 0 && !index->allow_duplicates) {
2148 if (row_ins_dupl_error_with_rec(rec, entry,
2149 index, offsets)) {
2150 err = DB_DUPLICATE_KEY;
2151
2152 thr_get_trx(thr)->error_index = index;
2153
2154 /* If the duplicate is on hidden FTS_DOC_ID,
2155 state so in the error log */
2156 if (index == index->table->fts_doc_id_index
2157 && DICT_TF2_FLAG_IS_SET(
2158 index->table,
2159 DICT_TF2_FTS_HAS_DOC_ID)) {
2160
2161 ib::error() << "Duplicate FTS_DOC_ID"
2162 " value on table "
2163 << index->table->name;
2164 }
2165
2166 goto end_scan;
2167 }
2168 } else {
2169 ut_a(cmp < 0 || index->allow_duplicates);
2170 goto end_scan;
2171 }
2172 } while (btr_pcur_move_to_next(&pcur, mtr));
2173
2174 end_scan:
2175 /* Restore old value */
2176 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2177
2178 DBUG_RETURN(err);
2179 }
2180
2181 /** Checks for a duplicate when the table is being rebuilt online.
2182 @retval DB_SUCCESS when no duplicate is detected
2183 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2184 a newer version of entry (the entry should not be inserted)
2185 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2186 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2187 dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,ulint * offsets)2188 row_ins_duplicate_online(
2189 /*=====================*/
2190 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2191 const dtuple_t* entry, /*!< in: entry that is being inserted */
2192 const rec_t* rec, /*!< in: clustered index record */
2193 ulint* offsets)/*!< in/out: rec_get_offsets(rec) */
2194 {
2195 ulint fields = 0;
2196
2197 /* During rebuild, there should not be any delete-marked rows
2198 in the new table. */
2199 ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2200 ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2201
2202 /* Compare the PRIMARY KEY fields and the
2203 DB_TRX_ID, DB_ROLL_PTR. */
2204 cmp_dtuple_rec_with_match_low(
2205 entry, rec, offsets, n_uniq + 2, &fields);
2206
2207 if (fields < n_uniq) {
2208 /* Not a duplicate. */
2209 return(DB_SUCCESS);
2210 }
2211
2212 if (fields == n_uniq + 2) {
2213 /* rec is an exact match of entry. */
2214 return(DB_SUCCESS_LOCKED_REC);
2215 }
2216
2217 return(DB_DUPLICATE_KEY);
2218 }
2219
2220 /** Checks for a duplicate when the table is being rebuilt online.
2221 @retval DB_SUCCESS when no duplicate is detected
2222 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2223 a newer version of entry (the entry should not be inserted)
2224 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2225 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2226 dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2227 row_ins_duplicate_error_in_clust_online(
2228 /*====================================*/
2229 ulint n_uniq, /*!< in: offset of DB_TRX_ID */
2230 const dtuple_t* entry, /*!< in: entry that is being inserted */
2231 const btr_cur_t*cursor, /*!< in: cursor on insert position */
2232 ulint** offsets,/*!< in/out: rec_get_offsets(rec) */
2233 mem_heap_t** heap) /*!< in/out: heap for offsets */
2234 {
2235 dberr_t err = DB_SUCCESS;
2236 const rec_t* rec = btr_cur_get_rec(cursor);
2237
2238 if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2239 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2240 ULINT_UNDEFINED, heap);
2241 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2242 if (err != DB_SUCCESS) {
2243 return(err);
2244 }
2245 }
2246
2247 rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2248
2249 if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2250 *offsets = rec_get_offsets(rec, cursor->index, *offsets,
2251 ULINT_UNDEFINED, heap);
2252 err = row_ins_duplicate_online(n_uniq, entry, rec, *offsets);
2253 }
2254
2255 return(err);
2256 }
2257
2258 /***************************************************************//**
2259 Checks if a unique key violation error would occur at an index entry
2260 insert. Sets shared locks on possible duplicate records. Works only
2261 for a clustered index!
2262 @retval DB_SUCCESS if no error
2263 @retval DB_DUPLICATE_KEY if error,
2264 @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2265 record
2266 @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2267 in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2268 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2269 dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2270 row_ins_duplicate_error_in_clust(
2271 /*=============================*/
2272 ulint flags, /*!< in: undo logging and locking flags */
2273 btr_cur_t* cursor, /*!< in: B-tree cursor */
2274 const dtuple_t* entry, /*!< in: entry to insert */
2275 que_thr_t* thr, /*!< in: query thread */
2276 mtr_t* mtr) /*!< in: mtr */
2277 {
2278 dberr_t err;
2279 rec_t* rec;
2280 ulint n_unique;
2281 trx_t* trx = thr_get_trx(thr);
2282 mem_heap_t*heap = NULL;
2283 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2284 ulint* offsets = offsets_;
2285 rec_offs_init(offsets_);
2286
2287 UT_NOT_USED(mtr);
2288
2289 ut_ad(dict_index_is_clust(cursor->index));
2290
2291 /* NOTE: For unique non-clustered indexes there may be any number
2292 of delete marked records with the same value for the non-clustered
2293 index key (remember multiversioning), and which differ only in
2294 the row refererence part of the index record, containing the
2295 clustered index key fields. For such a secondary index record,
2296 to avoid race condition, we must FIRST do the insertion and after
2297 that check that the uniqueness condition is not breached! */
2298
2299 /* NOTE: A problem is that in the B-tree node pointers on an
2300 upper level may match more to the entry than the actual existing
2301 user records on the leaf level. So, even if low_match would suggest
2302 that a duplicate key violation may occur, this may not be the case. */
2303
2304 n_unique = dict_index_get_n_unique(cursor->index);
2305
2306 if (cursor->low_match >= n_unique) {
2307
2308 rec = btr_cur_get_rec(cursor);
2309
2310 if (!page_rec_is_infimum(rec)) {
2311 offsets = rec_get_offsets(rec, cursor->index, offsets,
2312 ULINT_UNDEFINED, &heap);
2313
2314 /* We set a lock on the possible duplicate: this
2315 is needed in logical logging of MySQL to make
2316 sure that in roll-forward we get the same duplicate
2317 errors as in original execution */
2318
2319 if (flags & BTR_NO_LOCKING_FLAG) {
2320 /* Do nothing if no-locking is set */
2321 err = DB_SUCCESS;
2322 } else if (trx->duplicates) {
2323
2324 /* If the SQL-query will update or replace
2325 duplicate key we will take X-lock for
2326 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2327 INSERT ON DUPLICATE KEY UPDATE). */
2328
2329 err = row_ins_set_exclusive_rec_lock(
2330 LOCK_REC_NOT_GAP,
2331 btr_cur_get_block(cursor),
2332 rec, cursor->index, offsets, thr);
2333 } else {
2334
2335 err = row_ins_set_shared_rec_lock(
2336 LOCK_REC_NOT_GAP,
2337 btr_cur_get_block(cursor), rec,
2338 cursor->index, offsets, thr);
2339 }
2340
2341 switch (err) {
2342 case DB_SUCCESS_LOCKED_REC:
2343 case DB_SUCCESS:
2344 break;
2345 default:
2346 goto func_exit;
2347 }
2348
2349 if (row_ins_dupl_error_with_rec(
2350 rec, entry, cursor->index, offsets)) {
2351 duplicate:
2352 trx->error_index = cursor->index;
2353 err = DB_DUPLICATE_KEY;
2354 goto func_exit;
2355 }
2356 }
2357 }
2358
2359 if (cursor->up_match >= n_unique) {
2360
2361 rec = page_rec_get_next(btr_cur_get_rec(cursor));
2362
2363 if (!page_rec_is_supremum(rec)) {
2364 offsets = rec_get_offsets(rec, cursor->index, offsets,
2365 ULINT_UNDEFINED, &heap);
2366
2367 if (trx->duplicates) {
2368
2369 /* If the SQL-query will update or replace
2370 duplicate key we will take X-lock for
2371 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2372 INSERT ON DUPLICATE KEY UPDATE). */
2373
2374 err = row_ins_set_exclusive_rec_lock(
2375 LOCK_REC_NOT_GAP,
2376 btr_cur_get_block(cursor),
2377 rec, cursor->index, offsets, thr);
2378 } else {
2379
2380 err = row_ins_set_shared_rec_lock(
2381 LOCK_REC_NOT_GAP,
2382 btr_cur_get_block(cursor),
2383 rec, cursor->index, offsets, thr);
2384 }
2385
2386 switch (err) {
2387 case DB_SUCCESS_LOCKED_REC:
2388 case DB_SUCCESS:
2389 break;
2390 default:
2391 goto func_exit;
2392 }
2393
2394 if (row_ins_dupl_error_with_rec(
2395 rec, entry, cursor->index, offsets)) {
2396 goto duplicate;
2397 }
2398 }
2399
2400 /* This should never happen */
2401 ut_error;
2402 }
2403
2404 err = DB_SUCCESS;
2405 func_exit:
2406 if (UNIV_LIKELY_NULL(heap)) {
2407 mem_heap_free(heap);
2408 }
2409 return(err);
2410 }
2411
2412 /***************************************************************//**
2413 Checks if an index entry has long enough common prefix with an
2414 existing record so that the intended insert of the entry must be
2415 changed to a modify of the existing record. In the case of a clustered
2416 index, the prefix must be n_unique fields long. In the case of a
2417 secondary index, all fields must be equal. InnoDB never updates
2418 secondary index records in place, other than clearing or setting the
2419 delete-mark flag. We could be able to update the non-unique fields
2420 of a unique secondary index record by checking the cursor->up_match,
2421 but we do not do so, because it could have some locking implications.
2422 @return TRUE if the existing record should be updated; FALSE if not */
2423 UNIV_INLINE
2424 ibool
row_ins_must_modify_rec(const btr_cur_t * cursor)2425 row_ins_must_modify_rec(
2426 /*====================*/
2427 const btr_cur_t* cursor) /*!< in: B-tree cursor */
2428 {
2429 /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2430 Because node pointers on upper levels of the B-tree may match more
2431 to entry than to actual user records on the leaf level, we
2432 have to check if the candidate record is actually a user record.
2433 A clustered index node pointer contains index->n_unique first fields,
2434 and a secondary index node pointer contains all index fields. */
2435
2436 return(cursor->low_match
2437 >= dict_index_get_n_unique_in_tree(cursor->index)
2438 && !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2439 }
2440
2441 /***************************************************************//**
2442 Tries to insert an entry into a clustered index, ignoring foreign key
2443 constraints. If a record with the same unique key is found, the other
2444 record is necessarily marked deleted by a committed transaction, or a
2445 unique key violation error occurs. The delete marked record is then
2446 updated to an existing record, and we must write an undo log record on
2447 the delete marked record.
2448 @retval DB_SUCCESS on success
2449 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2450 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2451 @return error code */
2452 dberr_t
row_ins_clust_index_entry_low(ulint flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,ulint n_ext,que_thr_t * thr,bool dup_chk_only)2453 row_ins_clust_index_entry_low(
2454 /*==========================*/
2455 ulint flags, /*!< in: undo logging and locking flags */
2456 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2457 depending on whether we wish optimistic or
2458 pessimistic descent down the index tree */
2459 dict_index_t* index, /*!< in: clustered index */
2460 ulint n_uniq, /*!< in: 0 or index->n_uniq */
2461 dtuple_t* entry, /*!< in/out: index entry to insert */
2462 ulint n_ext, /*!< in: number of externally stored columns */
2463 que_thr_t* thr, /*!< in: query thread */
2464 bool dup_chk_only)
2465 /*!< in: if true, just do duplicate check
2466 and return. don't execute actual insert. */
2467 {
2468 btr_pcur_t pcur;
2469 btr_cur_t* cursor;
2470 dberr_t err = DB_SUCCESS;
2471 big_rec_t* big_rec = NULL;
2472 mtr_t mtr;
2473 mem_heap_t* offsets_heap = NULL;
2474 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2475 ulint* offsets = offsets_;
2476 rec_offs_init(offsets_);
2477
2478 DBUG_ENTER("row_ins_clust_index_entry_low");
2479
2480 ut_ad(dict_index_is_clust(index));
2481 ut_ad(!dict_index_is_unique(index)
2482 || n_uniq == dict_index_get_n_unique(index));
2483 ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2484 ut_ad(!thr_get_trx(thr)->in_rollback);
2485
2486 mtr_start(&mtr);
2487 mtr.set_named_space(index->space);
2488
2489 if (dict_table_is_temporary(index->table)) {
2490 /* Disable REDO logging as the lifetime of temp-tables is
2491 limited to server or connection lifetime and so REDO
2492 information is not needed on restart for recovery.
2493 Disable locking as temp-tables are local to a connection. */
2494
2495 ut_ad(flags & BTR_NO_LOCKING_FLAG);
2496 ut_ad(!dict_table_is_intrinsic(index->table)
2497 || (flags & BTR_NO_UNDO_LOG_FLAG));
2498
2499 mtr.set_log_mode(MTR_LOG_NO_REDO);
2500 }
2501
2502 if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2503 mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
2504 mtr_s_lock(dict_index_get_lock(index), &mtr);
2505 }
2506
2507 /* Note that we use PAGE_CUR_LE as the search mode, because then
2508 the function will return in both low_match and up_match of the
2509 cursor sensible values */
2510 btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);
2511 cursor = btr_pcur_get_btr_cur(&pcur);
2512 cursor->thr = thr;
2513
2514 ut_ad(!dict_table_is_intrinsic(index->table)
2515 || cursor->page_cur.block->made_dirty_with_no_latch);
2516
2517 #ifdef UNIV_DEBUG
2518 {
2519 page_t* page = btr_cur_get_page(cursor);
2520 rec_t* first_rec = page_rec_get_next(
2521 page_get_infimum_rec(page));
2522
2523 ut_ad(page_rec_is_supremum(first_rec)
2524 || rec_n_fields_is_sane(index, first_rec, entry));
2525 }
2526 #endif /* UNIV_DEBUG */
2527
2528 /* Allowing duplicates in clustered index is currently enabled
2529 only for intrinsic table and caller understand the limited
2530 operation that can be done in this case. */
2531 ut_ad(!index->allow_duplicates
2532 || (index->allow_duplicates
2533 && dict_table_is_intrinsic(index->table)));
2534
2535 if (!index->allow_duplicates
2536 && n_uniq
2537 && (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {
2538
2539 if (flags
2540 == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2541 | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG)) {
2542 /* Set no locks when applying log
2543 in online table rebuild. Only check for duplicates. */
2544 err = row_ins_duplicate_error_in_clust_online(
2545 n_uniq, entry, cursor,
2546 &offsets, &offsets_heap);
2547
2548 switch (err) {
2549 case DB_SUCCESS:
2550 break;
2551 default:
2552 ut_ad(0);
2553 /* fall through */
2554 case DB_SUCCESS_LOCKED_REC:
2555 case DB_DUPLICATE_KEY:
2556 thr_get_trx(thr)->error_index = cursor->index;
2557 }
2558 } else {
2559 /* Note that the following may return also
2560 DB_LOCK_WAIT */
2561
2562 err = row_ins_duplicate_error_in_clust(
2563 flags, cursor, entry, thr, &mtr);
2564 }
2565
2566 if (err != DB_SUCCESS) {
2567 err_exit:
2568 mtr_commit(&mtr);
2569 goto func_exit;
2570 }
2571 }
2572
2573 if (dup_chk_only) {
2574 mtr_commit(&mtr);
2575 goto func_exit;
2576 }
2577
2578 /* Note: Allowing duplicates would qualify for modification of
2579 an existing record as the new entry is exactly same as old entry.
2580 Avoid this check if allow duplicates is enabled. */
2581 if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
2582 /* There is already an index entry with a long enough common
2583 prefix, we must convert the insert into a modify of an
2584 existing record */
2585 mem_heap_t* entry_heap = mem_heap_create(1024);
2586
2587 /* If the existing record is being modified and the new record
2588 doesn't fit the provided slot then existing record is added
2589 to free list and new record is inserted. This also means
2590 cursor that we have cached for SELECT is now invalid. */
2591 if(index->last_sel_cur) {
2592 index->last_sel_cur->invalid = true;
2593 }
2594
2595 err = row_ins_clust_index_entry_by_modify(
2596 &pcur, flags, mode, &offsets, &offsets_heap,
2597 entry_heap, entry, thr, &mtr);
2598
2599 if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2600 row_log_table_insert(btr_cur_get_rec(cursor), entry,
2601 index, offsets);
2602 }
2603
2604 mtr_commit(&mtr);
2605 mem_heap_free(entry_heap);
2606 } else {
2607 rec_t* insert_rec;
2608
2609 if (mode != BTR_MODIFY_TREE) {
2610 ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2611 == BTR_MODIFY_LEAF);
2612 err = btr_cur_optimistic_insert(
2613 flags, cursor, &offsets, &offsets_heap,
2614 entry, &insert_rec, &big_rec,
2615 n_ext, thr, &mtr);
2616 } else {
2617 if (buf_LRU_buf_pool_running_out()) {
2618
2619 err = DB_LOCK_TABLE_FULL;
2620 goto err_exit;
2621 }
2622
2623 DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");
2624
2625 err = btr_cur_optimistic_insert(
2626 flags, cursor,
2627 &offsets, &offsets_heap,
2628 entry, &insert_rec, &big_rec,
2629 n_ext, thr, &mtr);
2630
2631 if (err == DB_FAIL) {
2632 err = btr_cur_pessimistic_insert(
2633 flags, cursor,
2634 &offsets, &offsets_heap,
2635 entry, &insert_rec, &big_rec,
2636 n_ext, thr, &mtr);
2637 }
2638 }
2639
2640 if (big_rec != NULL) {
2641 mtr_commit(&mtr);
2642
2643 /* Online table rebuild could read (and
2644 ignore) the incomplete record at this point.
2645 If online rebuild is in progress, the
2646 row_ins_index_entry_big_rec() will write log. */
2647
2648 DBUG_EXECUTE_IF(
2649 "row_ins_extern_checkpoint",
2650 log_make_checkpoint_at(
2651 LSN_MAX, TRUE););
2652 err = row_ins_index_entry_big_rec(
2653 entry, big_rec, offsets, &offsets_heap, index,
2654 thr_get_trx(thr)->mysql_thd,
2655 __FILE__, __LINE__);
2656 dtuple_convert_back_big_rec(index, entry, big_rec);
2657 } else {
2658 if (err == DB_SUCCESS
2659 && dict_index_is_online_ddl(index)) {
2660 row_log_table_insert(
2661 insert_rec, entry, index, offsets);
2662 }
2663
2664 mtr_commit(&mtr);
2665 }
2666 }
2667
2668 func_exit:
2669 if (offsets_heap != NULL) {
2670 mem_heap_free(offsets_heap);
2671 }
2672
2673 btr_pcur_close(&pcur);
2674
2675 DBUG_RETURN(err);
2676 }
2677
2678 /** This is a specialized function meant for direct insertion to
2679 auto-generated clustered index based on cached position from
2680 last successful insert. To be used when data is sorted.
2681
2682 @param[in] mode BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
2683 depending on whether we wish optimistic or
2684 pessimistic descent down the index tree
2685 @param[in,out] index clustered index
2686 @param[in,out] entry index entry to insert
2687 @param[in] thr query thread
2688
2689 @return error code */
2690 static
2691 dberr_t
row_ins_sorted_clust_index_entry(ulint mode,dict_index_t * index,dtuple_t * entry,ulint n_ext,que_thr_t * thr)2692 row_ins_sorted_clust_index_entry(
2693 ulint mode,
2694 dict_index_t* index,
2695 dtuple_t* entry,
2696 ulint n_ext,
2697 que_thr_t* thr)
2698 {
2699 dberr_t err;
2700 mtr_t* mtr;
2701 const bool commit_mtr = mode == BTR_MODIFY_TREE;
2702
2703 mem_heap_t* offsets_heap = NULL;
2704 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2705 ulint* offsets = offsets_;
2706 rec_offs_init(offsets_);
2707
2708 DBUG_ENTER("row_ins_sorted_clust_index_entry");
2709
2710 ut_ad(index->last_ins_cur != NULL);
2711 ut_ad(dict_index_is_clust(index));
2712 ut_ad(dict_table_is_intrinsic(index->table));
2713 ut_ad(dict_index_is_auto_gen_clust(index));
2714
2715 btr_cur_t cursor;
2716 cursor.thr = thr;
2717 mtr = &index->last_ins_cur->mtr;
2718
2719 /* Search for position if tree needs to be split or if last position
2720 is not cached. */
2721 if (mode == BTR_MODIFY_TREE
2722 || index->last_ins_cur->rec == NULL
2723 || index->last_ins_cur->disable_caching) {
2724
2725 /* Commit the previous mtr. */
2726 index->last_ins_cur->release();
2727
2728 mtr_start(mtr);
2729 mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
2730
2731 btr_cur_search_to_nth_level_with_no_latch(
2732 index, 0, entry, PAGE_CUR_LE, &cursor,
2733 __FILE__, __LINE__, mtr);
2734 ut_ad(cursor.page_cur.block != NULL);
2735 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2736 } else {
2737 cursor.index = index;
2738
2739 cursor.page_cur.index = index;
2740
2741 cursor.page_cur.rec = index->last_ins_cur->rec;
2742
2743 cursor.page_cur.block = index->last_ins_cur->block;
2744 }
2745
2746 const ulint flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
2747
2748 for (;;) {
2749 rec_t* insert_rec;
2750 big_rec_t* big_rec = NULL;
2751
2752 if (mode != BTR_MODIFY_TREE) {
2753 ut_ad((mode & ~BTR_ALREADY_S_LATCHED)
2754 == BTR_MODIFY_LEAF);
2755
2756 err = btr_cur_optimistic_insert(
2757 flags, &cursor, &offsets, &offsets_heap, entry,
2758 &insert_rec, &big_rec, n_ext, thr, mtr);
2759 if (err != DB_SUCCESS) {
2760 break;
2761 }
2762 } else {
2763 /* TODO: Check if this is needed for intrinsic table. */
2764 if (buf_LRU_buf_pool_running_out()) {
2765 err = DB_LOCK_TABLE_FULL;
2766 break;
2767 }
2768
2769 err = btr_cur_optimistic_insert(
2770 flags, &cursor, &offsets, &offsets_heap, entry,
2771 &insert_rec, &big_rec, n_ext, thr, mtr);
2772
2773 if (err == DB_FAIL) {
2774 err = btr_cur_pessimistic_insert(
2775 flags, &cursor, &offsets, &offsets_heap,
2776 entry, &insert_rec, &big_rec, n_ext,
2777 thr, mtr);
2778 }
2779 }
2780
2781 if (big_rec != NULL) {
2782 /* If index involves big-record optimization is
2783 turned-off. */
2784 index->last_ins_cur->release();
2785 index->last_ins_cur->disable_caching = true;
2786
2787 err = row_ins_index_entry_big_rec(
2788 entry, big_rec, offsets, &offsets_heap, index,
2789 thr_get_trx(thr)->mysql_thd, __FILE__, __LINE__);
2790
2791 dtuple_convert_back_big_rec(index, entry, big_rec);
2792
2793 } else if (err == DB_SUCCESS ) {
2794 if (!commit_mtr
2795 && !index->last_ins_cur->disable_caching) {
2796 index->last_ins_cur->rec = insert_rec;
2797
2798 index->last_ins_cur->block
2799 = cursor.page_cur.block;
2800 } else {
2801 index->last_ins_cur->release();
2802 }
2803 }
2804
2805 break;
2806 }
2807
2808 if (err != DB_SUCCESS) {
2809 index->last_ins_cur->release();
2810 }
2811
2812 if (offsets_heap != NULL) {
2813 mem_heap_free(offsets_heap);
2814 }
2815
2816 DBUG_RETURN(err);
2817 }
2818
2819 /** Start a mini-transaction and check if the index will be dropped.
2820 @param[in,out] mtr mini-transaction
2821 @param[in,out] index secondary index
2822 @param[in] check whether to check
2823 @param[in] search_mode flags
2824 @return true if the index is to be dropped */
2825 static MY_ATTRIBUTE((warn_unused_result))
2826 bool
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2827 row_ins_sec_mtr_start_and_check_if_aborted(
2828 mtr_t* mtr,
2829 dict_index_t* index,
2830 bool check,
2831 ulint search_mode)
2832 {
2833 ut_ad(!dict_index_is_clust(index));
2834 ut_ad(mtr->is_named_space(index->space));
2835
2836 const mtr_log_t log_mode = mtr->get_log_mode();
2837
2838 mtr_start(mtr);
2839 mtr->set_named_space(index->space);
2840 mtr->set_log_mode(log_mode);
2841
2842 if (!check) {
2843 return(false);
2844 }
2845
2846 if (search_mode & BTR_ALREADY_S_LATCHED) {
2847 mtr_s_lock(dict_index_get_lock(index), mtr);
2848 } else {
2849 mtr_sx_lock(dict_index_get_lock(index), mtr);
2850 }
2851
2852 switch (index->online_status) {
2853 case ONLINE_INDEX_ABORTED:
2854 case ONLINE_INDEX_ABORTED_DROPPED:
2855 ut_ad(!index->is_committed());
2856 return(true);
2857 case ONLINE_INDEX_COMPLETE:
2858 return(false);
2859 case ONLINE_INDEX_CREATION:
2860 break;
2861 }
2862
2863 ut_error;
2864 return(true);
2865 }
2866
2867 /***************************************************************//**
2868 Tries to insert an entry into a secondary index. If a record with exactly the
2869 same fields is found, the other record is necessarily marked deleted.
2870 It is then unmarked. Otherwise, the entry is just inserted to the index.
2871 @retval DB_SUCCESS on success
2872 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2873 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2874 @return error code */
2875 dberr_t
row_ins_sec_index_entry_low(ulint flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2876 row_ins_sec_index_entry_low(
2877 /*========================*/
2878 ulint flags, /*!< in: undo logging and locking flags */
2879 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2880 depending on whether we wish optimistic or
2881 pessimistic descent down the index tree */
2882 dict_index_t* index, /*!< in: secondary index */
2883 mem_heap_t* offsets_heap,
2884 /*!< in/out: memory heap that can be emptied */
2885 mem_heap_t* heap, /*!< in/out: memory heap */
2886 dtuple_t* entry, /*!< in/out: index entry to insert */
2887 trx_id_t trx_id, /*!< in: PAGE_MAX_TRX_ID during
2888 row_log_table_apply(), or 0 */
2889 que_thr_t* thr, /*!< in: query thread */
2890 bool dup_chk_only)
2891 /*!< in: if true, just do duplicate check
2892 and return. don't execute actual insert. */
2893 {
2894 DBUG_ENTER("row_ins_sec_index_entry_low");
2895
2896 btr_cur_t cursor;
2897 ulint search_mode = mode;
2898 dberr_t err = DB_SUCCESS;
2899 ulint n_unique;
2900 mtr_t mtr;
2901 ulint offsets_[REC_OFFS_NORMAL_SIZE];
2902 ulint* offsets = offsets_;
2903 rec_offs_init(offsets_);
2904 rtr_info_t rtr_info;
2905
2906 ut_ad(!dict_index_is_clust(index));
2907 ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2908
2909 cursor.thr = thr;
2910 cursor.rtr_info = NULL;
2911 ut_ad(thr_get_trx(thr)->id != 0
2912 || dict_table_is_intrinsic(index->table));
2913
2914 mtr_start(&mtr);
2915 mtr.set_named_space(index->space);
2916
2917 if (dict_table_is_temporary(index->table)) {
2918 /* Disable REDO logging as the lifetime of temp-tables is
2919 limited to server or connection lifetime and so REDO
2920 information is not needed on restart for recovery.
2921 Disable locking as temp-tables are local to a connection. */
2922
2923 ut_ad(flags & BTR_NO_LOCKING_FLAG);
2924 ut_ad(!dict_table_is_intrinsic(index->table)
2925 || (flags & BTR_NO_UNDO_LOG_FLAG));
2926
2927 mtr.set_log_mode(MTR_LOG_NO_REDO);
2928 } else if (!dict_index_is_spatial(index)) {
2929 /* Enable insert buffering if it's neither temp-table
2930 nor spatial index. */
2931 search_mode |= BTR_INSERT;
2932 }
2933
2934 /* Ensure that we acquire index->lock when inserting into an
2935 index with index->online_status == ONLINE_INDEX_COMPLETE, but
2936 could still be subject to rollback_inplace_alter_table().
2937 This prevents a concurrent change of index->online_status.
2938 The memory object cannot be freed as long as we have an open
2939 reference to the table, or index->table->n_ref_count > 0. */
2940 const bool check = !index->is_committed();
2941 if (check) {
2942 DEBUG_SYNC_C("row_ins_sec_index_enter");
2943 if (mode == BTR_MODIFY_LEAF) {
2944 search_mode |= BTR_ALREADY_S_LATCHED;
2945 mtr_s_lock(dict_index_get_lock(index), &mtr);
2946 } else {
2947 mtr_sx_lock(dict_index_get_lock(index), &mtr);
2948 }
2949
2950 if (row_log_online_op_try(
2951 index, entry, thr_get_trx(thr)->id)) {
2952 goto func_exit;
2953 }
2954 }
2955
2956 /* Note that we use PAGE_CUR_LE as the search mode, because then
2957 the function will return in both low_match and up_match of the
2958 cursor sensible values */
2959
2960 if (!thr_get_trx(thr)->check_unique_secondary) {
2961 search_mode |= BTR_IGNORE_SEC_UNIQUE;
2962 }
2963
2964 if (dict_index_is_spatial(index)) {
2965 cursor.index = index;
2966 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2967 rtr_info_update_btr(&cursor, &rtr_info);
2968
2969 btr_cur_search_to_nth_level(
2970 index, 0, entry, PAGE_CUR_RTREE_INSERT,
2971 search_mode,
2972 &cursor, 0, __FILE__, __LINE__, &mtr);
2973
2974 if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
2975 mtr_commit(&mtr);
2976 rtr_clean_rtr_info(&rtr_info, true);
2977 rtr_init_rtr_info(&rtr_info, false, &cursor,
2978 index, false);
2979 rtr_info_update_btr(&cursor, &rtr_info);
2980 mtr_start(&mtr);
2981 mtr.set_named_space(index->space);
2982 search_mode &= ~BTR_MODIFY_LEAF;
2983 search_mode |= BTR_MODIFY_TREE;
2984 btr_cur_search_to_nth_level(
2985 index, 0, entry, PAGE_CUR_RTREE_INSERT,
2986 search_mode,
2987 &cursor, 0, __FILE__, __LINE__, &mtr);
2988 mode = BTR_MODIFY_TREE;
2989 }
2990
2991 DBUG_EXECUTE_IF(
2992 "rtree_test_check_count", {
2993 goto func_exit;});
2994
2995 } else {
2996 if (dict_table_is_intrinsic(index->table)) {
2997 btr_cur_search_to_nth_level_with_no_latch(
2998 index, 0, entry, PAGE_CUR_LE, &cursor,
2999 __FILE__, __LINE__, &mtr);
3000 ut_ad(cursor.page_cur.block != NULL);
3001 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3002 } else {
3003 btr_cur_search_to_nth_level(
3004 index, 0, entry, PAGE_CUR_LE,
3005 search_mode,
3006 &cursor, 0, __FILE__, __LINE__, &mtr);
3007 }
3008 }
3009
3010 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
3011 ut_ad(!dict_index_is_spatial(index));
3012 /* The insert was buffered during the search: we are done */
3013 goto func_exit;
3014 }
3015
3016 #ifdef UNIV_DEBUG
3017 {
3018 page_t* page = btr_cur_get_page(&cursor);
3019 rec_t* first_rec = page_rec_get_next(
3020 page_get_infimum_rec(page));
3021
3022 ut_ad(page_rec_is_supremum(first_rec)
3023 || rec_n_fields_is_sane(index, first_rec, entry));
3024 }
3025 #endif /* UNIV_DEBUG */
3026
3027 n_unique = dict_index_get_n_unique(index);
3028
3029 if (dict_index_is_unique(index)
3030 && (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
3031 mtr_commit(&mtr);
3032
3033 DEBUG_SYNC_C("row_ins_sec_index_unique");
3034
3035 if (row_ins_sec_mtr_start_and_check_if_aborted(
3036 &mtr, index, check, search_mode)) {
3037 goto func_exit;
3038 }
3039
3040 err = row_ins_scan_sec_index_for_duplicate(
3041 flags, index, entry, thr, check, &mtr, offsets_heap);
3042
3043 mtr_commit(&mtr);
3044
3045 switch (err) {
3046 case DB_SUCCESS:
3047 break;
3048 case DB_DUPLICATE_KEY:
3049 if (!index->is_committed()) {
3050 ut_ad(!thr_get_trx(thr)
3051 ->dict_operation_lock_mode);
3052 mutex_enter(&dict_sys->mutex);
3053 dict_set_corrupted_index_cache_only(index);
3054 mutex_exit(&dict_sys->mutex);
3055 /* Do not return any error to the
3056 caller. The duplicate will be reported
3057 by ALTER TABLE or CREATE UNIQUE INDEX.
3058 Unfortunately we cannot report the
3059 duplicate key value to the DDL thread,
3060 because the altered_table object is
3061 private to its call stack. */
3062 err = DB_SUCCESS;
3063 }
3064 /* fall through */
3065 default:
3066 if (dict_index_is_spatial(index)) {
3067 rtr_clean_rtr_info(&rtr_info, true);
3068 }
3069 DBUG_RETURN(err);
3070 }
3071
3072 if (row_ins_sec_mtr_start_and_check_if_aborted(
3073 &mtr, index, check, search_mode)) {
3074 goto func_exit;
3075 }
3076
3077 DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
3078
3079 /* We did not find a duplicate and we have now
3080 locked with s-locks the necessary records to
3081 prevent any insertion of a duplicate by another
3082 transaction. Let us now reposition the cursor and
3083 continue the insertion. */
3084 if (dict_table_is_intrinsic(index->table)) {
3085 btr_cur_search_to_nth_level_with_no_latch(
3086 index, 0, entry, PAGE_CUR_LE, &cursor,
3087 __FILE__, __LINE__, &mtr);
3088 ut_ad(cursor.page_cur.block != NULL);
3089 ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
3090 } else {
3091 btr_cur_search_to_nth_level(
3092 index, 0, entry, PAGE_CUR_LE,
3093 (search_mode
3094 & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)),
3095 &cursor, 0, __FILE__, __LINE__, &mtr);
3096 }
3097 }
3098
3099 if (dup_chk_only) {
3100 goto func_exit;
3101 }
3102
3103 if (row_ins_must_modify_rec(&cursor)) {
3104 /* If the existing record is being modified and the new record
3105 is doesn't fit the provided slot then existing record is added
3106 to free list and new record is inserted. This also means
3107 cursor that we have cached for SELECT is now invalid. */
3108 if(index->last_sel_cur) {
3109 index->last_sel_cur->invalid = true;
3110 }
3111
3112 /* There is already an index entry with a long enough common
3113 prefix, we must convert the insert into a modify of an
3114 existing record */
3115 offsets = rec_get_offsets(
3116 btr_cur_get_rec(&cursor), index, offsets,
3117 ULINT_UNDEFINED, &offsets_heap);
3118
3119 err = row_ins_sec_index_entry_by_modify(
3120 flags, mode, &cursor, &offsets,
3121 offsets_heap, heap, entry, thr, &mtr);
3122
3123 if (err == DB_SUCCESS && dict_index_is_spatial(index)
3124 && rtr_info.mbr_adj) {
3125 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3126 }
3127 } else {
3128 rec_t* insert_rec;
3129 big_rec_t* big_rec;
3130
3131 if (mode == BTR_MODIFY_LEAF) {
3132 err = btr_cur_optimistic_insert(
3133 flags, &cursor, &offsets, &offsets_heap,
3134 entry, &insert_rec,
3135 &big_rec, 0, thr, &mtr);
3136 if (err == DB_SUCCESS
3137 && dict_index_is_spatial(index)
3138 && rtr_info.mbr_adj) {
3139 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3140 }
3141 } else {
3142 ut_ad(mode == BTR_MODIFY_TREE);
3143 if (buf_LRU_buf_pool_running_out()) {
3144
3145 err = DB_LOCK_TABLE_FULL;
3146 goto func_exit;
3147 }
3148
3149 err = btr_cur_optimistic_insert(
3150 flags, &cursor,
3151 &offsets, &offsets_heap,
3152 entry, &insert_rec,
3153 &big_rec, 0, thr, &mtr);
3154 if (err == DB_FAIL) {
3155 err = btr_cur_pessimistic_insert(
3156 flags, &cursor,
3157 &offsets, &offsets_heap,
3158 entry, &insert_rec,
3159 &big_rec, 0, thr, &mtr);
3160 }
3161 if (err == DB_SUCCESS
3162 && dict_index_is_spatial(index)
3163 && rtr_info.mbr_adj) {
3164 err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3165 }
3166 }
3167
3168 if (err == DB_SUCCESS && trx_id) {
3169 page_update_max_trx_id(
3170 btr_cur_get_block(&cursor),
3171 btr_cur_get_page_zip(&cursor),
3172 trx_id, &mtr);
3173 }
3174
3175 ut_ad(!big_rec);
3176 }
3177
3178 func_exit:
3179 if (dict_index_is_spatial(index)) {
3180 rtr_clean_rtr_info(&rtr_info, true);
3181 }
3182
3183 mtr_commit(&mtr);
3184 DBUG_RETURN(err);
3185 }
3186
3187 /***************************************************************//**
3188 Tries to insert the externally stored fields (off-page columns)
3189 of a clustered index entry.
3190 @return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
3191 dberr_t
row_ins_index_entry_big_rec_func(const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,dict_index_t * index,const char * file,const void * thd,ulint line)3192 row_ins_index_entry_big_rec_func(
3193 /*=============================*/
3194 const dtuple_t* entry, /*!< in/out: index entry to insert */
3195 const big_rec_t* big_rec,/*!< in: externally stored fields */
3196 ulint* offsets,/*!< in/out: rec offsets */
3197 mem_heap_t** heap, /*!< in/out: memory heap */
3198 dict_index_t* index, /*!< in: index */
3199 const char* file, /*!< in: file name of caller */
3200 #ifndef DBUG_OFF
3201 const void* thd, /*!< in: connection, or NULL */
3202 #endif /* DBUG_OFF */
3203 ulint line) /*!< in: line number of caller */
3204 {
3205 mtr_t mtr;
3206 btr_pcur_t pcur;
3207 rec_t* rec;
3208 dberr_t error;
3209
3210 ut_ad(dict_index_is_clust(index));
3211
3212 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
3213 DEBUG_SYNC_C("before_insertion_of_blob");
3214
3215 mtr_start(&mtr);
3216 mtr.set_named_space(index->space);
3217 dict_disable_redo_if_temporary(index->table, &mtr);
3218
3219 btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE,
3220 &pcur, &mtr);
3221 rec = btr_pcur_get_rec(&pcur);
3222 offsets = rec_get_offsets(rec, index, offsets,
3223 ULINT_UNDEFINED, heap);
3224
3225 DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
3226 error = btr_store_big_rec_extern_fields(
3227 &pcur, 0, offsets, big_rec, &mtr, BTR_STORE_INSERT);
3228 DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
3229
3230 if (error == DB_SUCCESS
3231 && dict_index_is_online_ddl(index)) {
3232 row_log_table_insert(btr_pcur_get_rec(&pcur), entry,
3233 index, offsets);
3234 }
3235
3236 mtr_commit(&mtr);
3237
3238 btr_pcur_close(&pcur);
3239
3240 return(error);
3241 }
3242
3243 /***************************************************************//**
3244 Inserts an entry into a clustered index. Tries first optimistic,
3245 then pessimistic descent down the tree. If the entry matches enough
3246 to a delete marked record, performs the insert by updating or delete
3247 unmarking the delete marked record.
3248 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3249 dberr_t
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,ulint n_ext,bool dup_chk_only)3250 row_ins_clust_index_entry(
3251 /*======================*/
3252 dict_index_t* index, /*!< in: clustered index */
3253 dtuple_t* entry, /*!< in/out: index entry to insert */
3254 que_thr_t* thr, /*!< in: query thread */
3255 ulint n_ext, /*!< in: number of externally stored columns */
3256 bool dup_chk_only)
3257 /*!< in: if true, just do duplicate check
3258 and return. don't execute actual insert. */
3259 {
3260 dberr_t err;
3261 ulint n_uniq;
3262
3263 DBUG_ENTER("row_ins_clust_index_entry");
3264
3265 if (!index->table->foreign_set.empty()) {
3266 err = row_ins_check_foreign_constraints(
3267 index->table, index, entry, thr);
3268 if (err != DB_SUCCESS) {
3269
3270 DBUG_RETURN(err);
3271 }
3272 }
3273
3274 n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3275
3276 /* Try first optimistic descent to the B-tree */
3277 ulint flags;
3278
3279 if (!dict_table_is_intrinsic(index->table)) {
3280 log_free_check();
3281 flags = dict_table_is_temporary(index->table)
3282 ? BTR_NO_LOCKING_FLAG
3283 : 0;
3284 } else {
3285 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3286 }
3287
3288 if (dict_table_is_intrinsic(index->table)
3289 && dict_index_is_auto_gen_clust(index)) {
3290
3291 /* Check if the memory allocated for intrinsic cache*/
3292 if(!index->last_ins_cur) {
3293 dict_allocate_mem_intrinsic_cache(index);
3294 }
3295
3296 err = row_ins_sorted_clust_index_entry(
3297 BTR_MODIFY_LEAF, index, entry, n_ext, thr);
3298 } else {
3299 err = row_ins_clust_index_entry_low(
3300 flags, BTR_MODIFY_LEAF, index, n_uniq, entry,
3301 n_ext, thr, dup_chk_only);
3302 }
3303
3304
3305 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3306 "after_row_ins_clust_index_entry_leaf");
3307
3308 if (err != DB_FAIL) {
3309 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3310 DBUG_RETURN(err);
3311 }
3312
3313 /* Try then pessimistic descent to the B-tree */
3314 if (!dict_table_is_intrinsic(index->table)) {
3315 log_free_check();
3316 } else if(!index->last_sel_cur) {
3317 dict_allocate_mem_intrinsic_cache(index);
3318 index->last_sel_cur->invalid = true;
3319 } else {
3320 index->last_sel_cur->invalid = true;
3321 }
3322
3323 if (dict_table_is_intrinsic(index->table)
3324 && dict_index_is_auto_gen_clust(index)) {
3325 err = row_ins_sorted_clust_index_entry(
3326 BTR_MODIFY_TREE, index, entry, n_ext, thr);
3327 } else {
3328 err = row_ins_clust_index_entry_low(
3329 flags, BTR_MODIFY_TREE, index, n_uniq, entry,
3330 n_ext, thr, dup_chk_only);
3331 }
3332
3333 DBUG_RETURN(err);
3334 }
3335
3336 /***************************************************************//**
3337 Inserts an entry into a secondary index. Tries first optimistic,
3338 then pessimistic descent down the tree. If the entry matches enough
3339 to a delete marked record, performs the insert by updating or delete
3340 unmarking the delete marked record.
3341 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3342 dberr_t
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3343 row_ins_sec_index_entry(
3344 /*====================*/
3345 dict_index_t* index, /*!< in: secondary index */
3346 dtuple_t* entry, /*!< in/out: index entry to insert */
3347 que_thr_t* thr, /*!< in: query thread */
3348 bool dup_chk_only)
3349 /*!< in: if true, just do duplicate check
3350 and return. don't execute actual insert. */
3351 {
3352 dberr_t err;
3353 mem_heap_t* offsets_heap;
3354 mem_heap_t* heap;
3355
3356 DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3357 DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3358 return(DB_LOCK_WAIT);});
3359
3360 if (!index->table->foreign_set.empty()) {
3361 err = row_ins_check_foreign_constraints(index->table, index,
3362 entry, thr);
3363 if (err != DB_SUCCESS) {
3364
3365 return(err);
3366 }
3367 }
3368
3369 ut_ad(thr_get_trx(thr)->id != 0
3370 || dict_table_is_intrinsic(index->table));
3371
3372 offsets_heap = mem_heap_create(1024);
3373 heap = mem_heap_create(1024);
3374
3375 /* Try first optimistic descent to the B-tree */
3376
3377 ulint flags;
3378
3379 if (!dict_table_is_intrinsic(index->table)) {
3380 log_free_check();
3381 flags = dict_table_is_temporary(index->table)
3382 ? BTR_NO_LOCKING_FLAG
3383 : 0;
3384 } else {
3385 flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3386 }
3387
3388 err = row_ins_sec_index_entry_low(
3389 flags, BTR_MODIFY_LEAF, index, offsets_heap, heap, entry,
3390 0, thr, dup_chk_only);
3391 if (err == DB_FAIL) {
3392 mem_heap_empty(heap);
3393
3394 /* Try then pessimistic descent to the B-tree */
3395
3396 if (!dict_table_is_intrinsic(index->table)) {
3397 log_free_check();
3398 } else if(!index->last_sel_cur) {
3399 dict_allocate_mem_intrinsic_cache(index);
3400 index->last_sel_cur->invalid = true;
3401 } else {
3402 index->last_sel_cur->invalid = true;
3403 }
3404
3405 err = row_ins_sec_index_entry_low(
3406 flags, BTR_MODIFY_TREE, index,
3407 offsets_heap, heap, entry, 0, thr,
3408 dup_chk_only);
3409 }
3410
3411 mem_heap_free(heap);
3412 mem_heap_free(offsets_heap);
3413 return(err);
3414 }
3415
3416 /***************************************************************//**
3417 Inserts an index entry to index. Tries first optimistic, then pessimistic
3418 descent down the tree. If the entry matches enough to a delete marked record,
3419 performs the insert by updating or delete unmarking the delete marked
3420 record.
3421 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
3422 static
3423 dberr_t
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)3424 row_ins_index_entry(
3425 /*================*/
3426 dict_index_t* index, /*!< in: index */
3427 dtuple_t* entry, /*!< in/out: index entry to insert */
3428 que_thr_t* thr) /*!< in: query thread */
3429 {
3430 ut_ad(thr_get_trx(thr)->id != 0);
3431
3432 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3433 DBUG_SET("-d,row_ins_index_entry_timeout");
3434 return(DB_LOCK_WAIT);});
3435
3436 if (dict_index_is_clust(index)) {
3437 return(row_ins_clust_index_entry(index, entry, thr, 0, false));
3438 } else {
3439 return(row_ins_sec_index_entry(index, entry, thr, false));
3440 }
3441 }
3442
3443
3444 /*****************************************************************//**
3445 This function generate MBR (Minimum Bounding Box) for spatial objects
3446 and set it to spatial index field. */
3447 static
3448 void
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field)3449 row_ins_spatial_index_entry_set_mbr_field(
3450 /*======================================*/
3451 dfield_t* field, /*!< in/out: mbr field */
3452 const dfield_t* row_field) /*!< in: row field */
3453 {
3454 uchar* dptr = NULL;
3455 ulint dlen = 0;
3456 double mbr[SPDIMS * 2];
3457
3458 /* This must be a GEOMETRY datatype */
3459 ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3460
3461 dptr = static_cast<uchar*>(dfield_get_data(row_field));
3462 dlen = dfield_get_len(row_field);
3463
3464 /* obtain the MBR */
3465 rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
3466 static_cast<uint>(dlen - GEO_DATA_HEADER_SIZE),
3467 SPDIMS, mbr);
3468
3469 /* Set mbr as index entry data */
3470 dfield_write_mbr(field, mbr);
3471 }
3472
3473 /** Sets the values of the dtuple fields in entry from the values of appropriate
3474 columns in row.
3475 @param[in] index index handler
3476 @param[out] entry index entry to make
3477 @param[in] row row
3478
3479 @return DB_SUCCESS if the set is successful */
3480 dberr_t
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3481 row_ins_index_entry_set_vals(
3482 const dict_index_t* index,
3483 dtuple_t* entry,
3484 const dtuple_t* row)
3485 {
3486 ulint n_fields;
3487 ulint i;
3488 ulint num_v = dtuple_get_n_v_fields(entry);
3489
3490 n_fields = dtuple_get_n_fields(entry);
3491
3492 for (i = 0; i < n_fields + num_v; i++) {
3493 dict_field_t* ind_field = NULL;
3494 dfield_t* field;
3495 const dfield_t* row_field;
3496 ulint len;
3497 dict_col_t* col;
3498
3499 if (i >= n_fields) {
3500 /* This is virtual field */
3501 field = dtuple_get_nth_v_field(entry, i - n_fields);
3502 col = &dict_table_get_nth_v_col(
3503 index->table, i - n_fields)->m_col;
3504 } else {
3505 field = dtuple_get_nth_field(entry, i);
3506 ind_field = dict_index_get_nth_field(index, i);
3507 col = ind_field->col;
3508 }
3509
3510 if (dict_col_is_virtual(col)) {
3511 const dict_v_col_t* v_col
3512 = reinterpret_cast<const dict_v_col_t*>(col);
3513 ut_ad(dtuple_get_n_fields(row)
3514 == dict_table_get_n_cols(index->table));
3515 row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3516 } else {
3517 row_field = dtuple_get_nth_field(
3518 row, ind_field->col->ind);
3519 }
3520
3521 len = dfield_get_len(row_field);
3522
3523 /* Check column prefix indexes */
3524 if (ind_field != NULL && ind_field->prefix_len > 0
3525 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
3526
3527 const dict_col_t* col
3528 = dict_field_get_col(ind_field);
3529
3530 len = dtype_get_at_most_n_mbchars(
3531 col->prtype, col->mbminmaxlen,
3532 ind_field->prefix_len,
3533 len,
3534 static_cast<const char*>(
3535 dfield_get_data(row_field)));
3536
3537 ut_ad(!dfield_is_ext(row_field));
3538 }
3539
3540 /* Handle spatial index. For the first field, replace
3541 the data with its MBR (Minimum Bounding Box). */
3542 if ((i == 0) && dict_index_is_spatial(index)) {
3543 if (!row_field->data
3544 || row_field->len < GEO_DATA_HEADER_SIZE) {
3545 return(DB_CANT_CREATE_GEOMETRY_OBJECT);
3546 }
3547 row_ins_spatial_index_entry_set_mbr_field(
3548 field, row_field);
3549 continue;
3550 }
3551
3552 dfield_set_data(field, dfield_get_data(row_field), len);
3553 if (dfield_is_ext(row_field)) {
3554 ut_ad(dict_index_is_clust(index));
3555 dfield_set_ext(field);
3556 }
3557 }
3558
3559 return(DB_SUCCESS);
3560 }
3561
3562 /***********************************************************//**
3563 Inserts a single index entry to the table.
3564 @return DB_SUCCESS if operation successfully completed, else error
3565 code or DB_LOCK_WAIT */
3566 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3567 dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3568 row_ins_index_entry_step(
3569 /*=====================*/
3570 ins_node_t* node, /*!< in: row insert node */
3571 que_thr_t* thr) /*!< in: query thread */
3572 {
3573 dberr_t err;
3574
3575 DBUG_ENTER("row_ins_index_entry_step");
3576
3577 ut_ad(dtuple_check_typed(node->row));
3578
3579 err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3580
3581 if (err != DB_SUCCESS) {
3582 DBUG_RETURN(err);
3583 }
3584
3585 ut_ad(dtuple_check_typed(node->entry));
3586
3587 err = row_ins_index_entry(node->index, node->entry, thr);
3588
3589 DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3590 "after_row_ins_index_entry_step");
3591
3592 DBUG_RETURN(err);
3593 }
3594
3595 /***********************************************************//**
3596 Allocates a row id for row and inits the node->index field. */
3597 UNIV_INLINE
3598 void
row_ins_alloc_row_id_step(ins_node_t * node)3599 row_ins_alloc_row_id_step(
3600 /*======================*/
3601 ins_node_t* node) /*!< in: row insert node */
3602 {
3603 row_id_t row_id;
3604
3605 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3606
3607 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
3608
3609 /* No row id is stored if the clustered index is unique */
3610
3611 return;
3612 }
3613
3614 /* Fill in row id value to row */
3615
3616 row_id = dict_sys_get_new_row_id();
3617
3618 dict_sys_write_row_id(node->row_id_buf, row_id);
3619 }
3620
3621 /***********************************************************//**
3622 Gets a row to insert from the values list. */
3623 UNIV_INLINE
3624 void
row_ins_get_row_from_values(ins_node_t * node)3625 row_ins_get_row_from_values(
3626 /*========================*/
3627 ins_node_t* node) /*!< in: row insert node */
3628 {
3629 que_node_t* list_node;
3630 dfield_t* dfield;
3631 dtuple_t* row;
3632 ulint i;
3633
3634 /* The field values are copied in the buffers of the select node and
3635 it is safe to use them until we fetch from select again: therefore
3636 we can just copy the pointers */
3637
3638 row = node->row;
3639
3640 i = 0;
3641 list_node = node->values_list;
3642
3643 while (list_node) {
3644 eval_exp(list_node);
3645
3646 dfield = dtuple_get_nth_field(row, i);
3647 dfield_copy_data(dfield, que_node_get_val(list_node));
3648
3649 i++;
3650 list_node = que_node_get_next(list_node);
3651 }
3652 }
3653
3654 /***********************************************************//**
3655 Gets a row to insert from the select list. */
3656 UNIV_INLINE
3657 void
row_ins_get_row_from_select(ins_node_t * node)3658 row_ins_get_row_from_select(
3659 /*========================*/
3660 ins_node_t* node) /*!< in: row insert node */
3661 {
3662 que_node_t* list_node;
3663 dfield_t* dfield;
3664 dtuple_t* row;
3665 ulint i;
3666
3667 /* The field values are copied in the buffers of the select node and
3668 it is safe to use them until we fetch from select again: therefore
3669 we can just copy the pointers */
3670
3671 row = node->row;
3672
3673 i = 0;
3674 list_node = node->select->select_list;
3675
3676 while (list_node) {
3677 dfield = dtuple_get_nth_field(row, i);
3678 dfield_copy_data(dfield, que_node_get_val(list_node));
3679
3680 i++;
3681 list_node = que_node_get_next(list_node);
3682 }
3683 }
3684
3685 /***********************************************************//**
3686 Inserts a row to a table.
3687 @return DB_SUCCESS if operation successfully completed, else error
3688 code or DB_LOCK_WAIT */
3689 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3690 dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3691 row_ins(
3692 /*====*/
3693 ins_node_t* node, /*!< in: row insert node */
3694 que_thr_t* thr) /*!< in: query thread */
3695 {
3696 dberr_t err;
3697
3698 DBUG_ENTER("row_ins");
3699
3700 DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3701
3702 if (node->state == INS_NODE_ALLOC_ROW_ID) {
3703
3704 row_ins_alloc_row_id_step(node);
3705
3706 node->index = dict_table_get_first_index(node->table);
3707 node->entry = UT_LIST_GET_FIRST(node->entry_list);
3708
3709 if (node->ins_type == INS_SEARCHED) {
3710
3711 row_ins_get_row_from_select(node);
3712
3713 } else if (node->ins_type == INS_VALUES) {
3714
3715 row_ins_get_row_from_values(node);
3716 }
3717
3718 node->state = INS_NODE_INSERT_ENTRIES;
3719 }
3720
3721 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3722
3723 while (node->index != NULL) {
3724 if (node->index->type != DICT_FTS) {
3725 err = row_ins_index_entry_step(node, thr);
3726 switch(err) {
3727 case DB_SUCCESS:
3728 break;
3729 case DB_DUPLICATE_KEY:
3730 thr_get_trx(thr)->error_state
3731 = DB_DUPLICATE_KEY;
3732 thr_get_trx(thr)->error_index
3733 = node->index;
3734 //fall through
3735 default:
3736 DBUG_RETURN(err);
3737 }
3738 }
3739
3740 node->index = dict_table_get_next_index(node->index);
3741 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3742
3743 DBUG_EXECUTE_IF(
3744 "row_ins_skip_sec",
3745 node->index = NULL; node->entry = NULL; break;);
3746
3747 /* Skip corrupted secondary index and its entry */
3748 while (node->index && dict_index_is_corrupted(node->index)) {
3749
3750 node->index = dict_table_get_next_index(node->index);
3751 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3752 }
3753
3754 }
3755
3756 ut_ad(node->entry == NULL);
3757 thr_get_trx(thr)->error_index = NULL;
3758 node->state = INS_NODE_ALLOC_ROW_ID;
3759
3760 DBUG_RETURN(DB_SUCCESS);
3761 }
3762
3763 /***********************************************************//**
3764 Inserts a row to a table. This is a high-level function used in SQL execution
3765 graphs.
3766 @return query thread to run next or NULL */
3767 que_thr_t*
row_ins_step(que_thr_t * thr)3768 row_ins_step(
3769 /*=========*/
3770 que_thr_t* thr) /*!< in: query thread */
3771 {
3772 ins_node_t* node;
3773 que_node_t* parent;
3774 sel_node_t* sel_node;
3775 trx_t* trx;
3776 dberr_t err;
3777
3778 ut_ad(thr);
3779
3780 DEBUG_SYNC_C("innodb_row_ins_step_enter");
3781
3782 trx = thr_get_trx(thr);
3783
3784 trx_start_if_not_started_xa(trx, true);
3785
3786 node = static_cast<ins_node_t*>(thr->run_node);
3787
3788 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3789 ut_ad(!dict_table_is_intrinsic(node->table));
3790
3791 parent = que_node_get_parent(node);
3792 sel_node = node->select;
3793
3794 if (thr->prev_node == parent) {
3795 node->state = INS_NODE_SET_IX_LOCK;
3796 }
3797
3798 /* If this is the first time this node is executed (or when
3799 execution resumes after wait for the table IX lock), set an
3800 IX lock on the table and reset the possible select node. MySQL's
3801 partitioned table code may also call an insert within the same
3802 SQL statement AFTER it has used this table handle to do a search.
3803 This happens, for example, when a row update moves it to another
3804 partition. In that case, we have already set the IX lock on the
3805 table during the search operation, and there is no need to set
3806 it again here. But we must write trx->id to node->trx_id_buf. */
3807
3808 memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
3809 trx_write_trx_id(node->trx_id_buf, trx->id);
3810
3811 if (node->state == INS_NODE_SET_IX_LOCK) {
3812
3813 node->state = INS_NODE_ALLOC_ROW_ID;
3814
3815 /* It may be that the current session has not yet started
3816 its transaction, or it has been committed: */
3817
3818 if (trx->id == node->trx_id) {
3819 /* No need to do IX-locking */
3820
3821 goto same_trx;
3822 }
3823
3824 err = lock_table(0, node->table, LOCK_IX, thr);
3825
3826 DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait",
3827 err = DB_LOCK_WAIT;);
3828
3829 if (err != DB_SUCCESS) {
3830
3831 goto error_handling;
3832 }
3833
3834 node->trx_id = trx->id;
3835 same_trx:
3836 if (node->ins_type == INS_SEARCHED) {
3837 /* Reset the cursor */
3838 sel_node->state = SEL_NODE_OPEN;
3839
3840 /* Fetch a row to insert */
3841
3842 thr->run_node = sel_node;
3843
3844 return(thr);
3845 }
3846 }
3847
3848 if ((node->ins_type == INS_SEARCHED)
3849 && (sel_node->state != SEL_NODE_FETCH)) {
3850
3851 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3852
3853 /* No more rows to insert */
3854 thr->run_node = parent;
3855
3856 return(thr);
3857 }
3858
3859 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3860
3861 err = row_ins(node, thr);
3862
3863 error_handling:
3864 trx->error_state = err;
3865
3866 if (err != DB_SUCCESS) {
3867 /* err == DB_LOCK_WAIT or SQL error detected */
3868 return(NULL);
3869 }
3870
3871 /* DO THE TRIGGER ACTIONS HERE */
3872
3873 if (node->ins_type == INS_SEARCHED) {
3874 /* Fetch a row to insert */
3875
3876 thr->run_node = sel_node;
3877 } else {
3878 thr->run_node = que_node_get_parent(node);
3879 }
3880
3881 return(thr);
3882 }
3883