1 /*****************************************************************************
2
3 Copyright (c) 1996, 2014, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
16
17 *****************************************************************************/
18
19 /**************************************************//**
20 @file row/row0ins.c
21 Insert into a table
22
23 Created 4/20/1996 Heikki Tuuri
24 *******************************************************/
25
26 #include "m_string.h" /* for my_sys.h */
27 #include "my_sys.h" /* DEBUG_SYNC_C */
28 #include "row0ins.h"
29
30 #ifdef UNIV_NONINL
31 #include "row0ins.ic"
32 #endif
33
34 #include "ha_prototypes.h"
35 #include "dict0dict.h"
36 #include "dict0boot.h"
37 #include "trx0undo.h"
38 #include "btr0btr.h"
39 #include "btr0cur.h"
40 #include "mach0data.h"
41 #include "que0que.h"
42 #include "row0upd.h"
43 #include "row0sel.h"
44 #include "row0row.h"
45 #include "rem0cmp.h"
46 #include "lock0lock.h"
47 #include "log0log.h"
48 #include "eval0eval.h"
49 #include "data0data.h"
50 #include "usr0sess.h"
51 #include "buf0lru.h"
52 #include "m_string.h"
53 #include "my_sys.h"
54
55 #define ROW_INS_PREV 1
56 #define ROW_INS_NEXT 2
57
/*************************************************************************
IMPORTANT NOTE: Any operation that generates redo MUST check that there
is enough space in the redo log before starting that operation. This is
done by calling log_free_check(). The reason for checking the
availability of the redo log space before the start of the operation is
that we MUST not hold any synchronization objects when performing the
check.
If you make a change in this module make sure that no codepath is
introduced where a call to log_free_check() is bypassed. */
67
68 /*********************************************************************//**
69 Creates an insert node struct.
70 @return own: insert node struct */
71 UNIV_INTERN
72 ins_node_t*
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)73 ins_node_create(
74 /*============*/
75 ulint ins_type, /*!< in: INS_VALUES, ... */
76 dict_table_t* table, /*!< in: table where to insert */
77 mem_heap_t* heap) /*!< in: mem heap where created */
78 {
79 ins_node_t* node;
80
81 node = mem_heap_alloc(heap, sizeof(ins_node_t));
82
83 node->common.type = QUE_NODE_INSERT;
84
85 node->ins_type = ins_type;
86
87 node->state = INS_NODE_SET_IX_LOCK;
88 node->table = table;
89 node->index = NULL;
90 node->entry = NULL;
91
92 node->select = NULL;
93
94 node->trx_id = 0;
95
96 node->entry_sys_heap = mem_heap_create(128);
97
98 node->magic_n = INS_NODE_MAGIC_N;
99
100 return(node);
101 }
102
103 /***********************************************************//**
104 Creates an entry template for each index of a table. */
105 UNIV_INTERN
106 void
ins_node_create_entry_list(ins_node_t * node)107 ins_node_create_entry_list(
108 /*=======================*/
109 ins_node_t* node) /*!< in: row insert node */
110 {
111 dict_index_t* index;
112 dtuple_t* entry;
113
114 ut_ad(node->entry_sys_heap);
115
116 UT_LIST_INIT(node->entry_list);
117
118 index = dict_table_get_first_index(node->table);
119
120 while (index != NULL) {
121 entry = row_build_index_entry(node->row, NULL, index,
122 node->entry_sys_heap);
123 UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
124
125 /* We will include all indexes (include those corrupted
126 secondary indexes) in the entry list. Filteration of
127 these corrupted index will be done in row_ins() */
128 index = dict_table_get_next_index(index);
129 }
130 }
131
132 /*****************************************************************//**
133 Adds system field buffers to a row. */
134 static
135 void
row_ins_alloc_sys_fields(ins_node_t * node)136 row_ins_alloc_sys_fields(
137 /*=====================*/
138 ins_node_t* node) /*!< in: insert node */
139 {
140 dtuple_t* row;
141 dict_table_t* table;
142 mem_heap_t* heap;
143 const dict_col_t* col;
144 dfield_t* dfield;
145 byte* ptr;
146 uint len;
147
148 row = node->row;
149 table = node->table;
150 heap = node->entry_sys_heap;
151
152 ut_ad(row && table && heap);
153 ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
154
155 /* allocate buffer to hold the needed system created hidden columns. */
156 len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
157 ptr = mem_heap_zalloc(heap, len);
158
159 /* 1. Populate row-id */
160 col = dict_table_get_sys_col(table, DATA_ROW_ID);
161
162 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
163
164 dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
165
166 node->row_id_buf = ptr;
167
168 ptr += DATA_ROW_ID_LEN;
169
170 /* 2. Populate trx id */
171 col = dict_table_get_sys_col(table, DATA_TRX_ID);
172
173 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
174
175 dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
176
177 node->trx_id_buf = ptr;
178
179 ptr += DATA_TRX_ID_LEN;
180
181 /* 3. Populate roll ptr */
182
183 col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
184
185 dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
186
187 dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
188 }
189
190 /*********************************************************************//**
191 Sets a new row to insert for an INS_DIRECT node. This function is only used
192 if we have constructed the row separately, which is a rare case; this
193 function is quite slow. */
194 UNIV_INTERN
195 void
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)196 ins_node_set_new_row(
197 /*=================*/
198 ins_node_t* node, /*!< in: insert node */
199 dtuple_t* row) /*!< in: new row (or first row) for the node */
200 {
201 node->state = INS_NODE_SET_IX_LOCK;
202 node->index = NULL;
203 node->entry = NULL;
204
205 node->row = row;
206
207 mem_heap_empty(node->entry_sys_heap);
208
209 /* Create templates for index entries */
210
211 ins_node_create_entry_list(node);
212
213 /* Allocate from entry_sys_heap buffers for sys fields */
214
215 row_ins_alloc_sys_fields(node);
216
217 /* As we allocated a new trx id buf, the trx id should be written
218 there again: */
219
220 node->trx_id = 0;
221 }
222
223 /*******************************************************************//**
224 Does an insert operation by updating a delete-marked existing record
225 in the index. This situation can occur if the delete-marked record is
226 kept in the index for consistent reads.
227 @return DB_SUCCESS or error code */
228 static
229 ulint
row_ins_sec_index_entry_by_modify(ulint mode,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)230 row_ins_sec_index_entry_by_modify(
231 /*==============================*/
232 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
233 depending on whether mtr holds just a leaf
234 latch or also a tree latch */
235 btr_cur_t* cursor, /*!< in: B-tree cursor */
236 const dtuple_t* entry, /*!< in: index entry to insert */
237 que_thr_t* thr, /*!< in: query thread */
238 mtr_t* mtr) /*!< in: mtr; must be committed before
239 latching any further pages */
240 {
241 big_rec_t* dummy_big_rec;
242 mem_heap_t* heap;
243 upd_t* update;
244 rec_t* rec;
245 ulint err;
246
247 rec = btr_cur_get_rec(cursor);
248
249 ut_ad(!dict_index_is_clust(cursor->index));
250 ut_ad(rec_get_deleted_flag(rec,
251 dict_table_is_comp(cursor->index->table)));
252
253 /* We know that in the alphabetical ordering, entry and rec are
254 identified. But in their binary form there may be differences if
255 there are char fields in them. Therefore we have to calculate the
256 difference. */
257
258 heap = mem_heap_create(1024);
259
260 update = row_upd_build_sec_rec_difference_binary(
261 cursor->index, entry, rec, thr_get_trx(thr), heap);
262 if (mode == BTR_MODIFY_LEAF) {
263 /* Try an optimistic updating of the record, keeping changes
264 within the page */
265
266 err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
267 update, 0, thr, mtr);
268 switch (err) {
269 case DB_OVERFLOW:
270 case DB_UNDERFLOW:
271 case DB_ZIP_OVERFLOW:
272 err = DB_FAIL;
273 }
274 } else {
275 ut_a(mode == BTR_MODIFY_TREE);
276 if (buf_LRU_buf_pool_running_out()) {
277
278 err = DB_LOCK_TABLE_FULL;
279
280 goto func_exit;
281 }
282
283 err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
284 &heap, &dummy_big_rec, update,
285 0, thr, mtr);
286 ut_ad(!dummy_big_rec);
287 }
288 func_exit:
289 mem_heap_free(heap);
290
291 return(err);
292 }
293
294 /*******************************************************************//**
295 Does an insert operation by delete unmarking and updating a delete marked
296 existing record in the index. This situation can occur if the delete marked
297 record is kept in the index for consistent reads.
298 @return DB_SUCCESS, DB_FAIL, or error code */
299 static
300 ulint
row_ins_clust_index_entry_by_modify(ulint mode,btr_cur_t * cursor,mem_heap_t ** heap,big_rec_t ** big_rec,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)301 row_ins_clust_index_entry_by_modify(
302 /*================================*/
303 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
304 depending on whether mtr holds just a leaf
305 latch or also a tree latch */
306 btr_cur_t* cursor, /*!< in: B-tree cursor */
307 mem_heap_t** heap, /*!< in/out: pointer to memory heap, or NULL */
308 big_rec_t** big_rec,/*!< out: possible big rec vector of fields
309 which have to be stored externally by the
310 caller */
311 const dtuple_t* entry, /*!< in: index entry to insert */
312 que_thr_t* thr, /*!< in: query thread */
313 mtr_t* mtr) /*!< in: mtr; must be committed before
314 latching any further pages */
315 {
316 rec_t* rec;
317 upd_t* update;
318 ulint err;
319
320 ut_ad(dict_index_is_clust(cursor->index));
321
322 *big_rec = NULL;
323
324 rec = btr_cur_get_rec(cursor);
325
326 ut_ad(rec_get_deleted_flag(rec,
327 dict_table_is_comp(cursor->index->table)));
328
329 if (!*heap) {
330 *heap = mem_heap_create(1024);
331 }
332
333 /* Build an update vector containing all the fields to be modified;
334 NOTE that this vector may NOT contain system columns trx_id or
335 roll_ptr */
336
337 update = row_upd_build_difference_binary(cursor->index, entry, rec,
338 thr_get_trx(thr), *heap);
339 if (mode == BTR_MODIFY_LEAF) {
340 /* Try optimistic updating of the record, keeping changes
341 within the page */
342
343 err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
344 mtr);
345 switch (err) {
346 case DB_OVERFLOW:
347 case DB_UNDERFLOW:
348 case DB_ZIP_OVERFLOW:
349 err = DB_FAIL;
350 }
351 } else {
352 ut_a(mode == BTR_MODIFY_TREE);
353 if (buf_LRU_buf_pool_running_out()) {
354
355 return(DB_LOCK_TABLE_FULL);
356
357 }
358 err = btr_cur_pessimistic_update(
359 BTR_KEEP_POS_FLAG, cursor, heap, big_rec, update,
360 0, thr, mtr);
361 }
362
363 return(err);
364 }
365
366 /*********************************************************************//**
367 Returns TRUE if in a cascaded update/delete an ancestor node of node
368 updates (not DELETE, but UPDATE) table.
369 @return TRUE if an ancestor updates table */
370 static
371 ibool
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)372 row_ins_cascade_ancestor_updates_table(
373 /*===================================*/
374 que_node_t* node, /*!< in: node in a query graph */
375 dict_table_t* table) /*!< in: table */
376 {
377 que_node_t* parent;
378 upd_node_t* upd_node;
379
380 parent = que_node_get_parent(node);
381
382 while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
383
384 upd_node = parent;
385
386 if (upd_node->table == table && upd_node->is_delete == FALSE) {
387
388 return(TRUE);
389 }
390
391 parent = que_node_get_parent(parent);
392
393 ut_a(parent);
394 }
395
396 return(FALSE);
397 }
398
399 /*********************************************************************//**
400 Returns the number of ancestor UPDATE or DELETE nodes of a
401 cascaded update/delete node.
402 @return number of ancestors */
403 static
404 ulint
row_ins_cascade_n_ancestors(que_node_t * node)405 row_ins_cascade_n_ancestors(
406 /*========================*/
407 que_node_t* node) /*!< in: node in a query graph */
408 {
409 que_node_t* parent;
410 ulint n_ancestors = 0;
411
412 parent = que_node_get_parent(node);
413
414 while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
415 n_ancestors++;
416
417 parent = que_node_get_parent(parent);
418
419 ut_a(parent);
420 }
421
422 return(n_ancestors);
423 }
424
425 /******************************************************************//**
426 Calculates the update vector node->cascade->update for a child table in
427 a cascaded update.
428 @return number of fields in the calculated update vector; the value
429 can also be 0 if no foreign key fields changed; the returned value is
430 ULINT_UNDEFINED if the column type in the child table is too short to
431 fit the new value in the parent table: that means the update fails */
432 static
433 ulint
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap)434 row_ins_cascade_calc_update_vec(
435 /*============================*/
436 upd_node_t* node, /*!< in: update node of the parent
437 table */
438 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
439 type is != 0 */
440 mem_heap_t* heap) /*!< in: memory heap to use as
441 temporary storage */
442 {
443 upd_node_t* cascade = node->cascade_node;
444 dict_table_t* table = foreign->foreign_table;
445 dict_index_t* index = foreign->foreign_index;
446 upd_t* update;
447 dict_table_t* parent_table;
448 dict_index_t* parent_index;
449 upd_t* parent_update;
450 ulint n_fields_updated;
451 ulint parent_field_no;
452 ulint i;
453 ulint j;
454
455 ut_a(node);
456 ut_a(foreign);
457 ut_a(cascade);
458 ut_a(table);
459 ut_a(index);
460
461 /* Calculate the appropriate update vector which will set the fields
462 in the child index record to the same value (possibly padded with
463 spaces if the column is a fixed length CHAR or FIXBINARY column) as
464 the referenced index record will get in the update. */
465
466 parent_table = node->table;
467 ut_a(parent_table == foreign->referenced_table);
468 parent_index = foreign->referenced_index;
469 parent_update = node->update;
470
471 update = cascade->update;
472
473 update->info_bits = 0;
474 update->n_fields = foreign->n_fields;
475
476 n_fields_updated = 0;
477
478 for (i = 0; i < foreign->n_fields; i++) {
479
480 parent_field_no = dict_table_get_nth_col_pos(
481 parent_table,
482 dict_index_get_nth_col_no(parent_index, i));
483
484 for (j = 0; j < parent_update->n_fields; j++) {
485 const upd_field_t* parent_ufield
486 = &parent_update->fields[j];
487
488 if (parent_ufield->field_no == parent_field_no) {
489
490 ulint min_size;
491 const dict_col_t* col;
492 ulint ufield_len;
493 upd_field_t* ufield;
494
495 col = dict_index_get_nth_col(index, i);
496
497 /* A field in the parent index record is
498 updated. Let us make the update vector
499 field for the child table. */
500
501 ufield = update->fields + n_fields_updated;
502
503 ufield->field_no
504 = dict_table_get_nth_col_pos(
505 table, dict_col_get_no(col));
506
507 ufield->orig_len = 0;
508 ufield->exp = NULL;
509
510 ufield->new_val = parent_ufield->new_val;
511 ufield_len = dfield_get_len(&ufield->new_val);
512
513 /* Clear the "external storage" flag */
514 dfield_set_len(&ufield->new_val, ufield_len);
515
516 /* Do not allow a NOT NULL column to be
517 updated as NULL */
518
519 if (dfield_is_null(&ufield->new_val)
520 && (col->prtype & DATA_NOT_NULL)) {
521
522 return(ULINT_UNDEFINED);
523 }
524
525 /* If the new value would not fit in the
526 column, do not allow the update */
527
528 if (!dfield_is_null(&ufield->new_val)
529 && dtype_get_at_most_n_mbchars(
530 col->prtype, col->mbminmaxlen,
531 col->len,
532 ufield_len,
533 dfield_get_data(&ufield->new_val))
534 < ufield_len) {
535
536 return(ULINT_UNDEFINED);
537 }
538
539 /* If the parent column type has a different
540 length than the child column type, we may
541 need to pad with spaces the new value of the
542 child column */
543
544 min_size = dict_col_get_min_size(col);
545
546 /* Because UNIV_SQL_NULL (the marker
547 of SQL NULL values) exceeds all possible
548 values of min_size, the test below will
549 not hold for SQL NULL columns. */
550
551 if (min_size > ufield_len) {
552
553 byte* pad;
554 ulint pad_len;
555 byte* padded_data;
556 ulint mbminlen;
557
558 padded_data = mem_heap_alloc(
559 heap, min_size);
560
561 pad = padded_data + ufield_len;
562 pad_len = min_size - ufield_len;
563
564 memcpy(padded_data,
565 dfield_get_data(&ufield
566 ->new_val),
567 ufield_len);
568
569 mbminlen = dict_col_get_mbminlen(col);
570
571 ut_ad(!(ufield_len % mbminlen));
572 ut_ad(!(min_size % mbminlen));
573
574 if (mbminlen == 1
575 && dtype_get_charset_coll(
576 col->prtype)
577 == DATA_MYSQL_BINARY_CHARSET_COLL) {
578 /* Do not pad BINARY columns */
579 return(ULINT_UNDEFINED);
580 }
581
582 row_mysql_pad_col(mbminlen,
583 pad, pad_len);
584 dfield_set_data(&ufield->new_val,
585 padded_data, min_size);
586 }
587
588 n_fields_updated++;
589 }
590 }
591 }
592
593 update->n_fields = n_fields_updated;
594
595 return(n_fields_updated);
596 }
597
598 /*********************************************************************//**
599 Set detailed error message associated with foreign key errors for
600 the given transaction. */
601 static
602 void
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)603 row_ins_set_detailed(
604 /*=================*/
605 trx_t* trx, /*!< in: transaction */
606 dict_foreign_t* foreign) /*!< in: foreign key constraint */
607 {
608 mutex_enter(&srv_misc_tmpfile_mutex);
609 rewind(srv_misc_tmpfile);
610
611 if (os_file_set_eof(srv_misc_tmpfile)) {
612 ut_print_name(srv_misc_tmpfile, trx, TRUE,
613 foreign->foreign_table_name);
614 dict_print_info_on_foreign_key_in_create_format(
615 srv_misc_tmpfile, trx, foreign, FALSE);
616 trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
617 } else {
618 trx_set_detailed_error(trx, "temp file operation failed");
619 }
620
621 mutex_exit(&srv_misc_tmpfile_mutex);
622 }
623
624 /*********************************************************************//**
625 Reports a foreign key error associated with an update or a delete of a
626 parent table index entry. */
627 static
628 void
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)629 row_ins_foreign_report_err(
630 /*=======================*/
631 const char* errstr, /*!< in: error string from the viewpoint
632 of the parent table */
633 que_thr_t* thr, /*!< in: query thread whose run_node
634 is an update node */
635 dict_foreign_t* foreign, /*!< in: foreign key constraint */
636 const rec_t* rec, /*!< in: a matching index record in the
637 child table */
638 const dtuple_t* entry) /*!< in: index entry in the parent
639 table */
640 {
641 FILE* ef = dict_foreign_err_file;
642 trx_t* trx = thr_get_trx(thr);
643
644 row_ins_set_detailed(trx, foreign);
645
646 mutex_enter(&dict_foreign_err_mutex);
647 rewind(ef);
648 ut_print_timestamp(ef);
649 fputs(" Transaction:\n", ef);
650 trx_print(ef, trx, 600);
651
652 fputs("Foreign key constraint fails for table ", ef);
653 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
654 fputs(":\n", ef);
655 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
656 TRUE);
657 putc('\n', ef);
658 fputs(errstr, ef);
659 fputs(" in parent table, in index ", ef);
660 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
661 if (entry) {
662 fputs(" tuple:\n", ef);
663 dtuple_print(ef, entry);
664 }
665 fputs("\nBut in child table ", ef);
666 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
667 fputs(", in index ", ef);
668 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
669 if (rec) {
670 fputs(", there is a record:\n", ef);
671 rec_print(ef, rec, foreign->foreign_index);
672 } else {
673 fputs(", the record is not available\n", ef);
674 }
675 putc('\n', ef);
676
677 mutex_exit(&dict_foreign_err_mutex);
678 }
679
680 /*********************************************************************//**
681 Reports a foreign key error to dict_foreign_err_file when we are trying
682 to add an index entry to a child table. Note that the adding may be the result
683 of an update, too. */
684 static
685 void
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)686 row_ins_foreign_report_add_err(
687 /*===========================*/
688 trx_t* trx, /*!< in: transaction */
689 dict_foreign_t* foreign, /*!< in: foreign key constraint */
690 const rec_t* rec, /*!< in: a record in the parent table:
691 it does not match entry because we
692 have an error! */
693 const dtuple_t* entry) /*!< in: index entry to insert in the
694 child table */
695 {
696 FILE* ef = dict_foreign_err_file;
697
698 row_ins_set_detailed(trx, foreign);
699
700 mutex_enter(&dict_foreign_err_mutex);
701 rewind(ef);
702 ut_print_timestamp(ef);
703 fputs(" Transaction:\n", ef);
704 trx_print(ef, trx, 600);
705 fputs("Foreign key constraint fails for table ", ef);
706 ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
707 fputs(":\n", ef);
708 dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
709 TRUE);
710 fputs("\nTrying to add in child table, in index ", ef);
711 ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
712 if (entry) {
713 fputs(" tuple:\n", ef);
714 /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
715 It would be better to only display the user columns. */
716 dtuple_print(ef, entry);
717 }
718 fputs("\nBut in parent table ", ef);
719 ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
720 fputs(", in index ", ef);
721 ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
722 fputs(",\nthe closest match we can find is record:\n", ef);
723 if (rec && page_rec_is_supremum(rec)) {
724 /* If the cursor ended on a supremum record, it is better
725 to report the previous record in the error message, so that
726 the user gets a more descriptive error message. */
727 rec = page_rec_get_prev_const(rec);
728 }
729
730 if (rec) {
731 rec_print(ef, rec, foreign->referenced_index);
732 }
733 putc('\n', ef);
734
735 mutex_exit(&dict_foreign_err_mutex);
736 }
737
738 /*********************************************************************//**
739 Invalidate the query cache for the given table. */
740 static
741 void
row_ins_invalidate_query_cache(que_thr_t * thr,const char * name)742 row_ins_invalidate_query_cache(
743 /*===========================*/
744 que_thr_t* thr, /*!< in: query thread whose run_node
745 is an update node */
746 const char* name) /*!< in: table name prefixed with
747 database name and a '/' character */
748 {
749 char* buf;
750 char* ptr;
751 ulint len = strlen(name) + 1;
752
753 buf = mem_strdupl(name, len);
754
755 ptr = strchr(buf, '/');
756 ut_a(ptr);
757 *ptr = '\0';
758
759 innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
760 mem_free(buf);
761 }
762
763 /*********************************************************************//**
764 Perform referential actions or checks when a parent row is deleted or updated
765 and the constraint had an ON DELETE or ON UPDATE condition which was not
766 RESTRICT.
767 @return DB_SUCCESS, DB_LOCK_WAIT, or error code */
768 static
769 ulint
row_ins_foreign_check_on_constraint(que_thr_t * thr,dict_foreign_t * foreign,btr_pcur_t * pcur,dtuple_t * entry,mtr_t * mtr)770 row_ins_foreign_check_on_constraint(
771 /*================================*/
772 que_thr_t* thr, /*!< in: query thread whose run_node
773 is an update node */
774 dict_foreign_t* foreign, /*!< in: foreign key constraint whose
775 type is != 0 */
776 btr_pcur_t* pcur, /*!< in: cursor placed on a matching
777 index record in the child table */
778 dtuple_t* entry, /*!< in: index entry in the parent
779 table */
780 mtr_t* mtr) /*!< in: mtr holding the latch of pcur
781 page */
782 {
783 upd_node_t* node;
784 upd_node_t* cascade;
785 dict_table_t* table = foreign->foreign_table;
786 dict_index_t* index;
787 dict_index_t* clust_index;
788 dtuple_t* ref;
789 mem_heap_t* upd_vec_heap = NULL;
790 const rec_t* rec;
791 const rec_t* clust_rec;
792 const buf_block_t* clust_block;
793 upd_t* update;
794 ulint n_to_update;
795 ulint err;
796 ulint i;
797 trx_t* trx;
798 mem_heap_t* tmp_heap = NULL;
799
800 ut_a(thr);
801 ut_a(foreign);
802 ut_a(pcur);
803 ut_a(mtr);
804
805 trx = thr_get_trx(thr);
806
807 /* Since we are going to delete or update a row, we have to invalidate
808 the MySQL query cache for table. A deadlock of threads is not possible
809 here because the caller of this function does not hold any latches with
810 the sync0sync.h rank above the kernel mutex. The query cache mutex has
811 a rank just above the kernel mutex. */
812
813 row_ins_invalidate_query_cache(thr, table->name);
814
815 node = thr->run_node;
816
817 if (node->is_delete && 0 == (foreign->type
818 & (DICT_FOREIGN_ON_DELETE_CASCADE
819 | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
820
821 row_ins_foreign_report_err("Trying to delete",
822 thr, foreign,
823 btr_pcur_get_rec(pcur), entry);
824
825 return(DB_ROW_IS_REFERENCED);
826 }
827
828 if (!node->is_delete && 0 == (foreign->type
829 & (DICT_FOREIGN_ON_UPDATE_CASCADE
830 | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
831
832 /* This is an UPDATE */
833
834 row_ins_foreign_report_err("Trying to update",
835 thr, foreign,
836 btr_pcur_get_rec(pcur), entry);
837
838 return(DB_ROW_IS_REFERENCED);
839 }
840
841 if (node->cascade_node == NULL) {
842 /* Extend our query graph by creating a child to current
843 update node. The child is used in the cascade or set null
844 operation. */
845
846 node->cascade_heap = mem_heap_create(128);
847 node->cascade_node = row_create_update_node_for_mysql(
848 table, node->cascade_heap);
849 que_node_set_parent(node->cascade_node, node);
850 }
851
852 /* Initialize cascade_node to do the operation we want. Note that we
853 use the SAME cascade node to do all foreign key operations of the
854 SQL DELETE: the table of the cascade node may change if there are
855 several child tables to the table where the delete is done! */
856
857 cascade = node->cascade_node;
858
859 cascade->table = table;
860
861 cascade->foreign = foreign;
862
863 if (node->is_delete
864 && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
865 cascade->is_delete = TRUE;
866 } else {
867 cascade->is_delete = FALSE;
868
869 if (foreign->n_fields > cascade->update_n_fields) {
870 /* We have to make the update vector longer */
871
872 cascade->update = upd_create(foreign->n_fields,
873 node->cascade_heap);
874 cascade->update_n_fields = foreign->n_fields;
875 }
876 }
877
878 /* We do not allow cyclic cascaded updating (DELETE is allowed,
879 but not UPDATE) of the same table, as this can lead to an infinite
880 cycle. Check that we are not updating the same table which is
881 already being modified in this cascade chain. We have to check
882 this also because the modification of the indexes of a 'parent'
883 table may still be incomplete, and we must avoid seeing the indexes
884 of the parent table in an inconsistent state! */
885
886 if (!cascade->is_delete
887 && row_ins_cascade_ancestor_updates_table(cascade, table)) {
888
889 /* We do not know if this would break foreign key
890 constraints, but play safe and return an error */
891
892 err = DB_ROW_IS_REFERENCED;
893
894 row_ins_foreign_report_err(
895 "Trying an update, possibly causing a cyclic"
896 " cascaded update\n"
897 "in the child table,", thr, foreign,
898 btr_pcur_get_rec(pcur), entry);
899
900 goto nonstandard_exit_func;
901 }
902
903 if (row_ins_cascade_n_ancestors(cascade) >= 15) {
904 err = DB_ROW_IS_REFERENCED;
905
906 row_ins_foreign_report_err(
907 "Trying a too deep cascaded delete or update\n",
908 thr, foreign, btr_pcur_get_rec(pcur), entry);
909
910 goto nonstandard_exit_func;
911 }
912
913 index = btr_pcur_get_btr_cur(pcur)->index;
914
915 ut_a(index == foreign->foreign_index);
916
917 rec = btr_pcur_get_rec(pcur);
918
919 if (dict_index_is_clust(index)) {
920 /* pcur is already positioned in the clustered index of
921 the child table */
922
923 clust_index = index;
924 clust_rec = rec;
925 clust_block = btr_pcur_get_block(pcur);
926 } else {
927 /* We have to look for the record in the clustered index
928 in the child table */
929
930 clust_index = dict_table_get_first_index(table);
931
932 tmp_heap = mem_heap_create(256);
933
934 ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
935 tmp_heap);
936 btr_pcur_open_with_no_init(clust_index, ref,
937 PAGE_CUR_LE, BTR_SEARCH_LEAF,
938 cascade->pcur, 0, mtr);
939
940 clust_rec = btr_pcur_get_rec(cascade->pcur);
941 clust_block = btr_pcur_get_block(cascade->pcur);
942
943 if (!page_rec_is_user_rec(clust_rec)
944 || btr_pcur_get_low_match(cascade->pcur)
945 < dict_index_get_n_unique(clust_index)) {
946
947 fputs("InnoDB: error in cascade of a foreign key op\n"
948 "InnoDB: ", stderr);
949 dict_index_name_print(stderr, trx, index);
950
951 fputs("\n"
952 "InnoDB: record ", stderr);
953 rec_print(stderr, rec, index);
954 fputs("\n"
955 "InnoDB: clustered record ", stderr);
956 rec_print(stderr, clust_rec, clust_index);
957 fputs("\n"
958 "InnoDB: Submit a detailed bug report to"
959 " http://bugs.mysql.com\n", stderr);
960 ut_ad(0);
961 err = DB_SUCCESS;
962
963 goto nonstandard_exit_func;
964 }
965 }
966
967 /* Set an X-lock on the row to delete or update in the child table */
968
969 err = lock_table(0, table, LOCK_IX, thr);
970
971 if (err == DB_SUCCESS) {
972 /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
973 we already have a normal shared lock on the appropriate
974 gap if the search criterion was not unique */
975
976 err = lock_clust_rec_read_check_and_lock_alt(
977 0, clust_block, clust_rec, clust_index,
978 LOCK_X, LOCK_REC_NOT_GAP, thr);
979 }
980
981 if (err != DB_SUCCESS) {
982
983 goto nonstandard_exit_func;
984 }
985
986 if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
987 /* This can happen if there is a circular reference of
988 rows such that cascading delete comes to delete a row
989 already in the process of being delete marked */
990 err = DB_SUCCESS;
991
992 goto nonstandard_exit_func;
993 }
994
995 if (node->is_delete
996 ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
997 : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
998
999 /* Build the appropriate update vector which sets
1000 foreign->n_fields first fields in rec to SQL NULL */
1001
1002 update = cascade->update;
1003
1004 update->info_bits = 0;
1005 update->n_fields = foreign->n_fields;
1006 UNIV_MEM_INVALID(update->fields,
1007 update->n_fields * sizeof *update->fields);
1008
1009 for (i = 0; i < foreign->n_fields; i++) {
1010 upd_field_t* ufield = &update->fields[i];
1011
1012 ufield->field_no = dict_table_get_nth_col_pos(
1013 table,
1014 dict_index_get_nth_col_no(index, i));
1015 ufield->orig_len = 0;
1016 ufield->exp = NULL;
1017 dfield_set_null(&ufield->new_val);
1018 }
1019 }
1020
1021 if (!node->is_delete
1022 && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1023
1024 /* Build the appropriate update vector which sets changing
1025 foreign->n_fields first fields in rec to new values */
1026
1027 upd_vec_heap = mem_heap_create(256);
1028
1029 n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
1030 upd_vec_heap);
1031 if (n_to_update == ULINT_UNDEFINED) {
1032 err = DB_ROW_IS_REFERENCED;
1033
1034 row_ins_foreign_report_err(
1035 "Trying a cascaded update where the"
1036 " updated value in the child\n"
1037 "table would not fit in the length"
1038 " of the column, or the value would\n"
1039 "be NULL and the column is"
1040 " declared as not NULL in the child table,",
1041 thr, foreign, btr_pcur_get_rec(pcur), entry);
1042
1043 goto nonstandard_exit_func;
1044 }
1045
1046 if (cascade->update->n_fields == 0) {
1047
1048 /* The update does not change any columns referred
1049 to in this foreign key constraint: no need to do
1050 anything */
1051
1052 err = DB_SUCCESS;
1053
1054 goto nonstandard_exit_func;
1055 }
1056 }
1057
1058 /* Store pcur position and initialize or store the cascade node
1059 pcur stored position */
1060
1061 btr_pcur_store_position(pcur, mtr);
1062
1063 if (index == clust_index) {
1064 btr_pcur_copy_stored_position(cascade->pcur, pcur);
1065 } else {
1066 btr_pcur_store_position(cascade->pcur, mtr);
1067 }
1068
1069 mtr_commit(mtr);
1070
1071 ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
1072
1073 cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1074
1075 err = row_update_cascade_for_mysql(thr, cascade,
1076 foreign->foreign_table);
1077
1078 if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
1079 fprintf(stderr,
1080 "InnoDB: error: table %s has the counter 0"
1081 " though there is\n"
1082 "InnoDB: a FOREIGN KEY check running on it.\n",
1083 foreign->foreign_table->name);
1084 }
1085
1086 /* Release the data dictionary latch for a while, so that we do not
1087 starve other threads from doing CREATE TABLE etc. if we have a huge
1088 cascaded operation running. The counter n_foreign_key_checks_running
1089 will prevent other users from dropping or ALTERing the table when we
1090 release the latch. */
1091
1092 row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
1093
1094 DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1095
1096 row_mysql_freeze_data_dictionary(thr_get_trx(thr));
1097
1098 mtr_start(mtr);
1099
1100 /* Restore pcur position */
1101
1102 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1103
1104 if (tmp_heap) {
1105 mem_heap_free(tmp_heap);
1106 }
1107
1108 if (upd_vec_heap) {
1109 mem_heap_free(upd_vec_heap);
1110 }
1111
1112 return(err);
1113
1114 nonstandard_exit_func:
1115 if (tmp_heap) {
1116 mem_heap_free(tmp_heap);
1117 }
1118
1119 if (upd_vec_heap) {
1120 mem_heap_free(upd_vec_heap);
1121 }
1122
1123 btr_pcur_store_position(pcur, mtr);
1124
1125 mtr_commit(mtr);
1126 mtr_start(mtr);
1127
1128 btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1129
1130 return(err);
1131 }
1132
1133 /*********************************************************************//**
1134 Sets a shared lock on a record. Used in locking possible duplicate key
1135 records and also in checking foreign key constraints.
1136 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1137 static
1138 enum db_err
row_ins_set_shared_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1139 row_ins_set_shared_rec_lock(
1140 /*========================*/
1141 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1142 LOCK_REC_NOT_GAP type lock */
1143 const buf_block_t* block, /*!< in: buffer block of rec */
1144 const rec_t* rec, /*!< in: record */
1145 dict_index_t* index, /*!< in: index */
1146 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1147 que_thr_t* thr) /*!< in: query thread */
1148 {
1149 enum db_err err;
1150
1151 ut_ad(rec_offs_validate(rec, index, offsets));
1152
1153 if (dict_index_is_clust(index)) {
1154 err = lock_clust_rec_read_check_and_lock(
1155 0, block, rec, index, offsets, LOCK_S, type, thr);
1156 } else {
1157 err = lock_sec_rec_read_check_and_lock(
1158 0, block, rec, index, offsets, LOCK_S, type, thr);
1159 }
1160
1161 return(err);
1162 }
1163
1164 /*********************************************************************//**
1165 Sets a exclusive lock on a record. Used in locking possible duplicate key
1166 records
1167 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
1168 static
1169 enum db_err
row_ins_set_exclusive_rec_lock(ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1170 row_ins_set_exclusive_rec_lock(
1171 /*===========================*/
1172 ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
1173 LOCK_REC_NOT_GAP type lock */
1174 const buf_block_t* block, /*!< in: buffer block of rec */
1175 const rec_t* rec, /*!< in: record */
1176 dict_index_t* index, /*!< in: index */
1177 const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
1178 que_thr_t* thr) /*!< in: query thread */
1179 {
1180 enum db_err err;
1181
1182 ut_ad(rec_offs_validate(rec, index, offsets));
1183
1184 if (dict_index_is_clust(index)) {
1185 err = lock_clust_rec_read_check_and_lock(
1186 0, block, rec, index, offsets, LOCK_X, type, thr);
1187 } else {
1188 err = lock_sec_rec_read_check_and_lock(
1189 0, block, rec, index, offsets, LOCK_X, type, thr);
1190 }
1191
1192 return(err);
1193 }
1194
/***************************************************************//**
Checks if foreign key constraint fails for an index entry. Sets shared locks
which lock either the success or the failure of the constraint. NOTE that
the caller must have a shared latch on dict_operation_lock.

The check is skipped, and DB_SUCCESS returned, when trx->check_foreigns is
FALSE, when any of the first foreign->n_fields fields of entry is SQL NULL,
or when the running query node is a non-delete update node whose foreign
member is this very constraint. If a lock request answers DB_LOCK_WAIT, the
thread is suspended; when the wait ends with trx->error_state == DB_SUCCESS
the whole check is run again from the start.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, DB_ROW_IS_REFERENCED, or another
error code passed on from a lock request, from the ON UPDATE / ON DELETE
handling (DB_DUPLICATE_KEY is reported as DB_FOREIGN_DUPLICATE_KEY), or
from trx->error_state after a lock wait */
UNIV_INTERN
ulint
row_ins_check_foreign_constraint(
/*=============================*/
	ibool		check_ref,/*!< in: TRUE if we want to check that
				the referenced table is ok, FALSE if we
				want to check the foreign key table */
	dict_foreign_t*	foreign,/*!< in: foreign constraint; NOTE that the
				tables mentioned in it must be in the
				dictionary cache if they exist at all */
	dict_table_t*	table,	/*!< in: if check_ref is TRUE, then the foreign
				table, else the referenced table */
	dtuple_t*	entry,	/*!< in: index entry for index */
	que_thr_t*	thr)	/*!< in: query thread */
{
	upd_node_t*	upd_node;
	dict_table_t*	check_table;
	dict_index_t*	check_index;
	ulint		n_fields_cmp;
	btr_pcur_t	pcur;
	int		cmp;
	ulint		err;
	ulint		i;
	mtr_t		mtr;
	trx_t*		trx		= thr_get_trx(thr);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	/* The check restarts from here after a lock wait that ended
	successfully (see do_possible_lock_wait below) */
run_again:
#ifdef UNIV_SYNC_DEBUG
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
#endif /* UNIV_SYNC_DEBUG */

	err = DB_SUCCESS;

	if (trx->check_foreigns == FALSE) {
		/* The user has suppressed foreign key checks currently for
		this session */
		goto exit_func;
	}

	/* If any of the foreign key fields in entry is SQL NULL, we
	suppress the foreign key check: this is compatible with Oracle,
	for example */

	for (i = 0; i < foreign->n_fields; i++) {
		if (UNIV_SQL_NULL == dfield_get_len(
			    dtuple_get_nth_field(entry, i))) {

			goto exit_func;
		}
	}

	if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
		upd_node = thr->run_node;

		if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
			/* If a cascaded update is done as defined by a
			foreign key constraint, do not check that
			constraint for the child row. In ON UPDATE CASCADE
			the update of the parent row is only half done when
			we come here: if we would check the constraint here
			for the child row it would fail.

			A QUESTION remains: if in the child table there are
			several constraints which refer to the same parent
			table, we should merge all updates to the child as
			one update? And the updates can be contradictory!
			Currently we just perform the update associated
			with each foreign key constraint, one after
			another, and the user has problems predicting in
			which order they are performed. */

			goto exit_func;
		}
	}

	/* Choose the side of the constraint that is searched: the
	referenced (parent) table and index when check_ref is TRUE,
	otherwise the foreign (child) table and index */

	if (check_ref) {
		check_table = foreign->referenced_table;
		check_index = foreign->referenced_index;
	} else {
		check_table = foreign->foreign_table;
		check_index = foreign->foreign_index;
	}

	/* The table or index to search is not available. When check_ref
	is TRUE this is written to dict_foreign_err_file and reported as
	DB_NO_REFERENCED_ROW; otherwise err is still DB_SUCCESS here */

	if (check_table == NULL || check_table->ibd_file_missing
	    || check_index == NULL) {
		if (check_ref) {
			FILE*	ef = dict_foreign_err_file;

			row_ins_set_detailed(trx, foreign);

			/* The error file is rewound and rewritten while
			dict_foreign_err_mutex is held */
			mutex_enter(&dict_foreign_err_mutex);
			rewind(ef);
			ut_print_timestamp(ef);
			fputs(" Transaction:\n", ef);
			trx_print(ef, trx, 600);
			fputs("Foreign key constraint fails for table ", ef);
			ut_print_name(ef, trx, TRUE,
				      foreign->foreign_table_name);
			fputs(":\n", ef);
			dict_print_info_on_foreign_key_in_create_format(
				ef, trx, foreign, TRUE);
			fputs("\nTrying to add to index ", ef);
			ut_print_name(ef, trx, FALSE,
				      foreign->foreign_index->name);
			fputs(" tuple:\n", ef);
			dtuple_print(ef, entry);
			fputs("\nBut the parent table ", ef);
			ut_print_name(ef, trx, TRUE,
				      foreign->referenced_table_name);
			fputs("\nor its .ibd file does"
			      " not currently exist!\n", ef);
			mutex_exit(&dict_foreign_err_mutex);

			err = DB_NO_REFERENCED_ROW;
		}

		goto exit_func;
	}

	if (check_table != table) {
		/* We already have a LOCK_IX on table, but not necessarily
		on check_table */

		err = lock_table(0, check_table, LOCK_IS, thr);

		if (err != DB_SUCCESS) {

			/* No mini-transaction or cursor has been opened
			yet, so the end_scan cleanup is bypassed */
			goto do_possible_lock_wait;
		}
	}

	mtr_start(&mtr);

	/* Store old value on n_fields_cmp */

	n_fields_cmp = dtuple_get_n_fields_cmp(entry);

	/* Compare only the foreign->n_fields first fields of entry during
	the search and scan; the old value is restored at end_scan */
	dtuple_set_n_fields_cmp(entry, foreign->n_fields);

	btr_pcur_open(check_index, entry, PAGE_CUR_GE,
		      BTR_SEARCH_LEAF, &pcur, &mtr);

	/* Scan index records and check if there is a matching record */

	do {
		const rec_t*		rec = btr_pcur_get_rec(&pcur);
		const buf_block_t*	block = btr_pcur_get_block(&pcur);

		if (page_rec_is_infimum(rec)) {

			/* In a do-while loop, continue transfers control
			to the loop condition, which advances pcur */
			continue;
		}

		offsets = rec_get_offsets(rec, check_index,
					  offsets, ULINT_UNDEFINED, &heap);

		if (page_rec_is_supremum(rec)) {

			/* Lock the page supremum and move on to the next
			record unless the lock request failed */
			err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
							  rec, check_index,
							  offsets, thr);
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				continue;
			default:
				goto end_scan;
			}
		}

		cmp = cmp_dtuple_rec(entry, rec, offsets);

		if (cmp == 0) {
			if (rec_get_deleted_flag(rec,
						 rec_offs_comp(offsets))) {
				/* A delete-marked record with an equal key
				is locked with LOCK_ORDINARY but does not
				end the scan: after the switch, control
				reaches the loop condition and the scan
				moves on to the next record */
				err = row_ins_set_shared_rec_lock(
					LOCK_ORDINARY, block,
					rec, check_index, offsets, thr);
				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}
			} else {
				/* Found a matching record. Lock only
				a record because we can allow inserts
				into gaps */

				err = row_ins_set_shared_rec_lock(
					LOCK_REC_NOT_GAP, block,
					rec, check_index, offsets, thr);

				switch (err) {
				case DB_SUCCESS_LOCKED_REC:
				case DB_SUCCESS:
					break;
				default:
					goto end_scan;
				}

				if (check_ref) {
					/* The parent row exists: the
					constraint holds */
					err = DB_SUCCESS;

					goto end_scan;
				} else if (foreign->type != 0) {
					/* There is an ON UPDATE or ON DELETE
					condition: check them in a separate
					function */

					err = row_ins_foreign_check_on_constraint(
						thr, foreign, &pcur, entry,
						&mtr);
					if (err != DB_SUCCESS) {
						/* Since reporting a plain
						"duplicate key" error
						message to the user in
						cases where a long CASCADE
						operation would lead to a
						duplicate key in some
						other table is very
						confusing, map duplicate
						key errors resulting from
						FK constraints to a
						separate error code. */

						if (err == DB_DUPLICATE_KEY) {
							err = DB_FOREIGN_DUPLICATE_KEY;
						}

						goto end_scan;
					}

					/* row_ins_foreign_check_on_constraint
					may have repositioned pcur on a
					different block */
					block = btr_pcur_get_block(&pcur);
				} else {
					/* A child row refers to the parent
					and the constraint has no ON UPDATE
					or ON DELETE action (foreign->type
					is 0): report and refuse */
					row_ins_foreign_report_err(
						"Trying to delete or update",
						thr, foreign, rec, entry);

					err = DB_ROW_IS_REFERENCED;
					goto end_scan;
				}
			}
		} else {
			/* The assertion requires cmp < 0 here. The scan
			always stops at this record, whatever the outcome
			of the lock request */
			ut_a(cmp < 0);

			err = row_ins_set_shared_rec_lock(
				LOCK_GAP, block,
				rec, check_index, offsets, thr);

			/* There is no default label: any other value of err
			is left as it is and returned through end_scan */
			switch (err) {
			case DB_SUCCESS_LOCKED_REC:
			case DB_SUCCESS:
				if (check_ref) {
					err = DB_NO_REFERENCED_ROW;
					row_ins_foreign_report_add_err(
						trx, foreign, rec, entry);
				} else {
					err = DB_SUCCESS;
				}
			}

			goto end_scan;
		}
	} while (btr_pcur_move_to_next(&pcur, &mtr));

	/* btr_pcur_move_to_next() returned FALSE: the scan ended without
	any of the branches above jumping to end_scan */

	if (check_ref) {
		row_ins_foreign_report_add_err(
			trx, foreign, btr_pcur_get_rec(&pcur), entry);
		err = DB_NO_REFERENCED_ROW;
	} else {
		err = DB_SUCCESS;
	}

end_scan:
	btr_pcur_close(&pcur);

	mtr_commit(&mtr);

	/* Restore old value */
	dtuple_set_n_fields_cmp(entry, n_fields_cmp);

do_possible_lock_wait:
	if (err == DB_LOCK_WAIT) {
		/* Suspend the thread for the lock wait. If
		trx->error_state is DB_SUCCESS afterwards, repeat the whole
		check; otherwise return the error state */
		trx->error_state = err;

		que_thr_stop_for_mysql(thr);

		srv_suspend_mysql_thread(thr);

		if (trx->error_state == DB_SUCCESS) {

			goto run_again;
		}

		err = trx->error_state;
	}

exit_func:
	/* heap is non-NULL only if rec_get_offsets() had to allocate */
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
	return(err);
}
1512
1513 /***************************************************************//**
1514 Checks if foreign key constraints fail for an index entry. If index
1515 is not mentioned in any constraint, this function does nothing,
1516 Otherwise does searches to the indexes of referenced tables and
1517 sets shared locks which lock either the success or the failure of
1518 a constraint.
1519 @return DB_SUCCESS or error code */
1520 static
1521 ulint
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1522 row_ins_check_foreign_constraints(
1523 /*==============================*/
1524 dict_table_t* table, /*!< in: table */
1525 dict_index_t* index, /*!< in: index */
1526 dtuple_t* entry, /*!< in: index entry for index */
1527 que_thr_t* thr) /*!< in: query thread */
1528 {
1529 dict_foreign_t* foreign;
1530 ulint err;
1531 trx_t* trx;
1532 ibool got_s_lock = FALSE;
1533
1534 trx = thr_get_trx(thr);
1535
1536 foreign = UT_LIST_GET_FIRST(table->foreign_list);
1537
1538 while (foreign) {
1539 if (foreign->foreign_index == index) {
1540
1541 if (foreign->referenced_table == NULL) {
1542 dict_table_get(foreign->referenced_table_name_lookup,
1543 FALSE,
1544 DICT_ERR_IGNORE_NONE);
1545 }
1546
1547 if (0 == trx->dict_operation_lock_mode) {
1548 got_s_lock = TRUE;
1549
1550 row_mysql_freeze_data_dictionary(trx);
1551 }
1552
1553 if (foreign->referenced_table) {
1554 mutex_enter(&(dict_sys->mutex));
1555
1556 (foreign->referenced_table
1557 ->n_foreign_key_checks_running)++;
1558
1559 mutex_exit(&(dict_sys->mutex));
1560 }
1561
1562 /* NOTE that if the thread ends up waiting for a lock
1563 we will release dict_operation_lock temporarily!
1564 But the counter on the table protects the referenced
1565 table from being dropped while the check is running. */
1566
1567 err = row_ins_check_foreign_constraint(
1568 TRUE, foreign, table, entry, thr);
1569
1570 if (foreign->referenced_table) {
1571 mutex_enter(&(dict_sys->mutex));
1572
1573 ut_a(foreign->referenced_table
1574 ->n_foreign_key_checks_running > 0);
1575 (foreign->referenced_table
1576 ->n_foreign_key_checks_running)--;
1577
1578 mutex_exit(&(dict_sys->mutex));
1579 }
1580
1581 if (got_s_lock) {
1582 row_mysql_unfreeze_data_dictionary(trx);
1583 }
1584
1585 if (err != DB_SUCCESS) {
1586 return(err);
1587 }
1588 }
1589
1590 foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
1591 }
1592
1593 return(DB_SUCCESS);
1594 }
1595
1596 /***************************************************************//**
1597 Checks if a unique key violation to rec would occur at the index entry
1598 insert.
1599 @return TRUE if error */
1600 static
1601 ibool
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1602 row_ins_dupl_error_with_rec(
1603 /*========================*/
1604 const rec_t* rec, /*!< in: user record; NOTE that we assume
1605 that the caller already has a record lock on
1606 the record! */
1607 const dtuple_t* entry, /*!< in: entry to insert */
1608 dict_index_t* index, /*!< in: index */
1609 const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
1610 {
1611 ulint matched_fields;
1612 ulint matched_bytes;
1613 ulint n_unique;
1614 ulint i;
1615
1616 ut_ad(rec_offs_validate(rec, index, offsets));
1617
1618 n_unique = dict_index_get_n_unique(index);
1619
1620 matched_fields = 0;
1621 matched_bytes = 0;
1622
1623 cmp_dtuple_rec_with_match(entry, rec, offsets,
1624 &matched_fields, &matched_bytes);
1625
1626 if (matched_fields < n_unique) {
1627
1628 return(FALSE);
1629 }
1630
1631 /* In a unique secondary index we allow equal key values if they
1632 contain SQL NULLs */
1633
1634 if (!dict_index_is_clust(index)) {
1635
1636 for (i = 0; i < n_unique; i++) {
1637 if (UNIV_SQL_NULL == dfield_get_len(
1638 dtuple_get_nth_field(entry, i))) {
1639
1640 return(FALSE);
1641 }
1642 }
1643 }
1644
1645 return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1646 }
1647
1648 /***************************************************************//**
1649 Scans a unique non-clustered index at a given index entry to determine
1650 whether a uniqueness violation has occurred for the key value of the entry.
1651 Set shared locks on possible duplicate records.
1652 @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1653 static
1654 ulint
row_ins_scan_sec_index_for_duplicate(dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1655 row_ins_scan_sec_index_for_duplicate(
1656 /*=================================*/
1657 dict_index_t* index, /*!< in: non-clustered unique index */
1658 dtuple_t* entry, /*!< in: index entry */
1659 que_thr_t* thr) /*!< in: query thread */
1660 {
1661 ulint n_unique;
1662 ulint i;
1663 int cmp;
1664 ulint n_fields_cmp;
1665 btr_pcur_t pcur;
1666 ulint err = DB_SUCCESS;
1667 ulint allow_duplicates;
1668 mtr_t mtr;
1669 mem_heap_t* heap = NULL;
1670 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1671 ulint* offsets = offsets_;
1672 rec_offs_init(offsets_);
1673
1674 n_unique = dict_index_get_n_unique(index);
1675
1676 /* If the secondary index is unique, but one of the fields in the
1677 n_unique first fields is NULL, a unique key violation cannot occur,
1678 since we define NULL != NULL in this case */
1679
1680 for (i = 0; i < n_unique; i++) {
1681 if (UNIV_SQL_NULL == dfield_get_len(
1682 dtuple_get_nth_field(entry, i))) {
1683
1684 return(DB_SUCCESS);
1685 }
1686 }
1687
1688 mtr_start(&mtr);
1689
1690 /* Store old value on n_fields_cmp */
1691
1692 n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1693
1694 dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
1695
1696 btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
1697
1698 allow_duplicates = thr_get_trx(thr)->duplicates;
1699
1700 /* Scan index records and check if there is a duplicate */
1701
1702 do {
1703 const rec_t* rec = btr_pcur_get_rec(&pcur);
1704 const buf_block_t* block = btr_pcur_get_block(&pcur);
1705 const ulint lock_type = LOCK_ORDINARY;
1706
1707 if (page_rec_is_infimum(rec)) {
1708
1709 continue;
1710 }
1711
1712 offsets = rec_get_offsets(rec, index, offsets,
1713 ULINT_UNDEFINED, &heap);
1714
1715 if (allow_duplicates) {
1716
1717 /* If the SQL-query will update or replace
1718 duplicate key we will take X-lock for
1719 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1720 INSERT ON DUPLICATE KEY UPDATE). */
1721
1722 err = row_ins_set_exclusive_rec_lock(
1723 lock_type, block, rec, index, offsets, thr);
1724 } else {
1725
1726 err = row_ins_set_shared_rec_lock(
1727 lock_type, block, rec, index, offsets, thr);
1728 }
1729
1730 switch (err) {
1731 case DB_SUCCESS_LOCKED_REC:
1732 err = DB_SUCCESS;
1733 case DB_SUCCESS:
1734 break;
1735 default:
1736 goto end_scan;
1737 }
1738
1739 if (page_rec_is_supremum(rec)) {
1740
1741 continue;
1742 }
1743
1744 cmp = cmp_dtuple_rec(entry, rec, offsets);
1745
1746 if (cmp == 0) {
1747 if (row_ins_dupl_error_with_rec(rec, entry,
1748 index, offsets)) {
1749 err = DB_DUPLICATE_KEY;
1750
1751 thr_get_trx(thr)->error_info = index;
1752
1753 goto end_scan;
1754 }
1755 } else {
1756 ut_a(cmp < 0);
1757 goto end_scan;
1758 }
1759 } while (btr_pcur_move_to_next(&pcur, &mtr));
1760
1761 end_scan:
1762 if (UNIV_LIKELY_NULL(heap)) {
1763 mem_heap_free(heap);
1764 }
1765 mtr_commit(&mtr);
1766
1767 /* Restore old value */
1768 dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1769
1770 return(err);
1771 }
1772
1773 /***************************************************************//**
1774 Checks if a unique key violation error would occur at an index entry
1775 insert. Sets shared locks on possible duplicate records. Works only
1776 for a clustered index!
1777 @return DB_SUCCESS if no error, DB_DUPLICATE_KEY if error,
1778 DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
1779 record */
1780 static
1781 ulint
row_ins_duplicate_error_in_clust(btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)1782 row_ins_duplicate_error_in_clust(
1783 /*=============================*/
1784 btr_cur_t* cursor, /*!< in: B-tree cursor */
1785 const dtuple_t* entry, /*!< in: entry to insert */
1786 que_thr_t* thr, /*!< in: query thread */
1787 mtr_t* mtr) /*!< in: mtr */
1788 {
1789 ulint err;
1790 rec_t* rec;
1791 ulint n_unique;
1792 trx_t* trx = thr_get_trx(thr);
1793 mem_heap_t*heap = NULL;
1794 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1795 ulint* offsets = offsets_;
1796 rec_offs_init(offsets_);
1797
1798 UT_NOT_USED(mtr);
1799
1800 ut_a(dict_index_is_clust(cursor->index));
1801 ut_ad(dict_index_is_unique(cursor->index));
1802
1803 /* NOTE: For unique non-clustered indexes there may be any number
1804 of delete marked records with the same value for the non-clustered
1805 index key (remember multiversioning), and which differ only in
1806 the row refererence part of the index record, containing the
1807 clustered index key fields. For such a secondary index record,
1808 to avoid race condition, we must FIRST do the insertion and after
1809 that check that the uniqueness condition is not breached! */
1810
1811 /* NOTE: A problem is that in the B-tree node pointers on an
1812 upper level may match more to the entry than the actual existing
1813 user records on the leaf level. So, even if low_match would suggest
1814 that a duplicate key violation may occur, this may not be the case. */
1815
1816 n_unique = dict_index_get_n_unique(cursor->index);
1817
1818 if (cursor->low_match >= n_unique) {
1819
1820 rec = btr_cur_get_rec(cursor);
1821
1822 if (!page_rec_is_infimum(rec)) {
1823 offsets = rec_get_offsets(rec, cursor->index, offsets,
1824 ULINT_UNDEFINED, &heap);
1825
1826 /* We set a lock on the possible duplicate: this
1827 is needed in logical logging of MySQL to make
1828 sure that in roll-forward we get the same duplicate
1829 errors as in original execution */
1830
1831 if (trx->duplicates) {
1832
1833 /* If the SQL-query will update or replace
1834 duplicate key we will take X-lock for
1835 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1836 INSERT ON DUPLICATE KEY UPDATE). */
1837
1838 err = row_ins_set_exclusive_rec_lock(
1839 LOCK_REC_NOT_GAP,
1840 btr_cur_get_block(cursor),
1841 rec, cursor->index, offsets, thr);
1842 } else {
1843
1844 err = row_ins_set_shared_rec_lock(
1845 LOCK_REC_NOT_GAP,
1846 btr_cur_get_block(cursor), rec,
1847 cursor->index, offsets, thr);
1848 }
1849
1850 switch (err) {
1851 case DB_SUCCESS_LOCKED_REC:
1852 case DB_SUCCESS:
1853 break;
1854 default:
1855 goto func_exit;
1856 }
1857
1858 if (row_ins_dupl_error_with_rec(
1859 rec, entry, cursor->index, offsets)) {
1860 trx->error_info = cursor->index;
1861 err = DB_DUPLICATE_KEY;
1862 goto func_exit;
1863 }
1864 }
1865 }
1866
1867 if (cursor->up_match >= n_unique) {
1868
1869 rec = page_rec_get_next(btr_cur_get_rec(cursor));
1870
1871 if (!page_rec_is_supremum(rec)) {
1872 offsets = rec_get_offsets(rec, cursor->index, offsets,
1873 ULINT_UNDEFINED, &heap);
1874
1875 if (trx->duplicates) {
1876
1877 /* If the SQL-query will update or replace
1878 duplicate key we will take X-lock for
1879 duplicates ( REPLACE, LOAD DATAFILE REPLACE,
1880 INSERT ON DUPLICATE KEY UPDATE). */
1881
1882 err = row_ins_set_exclusive_rec_lock(
1883 LOCK_REC_NOT_GAP,
1884 btr_cur_get_block(cursor),
1885 rec, cursor->index, offsets, thr);
1886 } else {
1887
1888 err = row_ins_set_shared_rec_lock(
1889 LOCK_REC_NOT_GAP,
1890 btr_cur_get_block(cursor),
1891 rec, cursor->index, offsets, thr);
1892 }
1893
1894 switch (err) {
1895 case DB_SUCCESS_LOCKED_REC:
1896 case DB_SUCCESS:
1897 break;
1898 default:
1899 goto func_exit;
1900 }
1901
1902 if (row_ins_dupl_error_with_rec(
1903 rec, entry, cursor->index, offsets)) {
1904 trx->error_info = cursor->index;
1905 err = DB_DUPLICATE_KEY;
1906 goto func_exit;
1907 }
1908 }
1909
1910 ut_a(!dict_index_is_clust(cursor->index));
1911 /* This should never happen */
1912 }
1913
1914 err = DB_SUCCESS;
1915 func_exit:
1916 if (UNIV_LIKELY_NULL(heap)) {
1917 mem_heap_free(heap);
1918 }
1919 return(err);
1920 }
1921
1922 /***************************************************************//**
1923 Checks if an index entry has long enough common prefix with an existing
1924 record so that the intended insert of the entry must be changed to a modify of
1925 the existing record. In the case of a clustered index, the prefix must be
1926 n_unique fields long, and in the case of a secondary index, all fields must be
1927 equal.
1928 @return 0 if no update, ROW_INS_PREV if previous should be updated;
1929 currently we do the search so that only the low_match record can match
1930 enough to the search tuple, not the next record */
1931 UNIV_INLINE
1932 ulint
row_ins_must_modify(btr_cur_t * cursor)1933 row_ins_must_modify(
1934 /*================*/
1935 btr_cur_t* cursor) /*!< in: B-tree cursor */
1936 {
1937 ulint enough_match;
1938 rec_t* rec;
1939
1940 /* NOTE: (compare to the note in row_ins_duplicate_error) Because node
1941 pointers on upper levels of the B-tree may match more to entry than
1942 to actual user records on the leaf level, we have to check if the
1943 candidate record is actually a user record. In a clustered index
1944 node pointers contain index->n_unique first fields, and in the case
1945 of a secondary index, all fields of the index. */
1946
1947 enough_match = dict_index_get_n_unique_in_tree(cursor->index);
1948
1949 if (cursor->low_match >= enough_match) {
1950
1951 rec = btr_cur_get_rec(cursor);
1952
1953 if (!page_rec_is_infimum(rec)) {
1954
1955 return(ROW_INS_PREV);
1956 }
1957 }
1958
1959 return(0);
1960 }
1961
1962 /***************************************************************//**
1963 Tries to insert an index entry to an index. If the index is clustered
1964 and a record with the same unique key is found, the other record is
1965 necessarily marked deleted by a committed transaction, or a unique key
1966 violation error occurs. The delete marked record is then updated to an
1967 existing record, and we must write an undo log record on the delete
1968 marked record. If the index is secondary, and a record with exactly the
1969 same fields is found, the other record is necessarily marked deleted.
1970 It is then unmarked. Otherwise, the entry is just inserted to the index.
1971 @return DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL if pessimistic retry needed,
1972 or error code */
1973 static
1974 ulint
row_ins_index_entry_low(ulint mode,dict_index_t * index,dtuple_t * entry,ulint n_ext,que_thr_t * thr)1975 row_ins_index_entry_low(
1976 /*====================*/
1977 ulint mode, /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
1978 depending on whether we wish optimistic or
1979 pessimistic descent down the index tree */
1980 dict_index_t* index, /*!< in: index */
1981 dtuple_t* entry, /*!< in/out: index entry to insert */
1982 ulint n_ext, /*!< in: number of externally stored columns */
1983 que_thr_t* thr) /*!< in: query thread */
1984 {
1985 btr_cur_t cursor;
1986 ulint search_mode;
1987 ulint modify = 0; /* remove warning */
1988 rec_t* insert_rec;
1989 rec_t* rec;
1990 ulint* offsets;
1991 ulint err;
1992 ulint n_unique;
1993 big_rec_t* big_rec = NULL;
1994 mtr_t mtr;
1995 mem_heap_t* heap = NULL;
1996
1997 log_free_check();
1998
1999 mtr_start(&mtr);
2000
2001 cursor.thr = thr;
2002
2003 /* Note that we use PAGE_CUR_LE as the search mode, because then
2004 the function will return in both low_match and up_match of the
2005 cursor sensible values */
2006
2007 if (dict_index_is_clust(index)) {
2008 search_mode = mode;
2009 } else if (!(thr_get_trx(thr)->check_unique_secondary)) {
2010 search_mode = mode | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE;
2011 } else {
2012 search_mode = mode | BTR_INSERT;
2013 }
2014
2015 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2016 search_mode,
2017 &cursor, 0, __FILE__, __LINE__, &mtr);
2018
2019 if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2020 /* The insertion was made to the insert buffer already during
2021 the search: we are done */
2022
2023 ut_ad(search_mode & BTR_INSERT);
2024 err = DB_SUCCESS;
2025
2026 goto function_exit;
2027 }
2028
2029 #ifdef UNIV_DEBUG
2030 {
2031 page_t* page = btr_cur_get_page(&cursor);
2032 rec_t* first_rec = page_rec_get_next(
2033 page_get_infimum_rec(page));
2034
2035 ut_ad(page_rec_is_supremum(first_rec)
2036 || rec_get_n_fields(first_rec, index)
2037 == dtuple_get_n_fields(entry));
2038 }
2039 #endif
2040
2041 n_unique = dict_index_get_n_unique(index);
2042
2043 if (dict_index_is_unique(index) && (cursor.up_match >= n_unique
2044 || cursor.low_match >= n_unique)) {
2045
2046 if (dict_index_is_clust(index)) {
2047 /* Note that the following may return also
2048 DB_LOCK_WAIT */
2049
2050 err = row_ins_duplicate_error_in_clust(
2051 &cursor, entry, thr, &mtr);
2052 if (err != DB_SUCCESS) {
2053
2054 goto function_exit;
2055 }
2056 } else {
2057 mtr_commit(&mtr);
2058 err = row_ins_scan_sec_index_for_duplicate(
2059 index, entry, thr);
2060 mtr_start(&mtr);
2061
2062 if (err != DB_SUCCESS) {
2063 goto function_exit;
2064 }
2065
2066 /* We did not find a duplicate and we have now
2067 locked with s-locks the necessary records to
2068 prevent any insertion of a duplicate by another
2069 transaction. Let us now reposition the cursor and
2070 continue the insertion. */
2071
2072 btr_cur_search_to_nth_level(index, 0, entry,
2073 PAGE_CUR_LE,
2074 mode | BTR_INSERT,
2075 &cursor, 0,
2076 __FILE__, __LINE__, &mtr);
2077 }
2078 }
2079
2080 modify = row_ins_must_modify(&cursor);
2081
2082 if (modify != 0) {
2083 /* There is already an index entry with a long enough common
2084 prefix, we must convert the insert into a modify of an
2085 existing record */
2086
2087 if (modify == ROW_INS_NEXT) {
2088 rec = page_rec_get_next(btr_cur_get_rec(&cursor));
2089
2090 btr_cur_position(index, rec,
2091 btr_cur_get_block(&cursor),&cursor);
2092 }
2093
2094 if (dict_index_is_clust(index)) {
2095 err = row_ins_clust_index_entry_by_modify(
2096 mode, &cursor, &heap, &big_rec, entry,
2097 thr, &mtr);
2098
2099 if (big_rec) {
2100 ut_a(err == DB_SUCCESS);
2101 /* Write out the externally stored
2102 columns while still x-latching
2103 index->lock and block->lock. Allocate
2104 pages for big_rec in the mtr that
2105 modified the B-tree, but be sure to skip
2106 any pages that were freed in mtr. We will
2107 write out the big_rec pages before
2108 committing the B-tree mini-transaction. If
2109 the system crashes so that crash recovery
2110 will not replay the mtr_commit(&mtr), the
2111 big_rec pages will be left orphaned until
2112 the pages are allocated for something else.
2113
2114 TODO: If the allocation extends the
2115 tablespace, it will not be redo
2116 logged, in either mini-transaction.
2117 Tablespace extension should be
2118 redo-logged in the big_rec
2119 mini-transaction, so that recovery
2120 will not fail when the big_rec was
2121 written to the extended portion of the
2122 file, in case the file was somehow
2123 truncated in the crash. */
2124
2125 rec = btr_cur_get_rec(&cursor);
2126 offsets = rec_get_offsets(
2127 rec, index, NULL,
2128 ULINT_UNDEFINED, &heap);
2129
2130 DEBUG_SYNC_C("before_row_ins_upd_extern");
2131 err = btr_store_big_rec_extern_fields(
2132 index, btr_cur_get_block(&cursor),
2133 rec, offsets, big_rec, &mtr,
2134 BTR_STORE_INSERT_UPDATE);
2135 DEBUG_SYNC_C("after_row_ins_upd_extern");
2136 /* If writing big_rec fails (for
2137 example, because of DB_OUT_OF_FILE_SPACE),
2138 the record will be corrupted. Even if
2139 we did not update any externally
2140 stored columns, our update could cause
2141 the record to grow so that a
2142 non-updated column was selected for
2143 external storage. This non-update
2144 would not have been written to the
2145 undo log, and thus the record cannot
2146 be rolled back.
2147
2148 However, because we have not executed
2149 mtr_commit(mtr) yet, the update will
2150 not be replayed in crash recovery, and
2151 the following assertion failure will
2152 effectively "roll back" the operation. */
2153 ut_a(err == DB_SUCCESS);
2154 goto stored_big_rec;
2155 }
2156 } else {
2157 ut_ad(!n_ext);
2158 err = row_ins_sec_index_entry_by_modify(
2159 mode, &cursor, entry, thr, &mtr);
2160 }
2161 } else {
2162 if (mode == BTR_MODIFY_LEAF) {
2163 err = btr_cur_optimistic_insert(
2164 0, &cursor, entry, &insert_rec, &big_rec,
2165 n_ext, thr, &mtr);
2166 } else {
2167 ut_a(mode == BTR_MODIFY_TREE);
2168 if (buf_LRU_buf_pool_running_out()) {
2169
2170 err = DB_LOCK_TABLE_FULL;
2171
2172 goto function_exit;
2173 }
2174
2175 err = btr_cur_optimistic_insert(
2176 0, &cursor, entry, &insert_rec, &big_rec,
2177 n_ext, thr, &mtr);
2178
2179 if (err == DB_FAIL) {
2180 err = btr_cur_pessimistic_insert(
2181 0, &cursor, entry, &insert_rec,
2182 &big_rec, n_ext, thr, &mtr);
2183 }
2184 }
2185 }
2186
2187 function_exit:
2188 mtr_commit(&mtr);
2189
2190 if (UNIV_LIKELY_NULL(big_rec)) {
2191 DBUG_EXECUTE_IF(
2192 "row_ins_extern_checkpoint",
2193 log_make_checkpoint_at(IB_ULONGLONG_MAX, TRUE););
2194
2195 mtr_start(&mtr);
2196
2197 DEBUG_SYNC_C("before_row_ins_extern_latch");
2198 btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
2199 BTR_MODIFY_TREE, &cursor, 0,
2200 __FILE__, __LINE__, &mtr);
2201 rec = btr_cur_get_rec(&cursor);
2202 offsets = rec_get_offsets(rec, index, NULL,
2203 ULINT_UNDEFINED, &heap);
2204
2205 DEBUG_SYNC_C("before_row_ins_extern");
2206 err = btr_store_big_rec_extern_fields(
2207 index, btr_cur_get_block(&cursor),
2208 rec, offsets, big_rec, &mtr, BTR_STORE_INSERT);
2209 DEBUG_SYNC_C("after_row_ins_extern");
2210
2211 stored_big_rec:
2212 if (modify) {
2213 dtuple_big_rec_free(big_rec);
2214 } else {
2215 dtuple_convert_back_big_rec(index, entry, big_rec);
2216 }
2217
2218 mtr_commit(&mtr);
2219 }
2220
2221 if (UNIV_LIKELY_NULL(heap)) {
2222 mem_heap_free(heap);
2223 }
2224 return(err);
2225 }
2226
2227 /***************************************************************//**
2228 Inserts an index entry to index. Tries first optimistic, then pessimistic
2229 descent down the tree. If the entry matches enough to a delete marked record,
2230 performs the insert by updating or delete unmarking the delete marked
2231 record.
2232 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
2233 UNIV_INTERN
2234 ulint
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,ulint n_ext,ibool foreign,que_thr_t * thr)2235 row_ins_index_entry(
2236 /*================*/
2237 dict_index_t* index, /*!< in: index */
2238 dtuple_t* entry, /*!< in/out: index entry to insert */
2239 ulint n_ext, /*!< in: number of externally stored columns */
2240 ibool foreign,/*!< in: TRUE=check foreign key constraints
2241 (foreign=FALSE only during CREATE INDEX) */
2242 que_thr_t* thr) /*!< in: query thread */
2243 {
2244 ulint err;
2245
2246 DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
2247 DBUG_SET("-d,row_ins_index_entry_timeout");
2248 return(DB_LOCK_WAIT);});
2249
2250 if (foreign && UT_LIST_GET_FIRST(index->table->foreign_list)) {
2251 err = row_ins_check_foreign_constraints(index->table, index,
2252 entry, thr);
2253 if (err != DB_SUCCESS) {
2254
2255 return(err);
2256 }
2257 }
2258
2259 /* Try first optimistic descent to the B-tree */
2260
2261 err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
2262 n_ext, thr);
2263 if (err != DB_FAIL) {
2264 if (index == dict_table_get_first_index(index->table)
2265 && thr_get_trx(thr)->mysql_thd != 0) {
2266 DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
2267 }
2268 return(err);
2269 }
2270
2271 /* Try then pessimistic descent to the B-tree */
2272
2273 err = row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
2274 n_ext, thr);
2275 return(err);
2276 }
2277
2278 /***********************************************************//**
2279 Sets the values of the dtuple fields in entry from the values of appropriate
2280 columns in row. */
2281 static
2282 void
row_ins_index_entry_set_vals(dict_index_t * index,dtuple_t * entry,const dtuple_t * row)2283 row_ins_index_entry_set_vals(
2284 /*=========================*/
2285 dict_index_t* index, /*!< in: index */
2286 dtuple_t* entry, /*!< in: index entry to make */
2287 const dtuple_t* row) /*!< in: row */
2288 {
2289 ulint n_fields;
2290 ulint i;
2291
2292 ut_ad(entry && row);
2293
2294 n_fields = dtuple_get_n_fields(entry);
2295
2296 for (i = 0; i < n_fields; i++) {
2297 dict_field_t* ind_field;
2298 dfield_t* field;
2299 const dfield_t* row_field;
2300 ulint len;
2301
2302 field = dtuple_get_nth_field(entry, i);
2303 ind_field = dict_index_get_nth_field(index, i);
2304 row_field = dtuple_get_nth_field(row, ind_field->col->ind);
2305 len = dfield_get_len(row_field);
2306
2307 /* Check column prefix indexes */
2308 if (ind_field->prefix_len > 0
2309 && dfield_get_len(row_field) != UNIV_SQL_NULL) {
2310
2311 const dict_col_t* col
2312 = dict_field_get_col(ind_field);
2313
2314 len = dtype_get_at_most_n_mbchars(
2315 col->prtype, col->mbminmaxlen,
2316 ind_field->prefix_len,
2317 len, dfield_get_data(row_field));
2318
2319 ut_ad(!dfield_is_ext(row_field));
2320 }
2321
2322 dfield_set_data(field, dfield_get_data(row_field), len);
2323 if (dfield_is_ext(row_field)) {
2324 ut_ad(dict_index_is_clust(index));
2325 dfield_set_ext(field);
2326 }
2327 }
2328 }
2329
2330 /***********************************************************//**
2331 Inserts a single index entry to the table.
2332 @return DB_SUCCESS if operation successfully completed, else error
2333 code or DB_LOCK_WAIT */
2334 static
2335 ulint
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)2336 row_ins_index_entry_step(
2337 /*=====================*/
2338 ins_node_t* node, /*!< in: row insert node */
2339 que_thr_t* thr) /*!< in: query thread */
2340 {
2341 ulint err;
2342
2343 ut_ad(dtuple_check_typed(node->row));
2344
2345 row_ins_index_entry_set_vals(node->index, node->entry, node->row);
2346
2347 ut_ad(dtuple_check_typed(node->entry));
2348
2349 err = row_ins_index_entry(node->index, node->entry, 0, TRUE, thr);
2350
2351 return(err);
2352 }
2353
2354 /***********************************************************//**
2355 Allocates a row id for row and inits the node->index field. */
2356 UNIV_INLINE
2357 void
row_ins_alloc_row_id_step(ins_node_t * node)2358 row_ins_alloc_row_id_step(
2359 /*======================*/
2360 ins_node_t* node) /*!< in: row insert node */
2361 {
2362 row_id_t row_id;
2363
2364 ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
2365
2366 if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
2367
2368 /* No row id is stored if the clustered index is unique */
2369
2370 return;
2371 }
2372
2373 /* Fill in row id value to row */
2374
2375 row_id = dict_sys_get_new_row_id();
2376
2377 dict_sys_write_row_id(node->row_id_buf, row_id);
2378 }
2379
2380 /***********************************************************//**
2381 Gets a row to insert from the values list. */
2382 UNIV_INLINE
2383 void
row_ins_get_row_from_values(ins_node_t * node)2384 row_ins_get_row_from_values(
2385 /*========================*/
2386 ins_node_t* node) /*!< in: row insert node */
2387 {
2388 que_node_t* list_node;
2389 dfield_t* dfield;
2390 dtuple_t* row;
2391 ulint i;
2392
2393 /* The field values are copied in the buffers of the select node and
2394 it is safe to use them until we fetch from select again: therefore
2395 we can just copy the pointers */
2396
2397 row = node->row;
2398
2399 i = 0;
2400 list_node = node->values_list;
2401
2402 while (list_node) {
2403 eval_exp(list_node);
2404
2405 dfield = dtuple_get_nth_field(row, i);
2406 dfield_copy_data(dfield, que_node_get_val(list_node));
2407
2408 i++;
2409 list_node = que_node_get_next(list_node);
2410 }
2411 }
2412
2413 /***********************************************************//**
2414 Gets a row to insert from the select list. */
2415 UNIV_INLINE
2416 void
row_ins_get_row_from_select(ins_node_t * node)2417 row_ins_get_row_from_select(
2418 /*========================*/
2419 ins_node_t* node) /*!< in: row insert node */
2420 {
2421 que_node_t* list_node;
2422 dfield_t* dfield;
2423 dtuple_t* row;
2424 ulint i;
2425
2426 /* The field values are copied in the buffers of the select node and
2427 it is safe to use them until we fetch from select again: therefore
2428 we can just copy the pointers */
2429
2430 row = node->row;
2431
2432 i = 0;
2433 list_node = node->select->select_list;
2434
2435 while (list_node) {
2436 dfield = dtuple_get_nth_field(row, i);
2437 dfield_copy_data(dfield, que_node_get_val(list_node));
2438
2439 i++;
2440 list_node = que_node_get_next(list_node);
2441 }
2442 }
2443
2444 /***********************************************************//**
2445 Inserts a row to a table.
2446 @return DB_SUCCESS if operation successfully completed, else error
2447 code or DB_LOCK_WAIT */
2448 static
2449 ulint
row_ins(ins_node_t * node,que_thr_t * thr)2450 row_ins(
2451 /*====*/
2452 ins_node_t* node, /*!< in: row insert node */
2453 que_thr_t* thr) /*!< in: query thread */
2454 {
2455 ulint err;
2456
2457 ut_ad(node && thr);
2458
2459 if (node->state == INS_NODE_ALLOC_ROW_ID) {
2460
2461 row_ins_alloc_row_id_step(node);
2462
2463 node->index = dict_table_get_first_index(node->table);
2464 node->entry = UT_LIST_GET_FIRST(node->entry_list);
2465
2466 if (node->ins_type == INS_SEARCHED) {
2467
2468 row_ins_get_row_from_select(node);
2469
2470 } else if (node->ins_type == INS_VALUES) {
2471
2472 row_ins_get_row_from_values(node);
2473 }
2474
2475 node->state = INS_NODE_INSERT_ENTRIES;
2476 }
2477
2478 ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
2479
2480 while (node->index != NULL) {
2481 err = row_ins_index_entry_step(node, thr);
2482
2483 if (err != DB_SUCCESS) {
2484
2485 return(err);
2486 }
2487
2488 node->index = dict_table_get_next_index(node->index);
2489 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
2490
2491 /* Skip corrupted secondar index and its entry */
2492 while (node->index && dict_index_is_corrupted(node->index)) {
2493
2494 node->index = dict_table_get_next_index(node->index);
2495 node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
2496 }
2497 }
2498
2499 ut_ad(node->entry == NULL);
2500
2501 node->state = INS_NODE_ALLOC_ROW_ID;
2502
2503 return(DB_SUCCESS);
2504 }
2505
2506 /***********************************************************//**
2507 Inserts a row to a table. This is a high-level function used in SQL execution
2508 graphs.
2509 @return query thread to run next or NULL */
2510 UNIV_INTERN
2511 que_thr_t*
row_ins_step(que_thr_t * thr)2512 row_ins_step(
2513 /*=========*/
2514 que_thr_t* thr) /*!< in: query thread */
2515 {
2516 ins_node_t* node;
2517 que_node_t* parent;
2518 sel_node_t* sel_node;
2519 trx_t* trx;
2520 ulint err;
2521
2522 ut_ad(thr);
2523
2524 trx = thr_get_trx(thr);
2525
2526 trx_start_if_not_started(trx);
2527
2528 node = thr->run_node;
2529
2530 ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
2531
2532 parent = que_node_get_parent(node);
2533 sel_node = node->select;
2534
2535 if (thr->prev_node == parent) {
2536 node->state = INS_NODE_SET_IX_LOCK;
2537 }
2538
2539 /* If this is the first time this node is executed (or when
2540 execution resumes after wait for the table IX lock), set an
2541 IX lock on the table and reset the possible select node. MySQL's
2542 partitioned table code may also call an insert within the same
2543 SQL statement AFTER it has used this table handle to do a search.
2544 This happens, for example, when a row update moves it to another
2545 partition. In that case, we have already set the IX lock on the
2546 table during the search operation, and there is no need to set
2547 it again here. But we must write trx->id to node->trx_id_buf. */
2548
2549 trx_write_trx_id(node->trx_id_buf, trx->id);
2550
2551 if (node->state == INS_NODE_SET_IX_LOCK) {
2552
2553 /* It may be that the current session has not yet started
2554 its transaction, or it has been committed: */
2555
2556 if (trx->id == node->trx_id) {
2557 /* No need to do IX-locking */
2558
2559 goto same_trx;
2560 }
2561
2562 err = lock_table(0, node->table, LOCK_IX, thr);
2563
2564 if (err != DB_SUCCESS) {
2565
2566 goto error_handling;
2567 }
2568
2569 node->trx_id = trx->id;
2570 same_trx:
2571 node->state = INS_NODE_ALLOC_ROW_ID;
2572
2573 if (node->ins_type == INS_SEARCHED) {
2574 /* Reset the cursor */
2575 sel_node->state = SEL_NODE_OPEN;
2576
2577 /* Fetch a row to insert */
2578
2579 thr->run_node = sel_node;
2580
2581 return(thr);
2582 }
2583 }
2584
2585 if ((node->ins_type == INS_SEARCHED)
2586 && (sel_node->state != SEL_NODE_FETCH)) {
2587
2588 ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
2589
2590 /* No more rows to insert */
2591 thr->run_node = parent;
2592
2593 return(thr);
2594 }
2595
2596 /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
2597
2598 err = row_ins(node, thr);
2599
2600 error_handling:
2601 trx->error_state = err;
2602
2603 if (err != DB_SUCCESS) {
2604 /* err == DB_LOCK_WAIT or SQL error detected */
2605 return(NULL);
2606 }
2607
2608 /* DO THE TRIGGER ACTIONS HERE */
2609
2610 if (node->ins_type == INS_SEARCHED) {
2611 /* Fetch a row to insert */
2612
2613 thr->run_node = sel_node;
2614 } else {
2615 thr->run_node = que_node_get_parent(node);
2616 }
2617
2618 return(thr);
2619 }
2620