1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file row/row0ins.cc
28  Insert into a table
29 
30  Created 4/20/1996 Heikki Tuuri
31  *******************************************************/
32 
33 #include <sys/types.h>
34 
35 #include "btr0btr.h"
36 #include "btr0cur.h"
37 #include "buf0lru.h"
38 #include "current_thd.h"
39 #include "data0data.h"
40 #include "dict0boot.h"
41 #include "dict0dd.h"
42 #include "dict0dict.h"
43 #include "eval0eval.h"
44 #include "fts0fts.h"
45 #include "fts0types.h"
46 #include "gis0geo.h"
47 #include "ha_prototypes.h"
48 #include "lob0lob.h"
49 #include "lock0lock.h"
50 #include "log0log.h"
51 #include "m_string.h"
52 #include "mach0data.h"
53 #include "que0que.h"
54 #include "rem0cmp.h"
55 #include "row0ins.h"
56 #include "row0log.h"
57 #include "row0row.h"
58 #include "row0sel.h"
59 #include "row0upd.h"
60 #include "trx0rec.h"
61 #include "trx0undo.h"
62 #include "usr0sess.h"
63 
64 #include "my_dbug.h"
65 
66 /*************************************************************************
67 IMPORTANT NOTE: Any operation that generates redo MUST check that there
68 is enough space in the redo log before for that operation. This is
69 done by calling log_free_check(). The reason for checking the
70 availability of the redo log space before the start of the operation is
71 that we MUST not hold any synchonization objects when performing the
72 check.
73 If you make a change in this module make sure that no codepath is
74 introduced where a call to log_free_check() is bypassed. */
75 
76 /** Creates an insert node struct.
77  @return own: insert node struct */
ins_node_create(ulint ins_type,dict_table_t * table,mem_heap_t * heap)78 ins_node_t *ins_node_create(
79     ulint ins_type,      /*!< in: INS_VALUES, ... */
80     dict_table_t *table, /*!< in: table where to insert */
81     mem_heap_t *heap)    /*!< in: mem heap where created */
82 {
83   ins_node_t *node;
84 
85   node = static_cast<ins_node_t *>(mem_heap_alloc(heap, sizeof(ins_node_t)));
86 
87   node->common.type = QUE_NODE_INSERT;
88 
89   node->ins_type = ins_type;
90 
91   node->state = INS_NODE_SET_IX_LOCK;
92   node->table = table;
93   node->index = nullptr;
94   node->entry = nullptr;
95 
96   node->select = nullptr;
97 
98   node->trx_id = 0;
99 
100   node->entry_sys_heap = mem_heap_create(128);
101 
102   node->magic_n = INS_NODE_MAGIC_N;
103 
104   node->ins_multi_val_pos = 0;
105 
106   return (node);
107 }
108 
109 /** Creates an entry template for each index of a table. */
ins_node_create_entry_list(ins_node_t * node)110 static void ins_node_create_entry_list(
111     ins_node_t *node) /*!< in: row insert node */
112 {
113   dict_index_t *index;
114   dtuple_t *entry;
115 
116   ut_ad(node->entry_sys_heap);
117 
118   UT_LIST_INIT(node->entry_list, &dtuple_t::tuple_list);
119 
120   /* We will include all indexes (include those corrupted
121   secondary indexes) in the entry list. Filteration of
122   these corrupted index will be done in row_ins() */
123 
124   for (index = node->table->first_index(); index != nullptr;
125        index = index->next()) {
126     entry = row_build_index_entry_low(
127         node->row, nullptr, index, node->entry_sys_heap, ROW_BUILD_FOR_INSERT);
128 
129     UT_LIST_ADD_LAST(node->entry_list, entry);
130   }
131 }
132 
133 /** Adds system field buffers to a row. */
row_ins_alloc_sys_fields(ins_node_t * node)134 static void row_ins_alloc_sys_fields(ins_node_t *node) /*!< in: insert node */
135 {
136   dtuple_t *row;
137   dict_table_t *table;
138   mem_heap_t *heap;
139   const dict_col_t *col;
140   dfield_t *dfield;
141   byte *ptr;
142 
143   row = node->row;
144   table = node->table;
145   heap = node->entry_sys_heap;
146 
147   ut_ad(row && table && heap);
148   ut_ad(dtuple_get_n_fields(row) == table->get_n_cols());
149 
150   /* allocate buffer to hold the needed system created hidden columns. */
151   uint len = DATA_ROW_ID_LEN + DATA_TRX_ID_LEN;
152   if (!table->is_intrinsic()) {
153     len += DATA_ROLL_PTR_LEN;
154   }
155   ptr = static_cast<byte *>(mem_heap_zalloc(heap, len));
156 
157   /* 1. Populate row-id */
158   col = table->get_sys_col(DATA_ROW_ID);
159 
160   dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
161 
162   dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
163 
164   node->row_id_buf = ptr;
165 
166   ptr += DATA_ROW_ID_LEN;
167 
168   /* 2. Populate trx id */
169   col = table->get_sys_col(DATA_TRX_ID);
170 
171   dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
172 
173   dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
174 
175   node->trx_id_buf = ptr;
176 
177   ptr += DATA_TRX_ID_LEN;
178 
179   if (!table->is_intrinsic()) {
180     col = table->get_sys_col(DATA_ROLL_PTR);
181 
182     dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
183 
184     dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
185   }
186 }
187 
188 /** Sets a new row to insert for an INS_DIRECT node. This function is only used
189  if we have constructed the row separately, which is a rare case; this
190  function is quite slow. */
ins_node_set_new_row(ins_node_t * node,dtuple_t * row)191 void ins_node_set_new_row(
192     ins_node_t *node, /*!< in: insert node */
193     dtuple_t *row)    /*!< in: new row (or first row) for the node */
194 {
195   node->state = INS_NODE_SET_IX_LOCK;
196   node->index = nullptr;
197   node->entry = nullptr;
198 
199   node->row = row;
200 
201   mem_heap_empty(node->entry_sys_heap);
202 
203   /* Create templates for index entries */
204 
205   ins_node_create_entry_list(node);
206 
207   /* Allocate from entry_sys_heap buffers for sys fields */
208 
209   row_ins_alloc_sys_fields(node);
210 
211   /* As we allocated a new trx id buf, the trx id should be written
212   there again: */
213 
214   node->trx_id = 0;
215 }
216 
217 /** Does an insert operation by updating a delete-marked existing record
218  in the index. This situation can occur if the delete-marked record is
219  kept in the index for consistent reads.
220  @return DB_SUCCESS or error code */
221 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_sec_index_entry_by_modify(ulint flags,ulint mode,btr_cur_t * cursor,ulint ** offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)222     row_ins_sec_index_entry_by_modify(
223         ulint flags,       /*!< in: undo logging and locking flags */
224         ulint mode,        /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
225                            depending on whether mtr holds just a leaf
226                            latch or also a tree latch */
227         btr_cur_t *cursor, /*!< in: B-tree cursor */
228         ulint **offsets,   /*!< in/out: offsets on cursor->page_cur.rec */
229         mem_heap_t *offsets_heap,
230         /*!< in/out: memory heap that can be emptied */
231         mem_heap_t *heap,      /*!< in/out: memory heap */
232         const dtuple_t *entry, /*!< in: index entry to insert */
233         que_thr_t *thr,        /*!< in: query thread */
234         mtr_t *mtr)            /*!< in: mtr; must be committed before
235                                latching any further pages */
236 {
237   big_rec_t *dummy_big_rec;
238   upd_t *update;
239   rec_t *rec;
240   dberr_t err;
241 
242   rec = btr_cur_get_rec(cursor);
243 
244   ut_ad(!cursor->index->is_clustered());
245   ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
246   ut_ad(!entry->info_bits);
247 
248   /* We know that in the alphabetical ordering, entry and rec are
249   identified. But in their binary form there may be differences if
250   there are char fields in them. Therefore we have to calculate the
251   difference. */
252 
253   update = row_upd_build_sec_rec_difference_binary(rec, cursor->index, *offsets,
254                                                    entry, heap);
255 
256   if (!rec_get_deleted_flag(rec, rec_offs_comp(*offsets))) {
257     /* We should never insert in place of a record that
258     has not been delete-marked. The only exception is when
259     online CREATE INDEX copied the changes that we already
260     made to the clustered index, and completed the
261     secondary index creation before we got here. In this
262     case, the change would already be there. The CREATE
263     INDEX should be waiting for a MySQL meta-data lock
264     upgrade at least until this INSERT or UPDATE
265     returns. After that point, set_committed(true)
266     would be invoked in commit_inplace_alter_table(). */
267     ut_a(update->n_fields == 0);
268     ut_a(!cursor->index->is_committed());
269     ut_ad(!dict_index_is_online_ddl(cursor->index));
270     return (DB_SUCCESS);
271   }
272 
273   if (mode == BTR_MODIFY_LEAF) {
274     /* Try an optimistic updating of the record, keeping changes
275     within the page */
276 
277     /* TODO: pass only *offsets */
278     err = btr_cur_optimistic_update(flags | BTR_KEEP_SYS_FLAG, cursor, offsets,
279                                     &offsets_heap, update, 0, thr,
280                                     thr_get_trx(thr)->id, mtr);
281     switch (err) {
282       case DB_OVERFLOW:
283       case DB_UNDERFLOW:
284       case DB_ZIP_OVERFLOW:
285         err = DB_FAIL;
286       default:
287         break;
288     }
289   } else {
290     ut_a(mode == BTR_MODIFY_TREE);
291     if (buf_LRU_buf_pool_running_out()) {
292       return (DB_LOCK_TABLE_FULL);
293     }
294 
295     trx_t *trx = thr_get_trx(thr);
296     err = btr_cur_pessimistic_update(
297         flags | BTR_KEEP_SYS_FLAG, cursor, offsets, &offsets_heap, heap,
298         &dummy_big_rec, update, 0, thr, trx->id, trx->undo_no, mtr);
299     ut_ad(!dummy_big_rec);
300   }
301 
302   return (err);
303 }
304 
305 /** Does an insert operation by delete unmarking and updating a delete marked
306  existing record in the index. This situation can occur if the delete marked
307  record is kept in the index for consistent reads.
308  @return DB_SUCCESS, DB_FAIL, or error code */
309 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_clust_index_entry_by_modify(btr_pcur_t * pcur,ulint flags,ulint mode,ulint ** offsets,mem_heap_t ** offsets_heap,mem_heap_t * heap,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)310     row_ins_clust_index_entry_by_modify(
311         btr_pcur_t *pcur, /*!< in/out: a persistent cursor pointing
312                           to the clust_rec that is being modified. */
313         ulint flags,      /*!< in: undo logging and locking flags */
314         ulint mode,       /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
315                           depending on whether mtr holds just a leaf
316                           latch or also a tree latch */
317         ulint **offsets,  /*!< out: offsets on cursor->page_cur.rec */
318         mem_heap_t **offsets_heap,
319         /*!< in/out: pointer to memory heap that can
320         be emptied, or NULL */
321         mem_heap_t *heap,      /*!< in/out: memory heap */
322         const dtuple_t *entry, /*!< in: index entry to insert */
323         que_thr_t *thr,        /*!< in: query thread */
324         mtr_t *mtr)            /*!< in: mtr; must be committed before
325                                latching any further pages */
326 {
327   const rec_t *rec;
328   upd_t *update;
329   dberr_t err = DB_SUCCESS;
330   btr_cur_t *cursor = btr_pcur_get_btr_cur(pcur);
331   TABLE *mysql_table = nullptr;
332   ut_ad(cursor->index->is_clustered());
333 
334   rec = btr_cur_get_rec(cursor);
335 
336   ut_ad(rec_get_deleted_flag(rec, dict_table_is_comp(cursor->index->table)));
337 
338   /* Build an update vector containing all the fields to be modified;
339   NOTE that this vector may NOT contain system columns trx_id or
340   roll_ptr */
341   if (thr->prebuilt != nullptr) {
342     mysql_table = thr->prebuilt->m_mysql_table;
343     ut_ad(thr->prebuilt->trx == thr_get_trx(thr));
344   }
345 
346   update = row_upd_build_difference_binary(cursor->index, entry, rec, nullptr,
347                                            true, thr_get_trx(thr), heap,
348                                            mysql_table, &err);
349   if (err != DB_SUCCESS) {
350     return (err);
351   }
352   if (mode != BTR_MODIFY_TREE) {
353     ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
354 
355     /* Try optimistic updating of the record, keeping changes
356     within the page */
357 
358     err = btr_cur_optimistic_update(flags, cursor, offsets, offsets_heap,
359                                     update, 0, thr, thr_get_trx(thr)->id, mtr);
360     switch (err) {
361       case DB_OVERFLOW:
362       case DB_UNDERFLOW:
363       case DB_ZIP_OVERFLOW:
364         err = DB_FAIL;
365       default:
366         break;
367     }
368   } else {
369     if (buf_LRU_buf_pool_running_out()) {
370       return (DB_LOCK_TABLE_FULL);
371     }
372 
373     big_rec_t *big_rec = nullptr;
374     trx_t *trx = thr_get_trx(thr);
375     trx_id_t trx_id = thr_get_trx(thr)->id;
376 
377     DEBUG_SYNC_C("before_row_ins_upd_pessimistic");
378 
379     err = btr_cur_pessimistic_update(flags | BTR_KEEP_POS_FLAG, cursor, offsets,
380                                      offsets_heap, heap, &big_rec, update, 0,
381                                      thr, trx_id, trx->undo_no, mtr);
382 
383     if (big_rec) {
384       ut_a(err == DB_SUCCESS);
385 
386       DEBUG_SYNC_C("before_row_ins_upd_extern");
387       err = lob::btr_store_big_rec_extern_fields(
388           trx, pcur, update, *offsets, big_rec, mtr, lob::OPCODE_INSERT_UPDATE);
389       DEBUG_SYNC_C("after_row_ins_upd_extern");
390       dtuple_big_rec_free(big_rec);
391     }
392   }
393 
394   return (err);
395 }
396 
397 /** Returns TRUE if in a cascaded update/delete an ancestor node of node
398  updates (not DELETE, but UPDATE) table.
399  @return true if an ancestor updates table */
row_ins_cascade_ancestor_updates_table(que_node_t * node,dict_table_t * table)400 static ibool row_ins_cascade_ancestor_updates_table(
401     que_node_t *node,    /*!< in: node in a query graph */
402     dict_table_t *table) /*!< in: table */
403 {
404   que_node_t *parent;
405 
406   for (parent = que_node_get_parent(node);
407        que_node_get_type(parent) == QUE_NODE_UPDATE;
408        parent = que_node_get_parent(parent)) {
409     upd_node_t *upd_node;
410 
411     upd_node = static_cast<upd_node_t *>(parent);
412 
413     if (upd_node->table == table && upd_node->is_delete == FALSE) {
414       return (TRUE);
415     }
416   }
417 
418   return (FALSE);
419 }
420 
421 /** Returns the number of ancestor UPDATE or DELETE nodes of a
422  cascaded update/delete node.
423  @return number of ancestors */
row_ins_cascade_n_ancestors(que_node_t * node)424 static MY_ATTRIBUTE((warn_unused_result)) ulint row_ins_cascade_n_ancestors(
425     que_node_t *node) /*!< in: node in a query graph */
426 {
427   que_node_t *parent;
428   ulint n_ancestors = 0;
429 
430   for (parent = que_node_get_parent(node);
431        que_node_get_type(parent) == QUE_NODE_UPDATE;
432        parent = que_node_get_parent(parent)) {
433     n_ancestors++;
434   }
435 
436   return (n_ancestors);
437 }
438 
439 /** Calculates the update vector node->cascade->update for a child table in
440  a cascaded update.
441  @return number of fields in the calculated update vector; the value
442  can also be 0 if no foreign key fields changed; the returned value is
443  ULINT_UNDEFINED if the column type in the child table is too short to
444  fit the new value in the parent table: that means the update fails */
row_ins_cascade_calc_update_vec(upd_node_t * node,dict_foreign_t * foreign,mem_heap_t * heap,trx_t * trx,ibool * fts_col_affected)445 static MY_ATTRIBUTE((warn_unused_result)) ulint row_ins_cascade_calc_update_vec(
446     upd_node_t *node,        /*!< in: update node of the parent
447                              table */
448     dict_foreign_t *foreign, /*!< in: foreign key constraint whose
449                              type is != 0 */
450     mem_heap_t *heap,        /*!< in: memory heap to use as
451                              temporary storage */
452     trx_t *trx,              /*!< in: update transaction */
453     ibool *fts_col_affected)
454 /*!< out: is FTS column affected */
455 {
456   upd_node_t *cascade = node->cascade_node;
457   dict_table_t *table = foreign->foreign_table;
458   dict_index_t *index = foreign->foreign_index;
459   upd_t *update;
460   dict_table_t *parent_table;
461   dict_index_t *parent_index;
462   upd_t *parent_update;
463   ulint n_fields_updated;
464   ulint parent_field_no;
465   ulint i;
466   ulint j;
467   ibool doc_id_updated = FALSE;
468   ulint doc_id_pos = 0;
469   doc_id_t new_doc_id = FTS_NULL_DOC_ID;
470 
471   ut_a(node);
472   ut_a(foreign);
473   ut_a(cascade);
474   ut_a(table);
475   ut_a(index);
476 
477   /* Calculate the appropriate update vector which will set the fields
478   in the child index record to the same value (possibly padded with
479   spaces if the column is a fixed length CHAR or FIXBINARY column) as
480   the referenced index record will get in the update. */
481 
482   parent_table = node->table;
483   ut_a(parent_table == foreign->referenced_table);
484   parent_index = foreign->referenced_index;
485   parent_update = node->update;
486 
487   update = cascade->update;
488 
489   update->info_bits = 0;
490 
491   n_fields_updated = 0;
492 
493   *fts_col_affected = FALSE;
494 
495   if (table->fts) {
496     doc_id_pos = dict_table_get_nth_col_pos(table, table->fts->doc_col);
497   }
498 
499   for (i = 0; i < foreign->n_fields; i++) {
500     parent_field_no =
501         dict_table_get_nth_col_pos(parent_table, parent_index->get_col_no(i));
502 
503     for (j = 0; j < parent_update->n_fields; j++) {
504       const upd_field_t *parent_ufield = &parent_update->fields[j];
505 
506       if (parent_ufield->field_no == parent_field_no) {
507         ulint min_size;
508         const dict_col_t *col;
509         ulint ufield_len;
510         upd_field_t *ufield;
511 
512         col = index->get_col(i);
513 
514         /* A field in the parent index record is
515         updated. Let us make the update vector
516         field for the child table. */
517 
518         ufield = update->fields + n_fields_updated;
519 
520         ufield->field_no =
521             dict_table_get_nth_col_pos(table, dict_col_get_no(col));
522 
523         ufield->orig_len = 0;
524         ufield->exp = nullptr;
525 
526         ufield->new_val = parent_ufield->new_val;
527         ufield_len = dfield_get_len(&ufield->new_val);
528 
529         /* Clear the "external storage" flag */
530         dfield_set_len(&ufield->new_val, ufield_len);
531 
532         /* Do not allow a NOT NULL column to be
533         updated as NULL */
534 
535         if (dfield_is_null(&ufield->new_val) && (col->prtype & DATA_NOT_NULL)) {
536           return (ULINT_UNDEFINED);
537         }
538 
539         /* If the new value would not fit in the
540         column, do not allow the update */
541 
542         if (!dfield_is_null(&ufield->new_val) &&
543             dtype_get_at_most_n_mbchars(
544                 col->prtype, col->mbminmaxlen, col->len, ufield_len,
545                 static_cast<char *>(dfield_get_data(&ufield->new_val))) <
546                 ufield_len) {
547           return (ULINT_UNDEFINED);
548         }
549 
550         /* If the parent column type has a different
551         length than the child column type, we may
552         need to pad with spaces the new value of the
553         child column */
554 
555         min_size = col->get_min_size();
556 
557         /* Because UNIV_SQL_NULL (the marker
558         of SQL NULL values) exceeds all possible
559         values of min_size, the test below will
560         not hold for SQL NULL columns. */
561 
562         if (min_size > ufield_len) {
563           byte *pad;
564           ulint pad_len;
565           byte *padded_data;
566           ulint mbminlen;
567 
568           padded_data = static_cast<byte *>(mem_heap_alloc(heap, min_size));
569 
570           pad = padded_data + ufield_len;
571           pad_len = min_size - ufield_len;
572 
573           memcpy(padded_data, dfield_get_data(&ufield->new_val), ufield_len);
574 
575           mbminlen = col->get_mbminlen();
576 
577           ut_ad(!(ufield_len % mbminlen));
578           ut_ad(!(min_size % mbminlen));
579 
580           if (mbminlen == 1 && dtype_get_charset_coll(col->prtype) ==
581                                    DATA_MYSQL_BINARY_CHARSET_COLL) {
582             /* Do not pad BINARY columns */
583             return (ULINT_UNDEFINED);
584           }
585 
586           row_mysql_pad_col(mbminlen, pad, pad_len);
587           dfield_set_data(&ufield->new_val, padded_data, min_size);
588         }
589 
590         /* Check whether the current column has
591         FTS index on it */
592         if (table->fts &&
593             dict_table_is_fts_column(table->fts->indexes, dict_col_get_no(col),
594                                      col->is_virtual()) != ULINT_UNDEFINED) {
595           *fts_col_affected = TRUE;
596         }
597 
598         /* If Doc ID is updated, check whether the
599         Doc ID is valid */
600         if (table->fts && ufield->field_no == doc_id_pos) {
601           doc_id_t n_doc_id;
602 
603           n_doc_id = table->fts->cache->next_doc_id;
604 
605           new_doc_id = fts_read_doc_id(
606               static_cast<const byte *>(dfield_get_data(&ufield->new_val)));
607 
608           if (new_doc_id <= 0) {
609             ib::error(ER_IB_MSG_954) << "FTS Doc ID"
610                                         " must be larger than"
611                                         " 0";
612             return (ULINT_UNDEFINED);
613           }
614 
615           if (new_doc_id < n_doc_id) {
616             ib::error(ER_IB_MSG_955)
617                 << "FTS Doc ID"
618                    " must be larger than "
619                 << n_doc_id - 1 << " for table " << table->name;
620 
621             return (ULINT_UNDEFINED);
622           }
623 
624           *fts_col_affected = TRUE;
625           doc_id_updated = TRUE;
626         }
627 
628         n_fields_updated++;
629       }
630     }
631   }
632 
633   /* Generate a new Doc ID if FTS index columns get updated */
634   if (table->fts && *fts_col_affected) {
635     if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_HAS_DOC_ID)) {
636       doc_id_t doc_id;
637       doc_id_t *next_doc_id;
638       upd_field_t *ufield;
639 
640       next_doc_id =
641           static_cast<doc_id_t *>(mem_heap_alloc(heap, sizeof(doc_id_t)));
642 
643       ut_ad(!doc_id_updated);
644       ufield = update->fields + n_fields_updated;
645       fts_get_next_doc_id(table, next_doc_id);
646       doc_id = fts_update_doc_id(table, ufield, next_doc_id);
647       n_fields_updated++;
648       fts_trx_add_op(trx, table, doc_id, FTS_INSERT, nullptr);
649     } else {
650       if (doc_id_updated) {
651         ut_ad(new_doc_id);
652         fts_trx_add_op(trx, table, new_doc_id, FTS_INSERT, nullptr);
653       } else {
654         ib::error(ER_IB_MSG_956) << "FTS Doc ID must be updated"
655                                     " along with FTS indexed column for"
656                                     " table "
657                                  << table->name;
658         return (ULINT_UNDEFINED);
659       }
660     }
661   }
662 
663   update->n_fields = n_fields_updated;
664 
665   return (n_fields_updated);
666 }
667 
668 /** Set detailed error message associated with foreign key errors for
669  the given transaction. */
row_ins_set_detailed(trx_t * trx,dict_foreign_t * foreign)670 static void row_ins_set_detailed(
671     trx_t *trx,              /*!< in: transaction */
672     dict_foreign_t *foreign) /*!< in: foreign key constraint */
673 {
674   ut_ad(!srv_read_only_mode);
675 
676   mutex_enter(&srv_misc_tmpfile_mutex);
677   rewind(srv_misc_tmpfile);
678 
679   if (os_file_set_eof(srv_misc_tmpfile)) {
680     ut_print_name(srv_misc_tmpfile, trx, foreign->foreign_table_name);
681     dict_print_info_on_foreign_key_in_create_format(srv_misc_tmpfile, trx,
682                                                     foreign, FALSE);
683     trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
684   } else {
685     trx_set_detailed_error(trx, "temp file operation failed");
686   }
687 
688   mutex_exit(&srv_misc_tmpfile_mutex);
689 }
690 
691 /** Acquires dict_foreign_err_mutex, rewinds dict_foreign_err_file
692  and displays information about the given transaction.
693  The caller must release dict_foreign_err_mutex. */
row_ins_foreign_trx_print(trx_t * trx)694 static void row_ins_foreign_trx_print(trx_t *trx) /*!< in: transaction */
695 {
696   ulint n_rec_locks;
697   ulint n_trx_locks;
698   ulint heap_size;
699 
700   if (srv_read_only_mode) {
701     return;
702   }
703 
704   {
705     /** lock_number_of_rows_locked() requires global exclusive latch, and so
706     does accessing trx_locks with trx->mutex */
707     locksys::Global_exclusive_latch_guard guard{};
708     n_rec_locks = lock_number_of_rows_locked(&trx->lock);
709     n_trx_locks = UT_LIST_GET_LEN(trx->lock.trx_locks);
710     heap_size = mem_heap_get_size(trx->lock.lock_heap);
711   }
712 
713   trx_sys_mutex_enter();
714 
715   mutex_enter(&dict_foreign_err_mutex);
716   rewind(dict_foreign_err_file);
717   ut_print_timestamp(dict_foreign_err_file);
718   fputs(" Transaction:\n", dict_foreign_err_file);
719 
720   trx_print_low(dict_foreign_err_file, trx, 600, n_rec_locks, n_trx_locks,
721                 heap_size);
722 
723   trx_sys_mutex_exit();
724 
725   ut_ad(mutex_own(&dict_foreign_err_mutex));
726 }
727 
728 /** Reports a foreign key error associated with an update or a delete of a
729  parent table index entry. */
row_ins_foreign_report_err(const char * errstr,que_thr_t * thr,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)730 static void row_ins_foreign_report_err(
731     const char *errstr,      /*!< in: error string from the viewpoint
732                              of the parent table */
733     que_thr_t *thr,          /*!< in: query thread whose run_node
734                              is an update node */
735     dict_foreign_t *foreign, /*!< in: foreign key constraint */
736     const rec_t *rec,        /*!< in: a matching index record in the
737                              child table */
738     const dtuple_t *entry)   /*!< in: index entry in the parent
739                              table */
740 {
741   if (srv_read_only_mode) {
742     return;
743   }
744 
745   FILE *ef = dict_foreign_err_file;
746   trx_t *trx = thr_get_trx(thr);
747 
748   row_ins_set_detailed(trx, foreign);
749 
750   row_ins_foreign_trx_print(trx);
751 
752   fputs("Foreign key constraint fails for table ", ef);
753   ut_print_name(ef, trx, foreign->foreign_table_name);
754   fputs(":\n", ef);
755   dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
756   putc('\n', ef);
757   fputs(errstr, ef);
758   fprintf(ef, " in parent table, in index %s",
759           foreign->referenced_index->name());
760   if (entry) {
761     fputs(" tuple:\n", ef);
762     dtuple_print(ef, entry);
763   }
764   fputs("\nBut in child table ", ef);
765   ut_print_name(ef, trx, foreign->foreign_table_name);
766   fprintf(ef, ", in index %s", foreign->foreign_index->name());
767   if (rec) {
768     fputs(", there is a record:\n", ef);
769     rec_print(ef, rec, foreign->foreign_index);
770   } else {
771     fputs(", the record is not available\n", ef);
772   }
773   putc('\n', ef);
774 
775   mutex_exit(&dict_foreign_err_mutex);
776 }
777 
778 /** Reports a foreign key error to dict_foreign_err_file when we are trying
779  to add an index entry to a child table. Note that the adding may be the result
780  of an update, too. */
row_ins_foreign_report_add_err(trx_t * trx,dict_foreign_t * foreign,const rec_t * rec,const dtuple_t * entry)781 static void row_ins_foreign_report_add_err(
782     trx_t *trx,              /*!< in: transaction */
783     dict_foreign_t *foreign, /*!< in: foreign key constraint */
784     const rec_t *rec,        /*!< in: a record in the parent table:
785                              it does not match entry because we
786                              have an error! */
787     const dtuple_t *entry)   /*!< in: index entry to insert in the
788                              child table */
789 {
790   if (srv_read_only_mode) {
791     return;
792   }
793 
794   FILE *ef = dict_foreign_err_file;
795 
796   row_ins_set_detailed(trx, foreign);
797 
798   row_ins_foreign_trx_print(trx);
799 
800   fputs("Foreign key constraint fails for table ", ef);
801   ut_print_name(ef, trx, foreign->foreign_table_name);
802   fputs(":\n", ef);
803   dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
804   fprintf(ef, "\nTrying to add in child table, in index %s",
805           foreign->foreign_index->name());
806   if (entry) {
807     fputs(" tuple:\n", ef);
808     /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
809     It would be better to only display the user columns. */
810     dtuple_print(ef, entry);
811   }
812   fputs("\nBut in parent table ", ef);
813   ut_print_name(ef, trx, foreign->referenced_table_name);
814   fprintf(ef,
815           ", in index %s,\n"
816           "the closest match we can find is record:\n",
817           foreign->referenced_index->name());
818   if (rec && page_rec_is_supremum(rec)) {
819     /* If the cursor ended on a supremum record, it is better
820     to report the previous record in the error message, so that
821     the user gets a more descriptive error message. */
822     rec = page_rec_get_prev_const(rec);
823   }
824 
825   if (rec) {
826     rec_print(ef, rec, foreign->referenced_index);
827   }
828   putc('\n', ef);
829 
830   mutex_exit(&dict_foreign_err_mutex);
831 }
832 
833 /** Fill virtual column information in cascade node for the child table.
834 @param[in]	trx		current transaction
835 @param[out]	cascade		child update node
836 @param[in]	rec		clustered rec of child table
837 @param[in]	index		clustered index of child table
838 @param[in]	node		parent update node
839 @param[in]	foreign		foreign key information.
840 @param[out]	err		error code. */
row_ins_foreign_fill_virtual(trx_t * trx,upd_node_t * cascade,const rec_t * rec,dict_index_t * index,upd_node_t * node,dict_foreign_t * foreign,dberr_t * err)841 static void row_ins_foreign_fill_virtual(trx_t *trx, upd_node_t *cascade,
842                                          const rec_t *rec, dict_index_t *index,
843                                          upd_node_t *node,
844                                          dict_foreign_t *foreign,
845                                          dberr_t *err) {
846   row_ext_t *ext;
847   THD *thd = current_thd;
848   ulint offsets_[REC_OFFS_NORMAL_SIZE];
849   rec_offs_init(offsets_);
850   const ulint *offsets =
851       rec_get_offsets(rec, index, offsets_, ULINT_UNDEFINED, &cascade->heap);
852   mem_heap_t *v_heap = nullptr;
853   upd_t *update = cascade->update;
854   ulint n_v_fld = index->table->n_v_def;
855   ulint n_diff;
856   upd_field_t *upd_field;
857   dict_vcol_set *v_cols = foreign->v_cols;
858 
859   update->old_vrow =
860       row_build(ROW_COPY_POINTERS, index, rec, offsets, index->table, nullptr,
861                 nullptr, &ext, cascade->heap);
862 
863   n_diff = update->n_fields;
864 
865   update->n_fields += n_v_fld;
866 
867   if (index->table->vc_templ == nullptr) {
868     /** This can occur when there is a cascading
869     delete or update after restart. */
870     innobase_init_vc_templ(index->table);
871   }
872 
873   for (ulint i = 0; i < n_v_fld; i++) {
874     dict_v_col_t *col = dict_table_get_nth_v_col(index->table, i);
875     auto it = v_cols->find(col);
876 
877     if (it == v_cols->end()) {
878       continue;
879     }
880 
881     dfield_t *vfield = innobase_get_computed_value(
882         update->old_vrow, col, index, &v_heap, update->heap, nullptr, thd,
883         nullptr, nullptr, nullptr, nullptr);
884 
885     if (vfield == nullptr) {
886       *err = DB_COMPUTE_VALUE_FAILED;
887       goto func_exit;
888     }
889 
890     upd_field = upd_get_nth_field(update, n_diff);
891 
892     upd_field->old_v_val = static_cast<dfield_t *>(
893         mem_heap_alloc(cascade->heap, sizeof *upd_field->old_v_val));
894 
895     dfield_copy(upd_field->old_v_val, vfield);
896 
897     upd_field_set_v_field_no(upd_field, i, index);
898 
899     if (node->is_delete ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
900                         : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
901       dfield_set_null(&upd_field->new_val);
902     }
903 
904     if (!node->is_delete && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
905       dfield_t *new_vfield = innobase_get_computed_value(
906           update->old_vrow, col, index, &v_heap, update->heap, nullptr, thd,
907           nullptr, nullptr, node->update, foreign);
908 
909       if (new_vfield == nullptr) {
910         *err = DB_COMPUTE_VALUE_FAILED;
911         goto func_exit;
912       }
913 
914       dfield_copy(&(upd_field->new_val), new_vfield);
915     }
916 
917     n_diff++;
918   }
919 
920   update->n_fields = n_diff;
921   *err = DB_SUCCESS;
922 
923 func_exit:
924   if (v_heap) {
925     mem_heap_free(v_heap);
926   }
927 }
928 
929 /** Perform referential actions or checks when a parent row is deleted or
930  updated and the constraint had an ON DELETE or ON UPDATE condition which was
931  not RESTRICT.
932  @return DB_SUCCESS, DB_LOCK_WAIT, or error code
933  Disable inlining because of a bug in gcc8 which may lead to stack exhaustion.
934 */
NO_INLINE(warn_unused_result)935 static NO_INLINE MY_ATTRIBUTE((warn_unused_result)) dberr_t
936     row_ins_foreign_check_on_constraint(
937         que_thr_t *thr,          /*!< in: query thread whose run_node
938                                  is an update node */
939         dict_foreign_t *foreign, /*!< in: foreign key constraint whose
940                                  type is != 0 */
941         btr_pcur_t *pcur,        /*!< in: cursor placed on a matching
942                                  index record in the child table */
943         dtuple_t *entry,         /*!< in: index entry in the parent
944                                  table */
945         mtr_t *mtr)              /*!< in: mtr holding the latch of pcur
946                                  page */
947 {
948   upd_node_t *node;
949   upd_node_t *cascade;
950   dict_table_t *table = foreign->foreign_table;
951   dict_index_t *index;
952   dict_index_t *clust_index;
953   dtuple_t *ref;
954   const rec_t *rec;
955   const rec_t *clust_rec;
956   const buf_block_t *clust_block;
957   upd_t *update;
958   ulint n_to_update;
959   dberr_t err;
960   ulint i;
961   trx_t *trx;
962   mem_heap_t *tmp_heap = nullptr;
963   doc_id_t doc_id = FTS_NULL_DOC_ID;
964   ibool fts_col_affacted = FALSE;
965 
966   DBUG_TRACE;
967   ut_a(thr);
968   ut_a(foreign);
969   ut_a(pcur);
970   ut_a(mtr);
971 
972   trx = thr_get_trx(thr);
973 
974   node = static_cast<upd_node_t *>(thr->run_node);
975 
976   if (node->is_delete &&
977       0 == (foreign->type & (DICT_FOREIGN_ON_DELETE_CASCADE |
978                              DICT_FOREIGN_ON_DELETE_SET_NULL))) {
979     row_ins_foreign_report_err("Trying to delete", thr, foreign,
980                                btr_pcur_get_rec(pcur), entry);
981 
982     return DB_ROW_IS_REFERENCED;
983   }
984 
985   if (!node->is_delete &&
986       0 == (foreign->type & (DICT_FOREIGN_ON_UPDATE_CASCADE |
987                              DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
988     /* This is an UPDATE */
989 
990     row_ins_foreign_report_err("Trying to update", thr, foreign,
991                                btr_pcur_get_rec(pcur), entry);
992 
993     return DB_ROW_IS_REFERENCED;
994   }
995 
996   if (node->cascade_node == nullptr) {
997     node->cascade_heap = mem_heap_create(128);
998     node->cascade_node =
999         row_create_update_node_for_mysql(table, node->cascade_heap);
1000     que_node_set_parent(node->cascade_node, node);
1001   }
1002   cascade = node->cascade_node;
1003   cascade->table = table;
1004   cascade->foreign = foreign;
1005 
1006   if (node->is_delete && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
1007     cascade->is_delete = TRUE;
1008   } else {
1009     cascade->is_delete = FALSE;
1010 
1011     if (foreign->n_fields > cascade->update_n_fields) {
1012       /* We have to make the update vector longer */
1013 
1014       cascade->update = upd_create(foreign->n_fields, node->cascade_heap);
1015       cascade->update_n_fields = foreign->n_fields;
1016     }
1017   }
1018 
1019   /* We do not allow cyclic cascaded updating (DELETE is allowed,
1020   but not UPDATE) of the same table, as this can lead to an infinite
1021   cycle. Check that we are not updating the same table which is
1022   already being modified in this cascade chain. We have to check
1023   this also because the modification of the indexes of a 'parent'
1024   table may still be incomplete, and we must avoid seeing the indexes
1025   of the parent table in an inconsistent state! */
1026 
1027   if (!cascade->is_delete &&
1028       row_ins_cascade_ancestor_updates_table(cascade, table)) {
1029     /* We do not know if this would break foreign key
1030     constraints, but play safe and return an error */
1031 
1032     err = DB_ROW_IS_REFERENCED;
1033 
1034     row_ins_foreign_report_err(
1035         "Trying an update, possibly causing a cyclic"
1036         " cascaded update\n"
1037         "in the child table,",
1038         thr, foreign, btr_pcur_get_rec(pcur), entry);
1039 
1040     goto nonstandard_exit_func;
1041   }
1042 
1043   if (row_ins_cascade_n_ancestors(cascade) >= FK_MAX_CASCADE_DEL) {
1044     err = DB_FOREIGN_EXCEED_MAX_CASCADE;
1045 
1046     row_ins_foreign_report_err("Trying a too deep cascaded delete or update\n",
1047                                thr, foreign, btr_pcur_get_rec(pcur), entry);
1048 
1049     goto nonstandard_exit_func;
1050   }
1051 
1052   index = btr_pcur_get_btr_cur(pcur)->index;
1053 
1054   ut_a(index == foreign->foreign_index);
1055 
1056   rec = btr_pcur_get_rec(pcur);
1057 
1058   tmp_heap = mem_heap_create(256);
1059 
1060   if (index->is_clustered()) {
1061     /* pcur is already positioned in the clustered index of
1062     the child table */
1063 
1064     clust_index = index;
1065     clust_rec = rec;
1066     clust_block = btr_pcur_get_block(pcur);
1067   } else {
1068     /* We have to look for the record in the clustered index
1069     in the child table */
1070 
1071     clust_index = table->first_index();
1072 
1073     ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec, tmp_heap);
1074     btr_pcur_open_with_no_init(clust_index, ref, PAGE_CUR_LE, BTR_SEARCH_LEAF,
1075                                cascade->pcur, 0, mtr);
1076 
1077     clust_rec = btr_pcur_get_rec(cascade->pcur);
1078     clust_block = btr_pcur_get_block(cascade->pcur);
1079 
1080     if (!page_rec_is_user_rec(clust_rec) ||
1081         btr_pcur_get_low_match(cascade->pcur) <
1082             dict_index_get_n_unique(clust_index)) {
1083       ib::error(ER_IB_MSG_957)
1084           << "In cascade of a foreign key op index " << index->name
1085           << " of table " << index->table->name;
1086 
1087       fputs("InnoDB: record ", stderr);
1088       rec_print(stderr, rec, index);
1089       fputs(
1090           "\n"
1091           "InnoDB: clustered record ",
1092           stderr);
1093       rec_print(stderr, clust_rec, clust_index);
1094       fputs(
1095           "\n"
1096           "InnoDB: Submit a detailed bug report to"
1097           " http://bugs.mysql.com\n",
1098           stderr);
1099       ut_ad(0);
1100       err = DB_SUCCESS;
1101 
1102       goto nonstandard_exit_func;
1103     }
1104   }
1105 
1106   /* Set an X-lock on the row to delete or update in the child table */
1107 
1108   err = lock_table(0, table, LOCK_IX, thr);
1109 
1110   if (err == DB_SUCCESS) {
1111     /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
1112     we already have a normal shared lock on the appropriate
1113     gap if the search criterion was not unique */
1114 
1115     err = lock_clust_rec_read_check_and_lock_alt(
1116         clust_block, clust_rec, clust_index, LOCK_X, LOCK_REC_NOT_GAP, thr);
1117   }
1118 
1119   if (err != DB_SUCCESS) {
1120     goto nonstandard_exit_func;
1121   }
1122 
1123   if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
1124     /* This can happen if there is a circular reference of
1125     rows such that cascading delete comes to delete a row
1126     already in the process of being delete marked */
1127     err = DB_SUCCESS;
1128 
1129     goto nonstandard_exit_func;
1130   }
1131 
1132   if (table->fts) {
1133     doc_id = fts_get_doc_id_from_rec(table, clust_rec, clust_index, tmp_heap);
1134   }
1135   if (cascade->is_delete && foreign->v_cols != nullptr &&
1136       foreign->v_cols->size() > 0 && table->vc_templ == nullptr) {
1137     innobase_init_vc_templ(table);
1138   }
1139 
1140   if (node->is_delete ? (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL)
1141                       : (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL)) {
1142     /* Build the appropriate update vector which sets
1143     foreign->n_fields first fields in rec to SQL NULL */
1144 
1145     update = cascade->update;
1146 
1147     update->info_bits = 0;
1148     update->n_fields = foreign->n_fields;
1149     UNIV_MEM_INVALID(update->fields, update->n_fields * sizeof *update->fields);
1150 
1151     for (i = 0; i < foreign->n_fields; i++) {
1152       upd_field_t *ufield = &update->fields[i];
1153       ulint col_no = index->get_col_no(i);
1154 
1155       ufield->field_no = dict_table_get_nth_col_pos(table, col_no);
1156       dict_col_t *col = table->get_col(col_no);
1157       col->copy_type(dfield_get_type(&ufield->new_val));
1158 
1159       ufield->orig_len = 0;
1160       ufield->exp = nullptr;
1161       dfield_set_null(&ufield->new_val);
1162 
1163       if (table->fts &&
1164           dict_table_is_fts_column(table->fts->indexes, index->get_col_no(i),
1165                                    index->get_col(i)->is_virtual()) !=
1166               ULINT_UNDEFINED) {
1167         fts_col_affacted = TRUE;
1168       }
1169     }
1170 
1171     if (fts_col_affacted) {
1172       fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
1173     }
1174 
1175     if (foreign->v_cols != nullptr && foreign->v_cols->size() > 0) {
1176       row_ins_foreign_fill_virtual(trx, cascade, clust_rec, clust_index, node,
1177                                    foreign, &err);
1178 
1179       if (err != DB_SUCCESS) {
1180         goto nonstandard_exit_func;
1181       }
1182     }
1183 
1184   } else if (table->fts && cascade->is_delete) {
1185     /* DICT_FOREIGN_ON_DELETE_CASCADE case */
1186     for (i = 0; i < foreign->n_fields; i++) {
1187       if (table->fts &&
1188           dict_table_is_fts_column(table->fts->indexes, index->get_col_no(i),
1189                                    index->get_col(i)->is_virtual()) !=
1190               ULINT_UNDEFINED) {
1191         fts_col_affacted = TRUE;
1192       }
1193     }
1194 
1195     if (fts_col_affacted) {
1196       fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
1197     }
1198   }
1199 
1200   if (!node->is_delete && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
1201     /* Build the appropriate update vector which sets changing
1202     foreign->n_fields first fields in rec to new values */
1203 
1204     n_to_update = row_ins_cascade_calc_update_vec(node, foreign, tmp_heap, trx,
1205                                                   &fts_col_affacted);
1206 
1207     if (foreign->v_cols != nullptr && foreign->v_cols->size() > 0) {
1208       row_ins_foreign_fill_virtual(trx, cascade, clust_rec, clust_index, node,
1209                                    foreign, &err);
1210 
1211       if (err != DB_SUCCESS) {
1212         goto nonstandard_exit_func;
1213       }
1214     }
1215 
1216     if (n_to_update == ULINT_UNDEFINED) {
1217       err = DB_ROW_IS_REFERENCED;
1218 
1219       row_ins_foreign_report_err(
1220           "Trying a cascaded update where the"
1221           " updated value in the child\n"
1222           "table would not fit in the length"
1223           " of the column, or the value would\n"
1224           "be NULL and the column is"
1225           " declared as not NULL in the child table,",
1226           thr, foreign, btr_pcur_get_rec(pcur), entry);
1227 
1228       goto nonstandard_exit_func;
1229     }
1230 
1231     if (cascade->update->n_fields == 0) {
1232       /* The update does not change any columns referred
1233       to in this foreign key constraint: no need to do
1234       anything */
1235 
1236       err = DB_SUCCESS;
1237 
1238       goto nonstandard_exit_func;
1239     }
1240 
1241     /* Mark the old Doc ID as deleted */
1242     if (fts_col_affacted) {
1243       ut_ad(table->fts);
1244       fts_trx_add_op(trx, table, doc_id, FTS_DELETE, nullptr);
1245     }
1246   }
1247 
1248   /* Store pcur position and initialize or store the cascade node
1249   pcur stored position */
1250 
1251   btr_pcur_store_position(pcur, mtr);
1252 
1253   if (index == clust_index) {
1254     btr_pcur_copy_stored_position(cascade->pcur, pcur);
1255   } else {
1256     btr_pcur_store_position(cascade->pcur, mtr);
1257   }
1258 
1259   mtr_commit(mtr);
1260 
1261   ut_a(cascade->pcur->m_rel_pos == BTR_PCUR_ON);
1262 
1263   cascade->state = UPD_NODE_UPDATE_CLUSTERED;
1264 
1265   err = row_update_cascade_for_mysql(thr, cascade, foreign->foreign_table);
1266 
1267   /* Release the data dictionary latch for a while, so that we do not
1268   starve other threads from doing CREATE TABLE etc. if we have a huge
1269   cascaded operation running. The counter n_foreign_key_checks_running
1270   will prevent other users from dropping or ALTERing the table when we
1271   release the latch. */
1272 
1273   DEBUG_SYNC_C("innodb_dml_cascade_dict_unfreeze");
1274 
1275   mtr_start(mtr);
1276 
1277   /* Restore pcur position */
1278 
1279   btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1280 
1281   if (tmp_heap) {
1282     mem_heap_free(tmp_heap);
1283   }
1284 
1285   return err;
1286 
1287 nonstandard_exit_func:
1288 
1289   if (tmp_heap) {
1290     mem_heap_free(tmp_heap);
1291   }
1292 
1293   btr_pcur_store_position(pcur, mtr);
1294 
1295   mtr_commit(mtr);
1296   mtr_start(mtr);
1297 
1298   btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
1299 
1300   return err;
1301 }
1302 
1303 /** Sets a lock on a record. Used in locking possible duplicate key
1304  records and also in checking foreign key constraints.
1305 @param[in]	mode	requested lock type: LOCK_S or LOCK_X mode
1306 @param[in]	type	LOCK_ORDINARY, LOCK_GAP, or LOCK_REC_NOT_GAP type lock
1307 @param[in]	block	buffer block of rec
1308 @param[in]	rec	record
1309 @param[in]	index	index
1310 @param[in]	offsets	rec_get_offsets(rec, index)
1311 @param[in]	thr	query thread
1312 @return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
row_ins_set_rec_lock(lock_mode mode,ulint type,const buf_block_t * block,const rec_t * rec,dict_index_t * index,const ulint * offsets,que_thr_t * thr)1313 static dberr_t row_ins_set_rec_lock(lock_mode mode, ulint type,
1314                                     const buf_block_t *block, const rec_t *rec,
1315                                     dict_index_t *index, const ulint *offsets,
1316                                     que_thr_t *thr) {
1317   dberr_t err;
1318 
1319   ut_ad(rec_offs_validate(rec, index, offsets));
1320 
1321   if (index->is_clustered()) {
1322     err = lock_clust_rec_read_check_and_lock(
1323         lock_duration_t::AT_LEAST_STATEMENT, block, rec, index, offsets,
1324         SELECT_ORDINARY, mode, type, thr);
1325   } else {
1326     err = lock_sec_rec_read_check_and_lock(lock_duration_t::AT_LEAST_STATEMENT,
1327                                            block, rec, index, offsets,
1328                                            SELECT_ORDINARY, mode, type, thr);
1329   }
1330 
1331   return (err);
1332 }
1333 
1334 /* Decrement a counter in the destructor. */
1335 class ib_dec_in_dtor {
1336  public:
ib_dec_in_dtor(ulint & c)1337   ib_dec_in_dtor(ulint &c) : counter(c) {}
~ib_dec_in_dtor()1338   ~ib_dec_in_dtor() { os_atomic_decrement_ulint(&counter, 1); }
1339 
1340  private:
1341   ulint &counter;
1342 };
1343 
1344 /** Checks if foreign key constraint fails for an index entry. Sets shared locks
1345  which lock either the success or the failure of the constraint. NOTE that
1346  the caller must have a shared latch on dict_operation_lock.
1347  @return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */
row_ins_check_foreign_constraint(ibool check_ref,dict_foreign_t * foreign,dict_table_t * table,dtuple_t * entry,que_thr_t * thr)1348 dberr_t row_ins_check_foreign_constraint(
1349     ibool check_ref,         /*!< in: TRUE if we want to check that
1350                            the referenced table is ok, FALSE if we
1351                            want to check the foreign key table */
1352     dict_foreign_t *foreign, /*!< in: foreign constraint; NOTE that the
1353                              tables mentioned in it must be in the
1354                              dictionary cache if they exist at all */
1355     dict_table_t *table,     /*!< in: if check_ref is TRUE, then the foreign
1356                              table, else the referenced table */
1357     dtuple_t *entry,         /*!< in: index entry for index */
1358     que_thr_t *thr)          /*!< in: query thread */
1359 {
1360   dberr_t err;
1361   upd_node_t *upd_node;
1362   dict_table_t *check_table = nullptr;
1363   dict_index_t *check_index = nullptr;
1364   ulint n_fields_cmp;
1365   btr_pcur_t pcur;
1366   int cmp;
1367   mtr_t mtr;
1368   trx_t *trx = thr_get_trx(thr);
1369   mem_heap_t *heap = nullptr;
1370   ulint offsets_[REC_OFFS_NORMAL_SIZE];
1371   ulint *offsets = offsets_;
1372 
1373   bool skip_gap_lock;
1374   MDL_ticket *mdl = nullptr;
1375   THD *thd = current_thd;
1376   bool tmp_open = false;
1377   dict_foreign_t *tmp_foreign = nullptr;
1378 
1379   /* GAP locks are not needed on DD tables because serializability between
1380   different DDL statements is achieved using metadata locks. So no concurrent
1381   changes to DD tables when MDL is taken. */
1382   skip_gap_lock = (trx->isolation_level <= TRX_ISO_READ_COMMITTED) ||
1383                   table->skip_gap_locks();
1384 
1385   DBUG_TRACE;
1386 
1387   if (dict_sys_t::is_dd_table_id(table->id)) {
1388     return DB_SUCCESS;
1389   }
1390 
1391   rec_offs_init(offsets_);
1392 
1393   err = DB_SUCCESS;
1394 
1395   if (trx->check_foreigns == FALSE) {
1396     /* The user has suppressed foreign key checks currently for
1397     this session */
1398     goto exit_func;
1399   }
1400 
1401   /* If any of the foreign key fields in entry is SQL NULL, we
1402   suppress the foreign key check: this is compatible with Oracle,
1403   for example */
1404   for (ulint i = 0; i < foreign->n_fields; i++) {
1405     if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1406       goto exit_func;
1407     }
1408   }
1409 
1410   if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
1411     upd_node = static_cast<upd_node_t *>(thr->run_node);
1412 
1413     if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
1414       /* If a cascaded update is done as defined by a
1415       foreign key constraint, do not check that
1416       constraint for the child row. In ON UPDATE CASCADE
1417       the update of the parent row is only half done when
1418       we come here: if we would check the constraint here
1419       for the child row it would fail.
1420 
1421       A QUESTION remains: if in the child table there are
1422       several constraints which refer to the same parent
1423       table, we should merge all updates to the child as
1424       one update? And the updates can be contradictory!
1425       Currently we just perform the update associated
1426       with each foreign key constraint, one after
1427       another, and the user has problems predicting in
1428       which order they are performed. */
1429 
1430       goto exit_func;
1431     }
1432   }
1433 
1434   if (check_ref) {
1435     check_table = foreign->referenced_table;
1436     check_index = foreign->referenced_index;
1437 
1438     /* TODO: NewDD: Need to wait for WL#6049, as current
1439     referenced table does have has any info of any FK
1440     referencing to it */
1441     if (check_table == nullptr && foreign->referenced_table_name_lookup) {
1442       ut_ad(check_index == nullptr);
1443 
1444       mutex_enter(&dict_sys->mutex);
1445       check_table = dd_table_open_on_name(thd, &mdl,
1446                                           foreign->referenced_table_name_lookup,
1447                                           true, DICT_ERR_IGNORE_NONE);
1448 
1449       if (check_table) {
1450         check_index = dict_foreign_find_index(
1451             check_table, nullptr, foreign->referenced_col_names,
1452             foreign->n_fields, foreign->foreign_index, false, false);
1453 
1454         if (!check_index) {
1455           dd_table_close(check_table, thd, &mdl, true);
1456           check_table = nullptr;
1457         }
1458       }
1459 
1460       if (check_table && check_index) {
1461         tmp_foreign = static_cast<dict_foreign_t *>(
1462             ut_malloc_nokey(sizeof(dict_foreign_t)));
1463 
1464         memcpy(tmp_foreign, foreign, sizeof(*foreign));
1465 
1466         foreign = tmp_foreign;
1467         foreign->referenced_table = check_table;
1468         foreign->referenced_index = check_index;
1469         tmp_open = true;
1470       }
1471       mutex_exit(&dict_sys->mutex);
1472     }
1473   } else {
1474     check_table = foreign->foreign_table;
1475     check_index = foreign->foreign_index;
1476   }
1477 
1478   if (check_table == nullptr || check_table->ibd_file_missing ||
1479       check_index == nullptr) {
1480     if (!srv_read_only_mode && check_ref) {
1481       FILE *ef = dict_foreign_err_file;
1482 
1483       row_ins_set_detailed(trx, foreign);
1484 
1485       row_ins_foreign_trx_print(trx);
1486 
1487       fputs("Foreign key constraint fails for table ", ef);
1488       ut_print_name(ef, trx, foreign->foreign_table_name);
1489       fputs(":\n", ef);
1490       dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign, TRUE);
1491       fprintf(ef, "\nTrying to add to index %s tuple:\n",
1492               foreign->foreign_index->name());
1493       dtuple_print(ef, entry);
1494       fputs("\nBut the parent table ", ef);
1495       ut_print_name(ef, trx, foreign->referenced_table_name);
1496       fputs(
1497           "\nor its .ibd file does"
1498           " not currently exist!\n",
1499           ef);
1500       mutex_exit(&dict_foreign_err_mutex);
1501 
1502       err = DB_NO_REFERENCED_ROW;
1503     }
1504 
1505     goto exit_func;
1506   }
1507 
1508   if (check_table != table) {
1509     /* We already have a LOCK_IX on table, but not necessarily
1510     on check_table */
1511 
1512     err = lock_table(0, check_table, LOCK_IS, thr);
1513 
1514     if (err != DB_SUCCESS) {
1515       goto do_possible_lock_wait;
1516     }
1517   }
1518 
1519   mtr_start(&mtr);
1520 
1521   /* Store old value on n_fields_cmp */
1522 
1523   n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1524 
1525   dtuple_set_n_fields_cmp(entry, foreign->n_fields);
1526 
1527   btr_pcur_open(check_index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
1528 
1529   /* Scan index records and check if there is a matching record */
1530 
1531   do {
1532     const rec_t *rec = btr_pcur_get_rec(&pcur);
1533     const buf_block_t *block = btr_pcur_get_block(&pcur);
1534 
1535     if (page_rec_is_infimum(rec)) {
1536       continue;
1537     }
1538 
1539     offsets =
1540         rec_get_offsets(rec, check_index, offsets, ULINT_UNDEFINED, &heap);
1541 
1542     if (page_rec_is_supremum(rec)) {
1543       if (skip_gap_lock) {
1544         continue;
1545       }
1546 
1547       err = row_ins_set_rec_lock(LOCK_S, LOCK_ORDINARY, block, rec, check_index,
1548                                  offsets, thr);
1549       switch (err) {
1550         case DB_SUCCESS_LOCKED_REC:
1551         case DB_SUCCESS:
1552           continue;
1553         default:
1554           goto end_scan;
1555       }
1556     }
1557 
1558     cmp = cmp_dtuple_rec(entry, rec, check_index, offsets);
1559 
1560     if (cmp == 0) {
1561       ulint lock_type;
1562 
1563       lock_type = skip_gap_lock ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;
1564 
1565       if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))) {
1566         err = row_ins_set_rec_lock(LOCK_S, lock_type, block, rec, check_index,
1567                                    offsets, thr);
1568         switch (err) {
1569           case DB_SUCCESS_LOCKED_REC:
1570           case DB_SUCCESS:
1571             break;
1572           default:
1573             goto end_scan;
1574         }
1575       } else {
1576         /* Found a matching record. Lock only
1577         a record because we can allow inserts
1578         into gaps */
1579 
1580         err = row_ins_set_rec_lock(LOCK_S, LOCK_REC_NOT_GAP, block, rec,
1581                                    check_index, offsets, thr);
1582 
1583         switch (err) {
1584           case DB_SUCCESS_LOCKED_REC:
1585           case DB_SUCCESS:
1586             break;
1587           default:
1588             goto end_scan;
1589         }
1590 
1591         if (check_ref) {
1592           err = DB_SUCCESS;
1593 
1594           goto end_scan;
1595         } else if (foreign->type != 0) {
1596           /* There is an ON UPDATE or ON DELETE
1597           condition: check them in a separate
1598           function */
1599 
1600           err = row_ins_foreign_check_on_constraint(thr, foreign, &pcur, entry,
1601                                                     &mtr);
1602           if (err != DB_SUCCESS) {
1603             /* Since reporting a plain
1604             "duplicate key" error
1605             message to the user in
1606             cases where a long CASCADE
1607             operation would lead to a
1608             duplicate key in some
1609             other table is very
1610             confusing, map duplicate
1611             key errors resulting from
1612             FK constraints to a
1613             separate error code. */
1614 
1615             if (err == DB_DUPLICATE_KEY) {
1616               err = DB_FOREIGN_DUPLICATE_KEY;
1617             }
1618 
1619             goto end_scan;
1620           }
1621 
1622           /* row_ins_foreign_check_on_constraint
1623           may have repositioned pcur on a
1624           different block */
1625           block = btr_pcur_get_block(&pcur);
1626         } else {
1627           row_ins_foreign_report_err("Trying to delete or update", thr, foreign,
1628                                      rec, entry);
1629 
1630           err = DB_ROW_IS_REFERENCED;
1631           goto end_scan;
1632         }
1633       }
1634     } else {
1635       ut_a(cmp < 0);
1636 
1637       err = DB_SUCCESS;
1638 
1639       if (!skip_gap_lock) {
1640         err = row_ins_set_rec_lock(LOCK_S, LOCK_GAP, block, rec, check_index,
1641                                    offsets, thr);
1642       }
1643 
1644       switch (err) {
1645         case DB_SUCCESS_LOCKED_REC:
1646         case DB_SUCCESS:
1647           if (check_ref) {
1648             err = DB_NO_REFERENCED_ROW;
1649             row_ins_foreign_report_add_err(trx, foreign, rec, entry);
1650           } else {
1651             err = DB_SUCCESS;
1652           }
1653         default:
1654           break;
1655       }
1656 
1657       goto end_scan;
1658     }
1659   } while (btr_pcur_move_to_next(&pcur, &mtr));
1660 
1661   if (check_ref) {
1662     row_ins_foreign_report_add_err(trx, foreign, btr_pcur_get_rec(&pcur),
1663                                    entry);
1664     err = DB_NO_REFERENCED_ROW;
1665   } else {
1666     err = DB_SUCCESS;
1667   }
1668 
1669 end_scan:
1670   btr_pcur_close(&pcur);
1671 
1672   mtr_commit(&mtr);
1673 
1674   /* Restore old value */
1675   dtuple_set_n_fields_cmp(entry, n_fields_cmp);
1676 
1677 do_possible_lock_wait:
1678   if (err == DB_LOCK_WAIT) {
1679     /* An object that will correctly decrement the FK check counter
1680     when it goes out of this scope. */
1681     ib_dec_in_dtor dec(check_table->n_foreign_key_checks_running);
1682 
1683     trx->error_state = err;
1684 
1685     que_thr_stop_for_mysql(thr);
1686 
1687     thr->lock_state = QUE_THR_LOCK_ROW;
1688 
1689     /* To avoid check_table being dropped, increment counter */
1690     os_atomic_increment_ulint(&check_table->n_foreign_key_checks_running, 1);
1691 
1692     trx_kill_blocking(trx);
1693 
1694     lock_wait_suspend_thread(thr);
1695 
1696     if (trx->error_state != DB_SUCCESS) {
1697       err = trx->error_state;
1698       goto exit_func;
1699     }
1700 
1701     thr->lock_state = QUE_THR_LOCK_NOLOCK;
1702 
1703     DBUG_PRINT("to_be_dropped", ("table: %s", check_table->name.m_name));
1704     if (check_table->to_be_dropped) {
1705       /* The table is being dropped. We shall timeout
1706       this operation */
1707       err = DB_LOCK_WAIT_TIMEOUT;
1708 
1709       goto exit_func;
1710     }
1711   }
1712 
1713 exit_func:
1714   if (heap != nullptr) {
1715     mem_heap_free(heap);
1716   }
1717 
1718   /* TODO: NewDD: Remove this after WL#6049 */
1719   if (tmp_open) {
1720     mutex_enter(&dict_sys->mutex);
1721     dd_table_close(check_table, thd, &mdl, true);
1722     ut_free(tmp_foreign);
1723     mutex_exit(&dict_sys->mutex);
1724   }
1725 
1726   return err;
1727 }
1728 
1729 /** Checks if foreign key constraints fail for an index entry. If index
1730  is not mentioned in any constraint, this function does nothing,
1731  Otherwise does searches to the indexes of referenced tables and
1732  sets shared locks which lock either the success or the failure of
1733  a constraint.
1734  @return DB_SUCCESS or error code */
1735 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_check_foreign_constraints(dict_table_t * table,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)1736     row_ins_check_foreign_constraints(
1737         dict_table_t *table, /*!< in: table */
1738         dict_index_t *index, /*!< in: index */
1739         dtuple_t *entry,     /*!< in: index entry for index */
1740         que_thr_t *thr)      /*!< in: query thread */
1741 {
1742   dict_foreign_t *foreign;
1743   dberr_t err;
1744   trx_t *trx;
1745 
1746   /* Temporarily skip the FK check for DD tables */
1747   if (dict_sys_t::is_dd_table_id(table->id)) {
1748     return (DB_SUCCESS);
1749   }
1750 
1751   trx = thr_get_trx(thr);
1752 
1753   if (trx->check_foreigns == FALSE) {
1754     return (DB_SUCCESS);
1755   }
1756   DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
1757                       "foreign_constraint_check_for_ins");
1758 
1759   for (dict_foreign_set::iterator it = table->foreign_set.begin();
1760        it != table->foreign_set.end(); ++it) {
1761     foreign = *it;
1762 
1763     if (foreign->foreign_index == index) {
1764       dict_table_t *ref_table = nullptr;
1765       dict_table_t *foreign_table = foreign->foreign_table;
1766       dict_table_t *referenced_table = foreign->referenced_table;
1767       MDL_ticket *mdl = nullptr;
1768 
1769       if (referenced_table == nullptr) {
1770         ref_table = dd_table_open_on_name(trx->mysql_thd, &mdl,
1771                                           foreign->referenced_table_name_lookup,
1772                                           false, DICT_ERR_IGNORE_NONE);
1773       }
1774 
1775       if (referenced_table) {
1776         os_atomic_increment_ulint(&foreign_table->n_foreign_key_checks_running,
1777                                   1);
1778       }
1779 
1780       /* NOTE that if the thread ends up waiting for a lock
1781       we will release dict_operation_lock temporarily!
1782       But the counter on the table protects the referenced
1783       table from being dropped while the check is running. */
1784 
1785       err = row_ins_check_foreign_constraint(TRUE, foreign, table, entry, thr);
1786 
1787       if (referenced_table) {
1788         os_atomic_decrement_ulint(&foreign_table->n_foreign_key_checks_running,
1789                                   1);
1790       }
1791       if (ref_table != nullptr) {
1792         dd_table_close(ref_table, trx->mysql_thd, &mdl, false);
1793       }
1794 
1795       if (err != DB_SUCCESS) {
1796         return (err);
1797       }
1798     }
1799   }
1800 
1801   return (DB_SUCCESS);
1802 }
1803 
1804 /** Checks if a unique key violation to rec would occur at the index entry
1805  insert.
1806  @return true if error */
row_ins_dupl_error_with_rec(const rec_t * rec,const dtuple_t * entry,dict_index_t * index,const ulint * offsets)1807 static ibool row_ins_dupl_error_with_rec(
1808     const rec_t *rec,      /*!< in: user record; NOTE that we assume
1809                            that the caller already has a record lock on
1810                            the record! */
1811     const dtuple_t *entry, /*!< in: entry to insert */
1812     dict_index_t *index,   /*!< in: index */
1813     const ulint *offsets)  /*!< in: rec_get_offsets(rec, index) */
1814 {
1815   ulint matched_fields;
1816   ulint n_unique;
1817   ulint i;
1818 
1819   ut_ad(rec_offs_validate(rec, index, offsets));
1820 
1821   n_unique = dict_index_get_n_unique(index);
1822 
1823   matched_fields = 0;
1824 
1825   entry->compare(rec, index, offsets, &matched_fields);
1826 
1827   if (matched_fields < n_unique) {
1828     return (FALSE);
1829   }
1830 
1831   /* In a unique secondary index we allow equal key values if they
1832   contain SQL NULLs */
1833 
1834   if (!index->is_clustered() && !index->nulls_equal) {
1835     for (i = 0; i < n_unique; i++) {
1836       if (dfield_is_null(dtuple_get_nth_field(entry, i))) {
1837         return (FALSE);
1838       }
1839     }
1840   }
1841 
1842   return (!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
1843 }
1844 
1845 /** Determines if the query is REPLACE or ON DUPLICATE KEY UPDATE in which case
1846 duplicate values should be allowed (and further processed) instead of causing
1847 an error
1848 @param thr The query thread running the query
1849 @return true iff duplicated values should be allowed */
row_allow_duplicates(que_thr_t * thr)1850 static bool row_allow_duplicates(que_thr_t *thr) {
1851   return (thr->prebuilt && thr->prebuilt->allow_duplicates());
1852 }
1853 
1854 /** Scans a unique non-clustered index at a given index entry to determine
1855  whether a uniqueness violation has occurred for the key value of the entry.
1856  Set shared locks on possible duplicate records.
1857  @return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */
1858 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_scan_sec_index_for_duplicate(ulint flags,dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool s_latch,mtr_t * mtr,mem_heap_t * offsets_heap)1859     row_ins_scan_sec_index_for_duplicate(
1860         ulint flags,         /*!< in: undo logging and locking flags */
1861         dict_index_t *index, /*!< in: non-clustered unique index */
1862         dtuple_t *entry,     /*!< in: index entry */
1863         que_thr_t *thr,      /*!< in: query thread */
1864         bool s_latch,        /*!< in: whether index->lock is being held */
1865         mtr_t *mtr,          /*!< in/out: mini-transaction */
1866         mem_heap_t *offsets_heap)
1867 /*!< in/out: memory heap that can be emptied */
1868 {
1869   ulint n_unique;
1870   int cmp;
1871   ulint n_fields_cmp;
1872   btr_pcur_t pcur;
1873   dberr_t err = DB_SUCCESS;
1874   ulint allow_duplicates;
1875   ulint *offsets = nullptr;
1876   DBUG_TRACE;
1877 
1878   ut_ad(s_latch ==
1879         rw_lock_own_flagged(&index->lock, RW_LOCK_FLAG_S | RW_LOCK_FLAG_SX));
1880 
1881   n_unique = dict_index_get_n_unique(index);
1882 
1883   /* If the secondary index is unique, but one of the fields in the
1884   n_unique first fields is NULL, a unique key violation cannot occur,
1885   since we define NULL != NULL in this case */
1886 
1887   if (!index->nulls_equal) {
1888     for (ulint i = 0; i < n_unique; i++) {
1889       if (UNIV_SQL_NULL == dfield_get_len(dtuple_get_nth_field(entry, i))) {
1890         return DB_SUCCESS;
1891       }
1892     }
1893   }
1894 
1895   /* Store old value on n_fields_cmp */
1896 
1897   n_fields_cmp = dtuple_get_n_fields_cmp(entry);
1898 
1899   dtuple_set_n_fields_cmp(entry, n_unique);
1900 
1901   btr_pcur_open(
1902       index, entry, PAGE_CUR_GE,
1903       s_latch ? BTR_SEARCH_LEAF | BTR_ALREADY_S_LATCHED : BTR_SEARCH_LEAF,
1904       &pcur, mtr);
1905   allow_duplicates = row_allow_duplicates(thr);
1906 
1907   /* Scan index records and check if there is a duplicate */
1908 
1909   do {
1910     const rec_t *rec = btr_pcur_get_rec(&pcur);
1911     const buf_block_t *block = btr_pcur_get_block(&pcur);
1912 
1913     /* For DD tables, We don't use next-key locking for duplicates
1914     found. This means it is possible for another transaction to
1915     insert a duplicate key value but MDL protection on DD tables
1916     will prevent insertion of duplicates into unique secondary indexes*/
1917     const ulint lock_type =
1918         index->table->skip_gap_locks() ? LOCK_REC_NOT_GAP : LOCK_ORDINARY;
1919 
1920     if (page_rec_is_infimum(rec)) {
1921       continue;
1922     }
1923 
1924     offsets =
1925         rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &offsets_heap);
1926 
1927     if (flags & BTR_NO_LOCKING_FLAG) {
1928       /* Set no locks when applying log
1929       in online table rebuild. */
1930     } else if (allow_duplicates) {
1931 #if 0  // TODO: Enable this assert after WL#9509. REPLACE will not be allowed on
1932        // DD tables
1933 			/* This assert means DD tables should not use REPLACE
1934 			or INSERT INTO table.. ON DUPLCIATE KEY */
1935 			ut_ad(!index->table->is_dd_table);
1936 #endif
1937 
1938 #if 1  // TODO: Remove this code after WL#9509. REPLACE will not be allowed on
1939        // DD tables
1940       if (index->table->skip_gap_locks()) {
1941         /* Only GAP lock is possible on supremum. */
1942         if (page_rec_is_supremum(rec)) {
1943           continue;
1944         }
1945       }
1946 #endif
1947 
1948       /* If the SQL-query will update or replace duplicate key we will take
1949       X-lock for duplicates ( REPLACE, LOAD DATAFILE REPLACE, INSERT ON
1950       DUPLICATE KEY UPDATE). */
1951       err = row_ins_set_rec_lock(LOCK_X, lock_type, block, rec, index, offsets,
1952                                  thr);
1953     } else {
1954       if (index->table->skip_gap_locks()) {
1955         /* Only GAP lock is possible on supremum. */
1956         if (page_rec_is_supremum(rec)) {
1957           continue;
1958         }
1959 
1960         if (cmp_dtuple_rec(entry, rec, index, offsets) < 0) {
1961           goto end_scan;
1962         }
1963       }
1964 
1965       err = row_ins_set_rec_lock(LOCK_S, lock_type, block, rec, index, offsets,
1966                                  thr);
1967     }
1968 
1969     switch (err) {
1970       case DB_SUCCESS_LOCKED_REC:
1971         err = DB_SUCCESS;
1972       case DB_SUCCESS:
1973         break;
1974       default:
1975         goto end_scan;
1976     }
1977 
1978     if (page_rec_is_supremum(rec)) {
1979       continue;
1980     }
1981 
1982     cmp = cmp_dtuple_rec(entry, rec, index, offsets);
1983 
1984     if (cmp == 0 && !index->allow_duplicates) {
1985       if (row_ins_dupl_error_with_rec(rec, entry, index, offsets)) {
1986         err = DB_DUPLICATE_KEY;
1987 
1988         thr_get_trx(thr)->error_index = index;
1989 
1990         /* If the duplicate is on hidden FTS_DOC_ID,
1991         state so in the error log */
1992         if (index == index->table->fts_doc_id_index &&
1993             DICT_TF2_FLAG_IS_SET(index->table, DICT_TF2_FTS_HAS_DOC_ID)) {
1994           ib::error(ER_IB_MSG_958) << "Duplicate FTS_DOC_ID"
1995                                       " value on table "
1996                                    << index->table->name;
1997         }
1998 
1999         goto end_scan;
2000       }
2001     } else {
2002       ut_a(cmp < 0 || index->allow_duplicates);
2003       goto end_scan;
2004     }
2005   } while (btr_pcur_move_to_next(&pcur, mtr));
2006 
2007 end_scan:
2008   /* Restore old value */
2009   dtuple_set_n_fields_cmp(entry, n_fields_cmp);
2010 
2011   return err;
2012 }
2013 
2014 /** Checks for a duplicate when the table is being rebuilt online.
2015 @param[in]	n_uniq	offset of DB_TRX_ID
2016 @param[in]	entry	entry being inserted
2017 @param[in]	rec	clustered index record at insert position
2018 @param[in]	index	clustered index
2019 @param[in,out]	offsets	rec_get_offsets(rec)
2020 @retval DB_SUCCESS when no duplicate is detected
2021 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2022 a newer version of entry (the entry should not be inserted)
2023 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2024 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_duplicate_online(ulint n_uniq,const dtuple_t * entry,const rec_t * rec,const dict_index_t * index,ulint * offsets)2025     row_ins_duplicate_online(ulint n_uniq, const dtuple_t *entry,
2026                              const rec_t *rec, const dict_index_t *index,
2027                              ulint *offsets) {
2028   ulint fields = 0;
2029 
2030   /* During rebuild, there should not be any delete-marked rows
2031   in the new table. */
2032   ut_ad(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
2033   ut_ad(dtuple_get_n_fields_cmp(entry) == n_uniq);
2034 
2035   /* Compare the PRIMARY KEY fields and the
2036   DB_TRX_ID, DB_ROLL_PTR. */
2037   cmp_dtuple_rec_with_match_low(entry, rec, index, offsets, n_uniq + 2,
2038                                 &fields);
2039 
2040   if (fields < n_uniq) {
2041     /* Not a duplicate. */
2042     return (DB_SUCCESS);
2043   }
2044 
2045   if (fields == n_uniq + 2) {
2046     /* rec is an exact match of entry. */
2047     return (DB_SUCCESS_LOCKED_REC);
2048   }
2049 
2050   return (DB_DUPLICATE_KEY);
2051 }
2052 
2053 /** Checks for a duplicate when the table is being rebuilt online.
2054 @retval DB_SUCCESS when no duplicate is detected
2055 @retval DB_SUCCESS_LOCKED_REC when rec is an exact match of entry or
2056 a newer version of entry (the entry should not be inserted)
2057 @retval DB_DUPLICATE_KEY when entry is a duplicate of rec */
2058 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_duplicate_error_in_clust_online(ulint n_uniq,const dtuple_t * entry,const btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap)2059     row_ins_duplicate_error_in_clust_online(
2060         ulint n_uniq,            /*!< in: offset of DB_TRX_ID */
2061         const dtuple_t *entry,   /*!< in: entry that is being inserted */
2062         const btr_cur_t *cursor, /*!< in: cursor on insert position */
2063         ulint **offsets,         /*!< in/out: rec_get_offsets(rec) */
2064         mem_heap_t **heap)       /*!< in/out: heap for offsets */
2065 {
2066   dberr_t err = DB_SUCCESS;
2067   const rec_t *rec = btr_cur_get_rec(cursor);
2068 
2069   if (cursor->low_match >= n_uniq && !page_rec_is_infimum(rec)) {
2070     *offsets =
2071         rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);
2072     err = row_ins_duplicate_online(n_uniq, entry, rec, cursor->index, *offsets);
2073     if (err != DB_SUCCESS) {
2074       return (err);
2075     }
2076   }
2077 
2078   rec = page_rec_get_next_const(btr_cur_get_rec(cursor));
2079 
2080   if (cursor->up_match >= n_uniq && !page_rec_is_supremum(rec)) {
2081     *offsets =
2082         rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);
2083     err = row_ins_duplicate_online(n_uniq, entry, rec, cursor->index, *offsets);
2084   }
2085 
2086   return (err);
2087 }
2088 
2089 /** Checks if a unique key violation error would occur at an index entry
2090  insert. Sets shared locks on possible duplicate records. Works only
2091  for a clustered index!
2092  @retval DB_SUCCESS if no error
2093  @retval DB_DUPLICATE_KEY if error,
2094  @retval DB_LOCK_WAIT if we have to wait for a lock on a possible duplicate
2095  record
2096  @retval DB_SUCCESS_LOCKED_REC if an exact match of the record was found
2097  in online table rebuild (flags & (BTR_KEEP_SYS_FLAG | BTR_NO_LOCKING_FLAG)) */
2098 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_duplicate_error_in_clust(ulint flags,btr_cur_t * cursor,const dtuple_t * entry,que_thr_t * thr,mtr_t * mtr)2099     row_ins_duplicate_error_in_clust(
2100         ulint flags,           /*!< in: undo logging and locking flags */
2101         btr_cur_t *cursor,     /*!< in: B-tree cursor */
2102         const dtuple_t *entry, /*!< in: entry to insert */
2103         que_thr_t *thr,        /*!< in: query thread */
2104         mtr_t *mtr)            /*!< in: mtr */
2105 {
2106   dberr_t err;
2107   rec_t *rec;
2108   ulint n_unique;
2109   trx_t *trx = thr_get_trx(thr);
2110   mem_heap_t *heap = nullptr;
2111   ulint offsets_[REC_OFFS_NORMAL_SIZE];
2112   ulint *offsets = offsets_;
2113   rec_offs_init(offsets_);
2114 
2115   UT_NOT_USED(mtr);
2116 
2117   ut_ad(cursor->index->is_clustered());
2118 
2119   /* NOTE: For unique non-clustered indexes there may be any number
2120   of delete marked records with the same value for the non-clustered
2121   index key (remember multiversioning), and which differ only in
2122   the row refererence part of the index record, containing the
2123   clustered index key fields. For such a secondary index record,
2124   to avoid race condition, we must FIRST do the insertion and after
2125   that check that the uniqueness condition is not breached! */
2126 
2127   /* NOTE: A problem is that in the B-tree node pointers on an
2128   upper level may match more to the entry than the actual existing
2129   user records on the leaf level. So, even if low_match would suggest
2130   that a duplicate key violation may occur, this may not be the case. */
2131 
2132   n_unique = dict_index_get_n_unique(cursor->index);
2133 
2134   if (cursor->low_match >= n_unique) {
2135     rec = btr_cur_get_rec(cursor);
2136 
2137     if (!page_rec_is_infimum(rec)) {
2138       offsets =
2139           rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);
2140 
2141       /* We set a lock on the possible duplicate: this
2142       is needed in logical logging of MySQL to make
2143       sure that in roll-forward we get the same duplicate
2144       errors as in original execution */
2145 
2146       if (flags & BTR_NO_LOCKING_FLAG) {
2147         /* Do nothing if no-locking is set */
2148         err = DB_SUCCESS;
2149       } else {
2150         /* If the SQL-query will update or replace
2151         duplicate key we will take X-lock for
2152         duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2153         INSERT ON DUPLICATE KEY UPDATE). */
2154 
2155         err = row_ins_set_rec_lock(row_allow_duplicates(thr) ? LOCK_X : LOCK_S,
2156                                    LOCK_REC_NOT_GAP, btr_cur_get_block(cursor),
2157                                    rec, cursor->index, offsets, thr);
2158       }
2159 
2160       switch (err) {
2161         case DB_SUCCESS_LOCKED_REC:
2162         case DB_SUCCESS:
2163           break;
2164         default:
2165           goto func_exit;
2166       }
2167 
2168       if (row_ins_dupl_error_with_rec(rec, entry, cursor->index, offsets)) {
2169       duplicate:
2170         trx->error_index = cursor->index;
2171         err = DB_DUPLICATE_KEY;
2172         goto func_exit;
2173       }
2174     }
2175   }
2176 
2177   if (cursor->up_match >= n_unique) {
2178     rec = page_rec_get_next(btr_cur_get_rec(cursor));
2179 
2180     if (!page_rec_is_supremum(rec)) {
2181       offsets =
2182           rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);
2183 
2184       /* If the SQL-query will update or replace
2185       duplicate key we will take X-lock for
2186       duplicates ( REPLACE, LOAD DATAFILE REPLACE,
2187       INSERT ON DUPLICATE KEY UPDATE). */
2188 
2189       err = row_ins_set_rec_lock(row_allow_duplicates(thr) ? LOCK_X : LOCK_S,
2190                                  LOCK_REC_NOT_GAP, btr_cur_get_block(cursor),
2191                                  rec, cursor->index, offsets, thr);
2192 
2193       switch (err) {
2194         case DB_SUCCESS_LOCKED_REC:
2195         case DB_SUCCESS:
2196           break;
2197         default:
2198           goto func_exit;
2199       }
2200 
2201       if (row_ins_dupl_error_with_rec(rec, entry, cursor->index, offsets)) {
2202         goto duplicate;
2203       }
2204     }
2205 
2206     /* This should never happen */
2207     ut_error;
2208   }
2209 
2210   err = DB_SUCCESS;
2211 func_exit:
2212   if (UNIV_LIKELY_NULL(heap)) {
2213     mem_heap_free(heap);
2214   }
2215   return (err);
2216 }
2217 
2218 /** Checks if an index entry has long enough common prefix with an
2219  existing record so that the intended insert of the entry must be
2220  changed to a modify of the existing record. In the case of a clustered
2221  index, the prefix must be n_unique fields long. In the case of a
2222  secondary index, all fields must be equal.  InnoDB never updates
2223  secondary index records in place, other than clearing or setting the
2224  delete-mark flag. We could be able to update the non-unique fields
2225  of a unique secondary index record by checking the cursor->up_match,
2226  but we do not do so, because it could have some locking implications.
2227  @return true if the existing record should be updated; false if not */
2228 UNIV_INLINE
row_ins_must_modify_rec(const btr_cur_t * cursor)2229 ibool row_ins_must_modify_rec(const btr_cur_t *cursor) /*!< in: B-tree cursor */
2230 {
2231   /* NOTE: (compare to the note in row_ins_duplicate_error_in_clust)
2232   Because node pointers on upper levels of the B-tree may match more
2233   to entry than to actual user records on the leaf level, we
2234   have to check if the candidate record is actually a user record.
2235   A clustered index node pointer contains index->n_unique first fields,
2236   and a secondary index node pointer contains all index fields. */
2237 
2238   return (cursor->low_match >= dict_index_get_n_unique_in_tree(cursor->index) &&
2239           !page_rec_is_infimum(btr_cur_get_rec(cursor)));
2240 }
2241 
2242 /** Insert the externally stored fields (off-page columns)
2243 of a clustered index entry.
2244 @param[in]	trx	current transaction
2245 @param[in]	entry	index entry to insert
2246 @param[in]	big_rec	externally stored fields
2247 @param[in,out]	offsets	rec_get_offsets()
2248 @param[in,out]	heap	memory heap */
2249 #ifdef UNIV_DEBUG
2250 /**
2251 @param[in]	thd	client connection, or NULL */
2252 #endif /* UNIV_DEBUG */
2253 /**
2254 @param[in]	index	clustered index
2255 @return	error code
2256 @retval	DB_SUCCESS
2257 @retval DB_OUT_OF_FILE_SPACE */
row_ins_index_entry_big_rec_func(trx_t * trx,const dtuple_t * entry,const big_rec_t * big_rec,ulint * offsets,mem_heap_t ** heap,const THD * thd,dict_index_t * index)2258 static dberr_t row_ins_index_entry_big_rec_func(trx_t *trx,
2259                                                 const dtuple_t *entry,
2260                                                 const big_rec_t *big_rec,
2261                                                 ulint *offsets,
2262                                                 mem_heap_t **heap,
2263 #ifdef UNIV_DEBUG
2264                                                 const THD *thd,
2265 #endif /* UNIV_DEBUG */
2266                                                 dict_index_t *index) {
2267   mtr_t mtr;
2268   btr_pcur_t pcur;
2269   rec_t *rec;
2270   dberr_t error;
2271 
2272   ut_ad(index->is_clustered());
2273 
2274   DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern_latch");
2275   DEBUG_SYNC_C("before_insertion_of_blob");
2276 
2277   mtr_start(&mtr);
2278 
2279   dict_disable_redo_if_temporary(index->table, &mtr);
2280 
2281   btr_pcur_open(index, entry, PAGE_CUR_LE, BTR_MODIFY_TREE, &pcur, &mtr);
2282   rec = btr_pcur_get_rec(&pcur);
2283   offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap);
2284 
2285   DEBUG_SYNC_C_IF_THD(thd, "before_row_ins_extern");
2286   error = lob::btr_store_big_rec_extern_fields(
2287       trx, &pcur, nullptr, offsets, big_rec, &mtr, lob::OPCODE_INSERT);
2288   DEBUG_SYNC_C_IF_THD(thd, "after_row_ins_extern");
2289 
2290   if (error == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2291     row_log_table_insert(btr_pcur_get_rec(&pcur), entry, index, offsets);
2292   }
2293 
2294   mtr_commit(&mtr);
2295 
2296   btr_pcur_close(&pcur);
2297 
2298   return (error);
2299 }
2300 
2301 #ifndef UNIV_DEBUG
2302 #define row_ins_index_entry_big_rec(trx, e, big, ofs, heap, index, thd) \
2303   row_ins_index_entry_big_rec_func(trx, e, big, ofs, heap, index)
2304 #else /* UNIV_DEBUG */
2305 #define row_ins_index_entry_big_rec(trx, e, big, ofs, heap, index, thd) \
2306   row_ins_index_entry_big_rec_func(trx, e, big, ofs, heap, thd, index)
2307 #endif /* UNIV_DEBUG */
2308 
2309 /** Update all the prebuilts working on this temporary table
2310 @param[in,out]	table	dict_table_t for the table */
row_ins_temp_prebuilt_tree_modified(dict_table_t * table)2311 static void row_ins_temp_prebuilt_tree_modified(dict_table_t *table) {
2312   if (table->temp_prebuilt == nullptr) {
2313     return;
2314   }
2315 
2316   std::vector<row_prebuilt_t *>::const_iterator it;
2317 
2318   for (it = table->temp_prebuilt->begin(); it != table->temp_prebuilt->end();
2319        ++it) {
2320     if ((*it)->m_temp_read_shared) {
2321       (*it)->m_temp_tree_modified = true;
2322     }
2323   }
2324 }
2325 
2326 /** Tries to insert an entry into a clustered index, ignoring foreign key
2327  constraints. If a record with the same unique key is found, the other
2328  record is necessarily marked deleted by a committed transaction, or a
2329  unique key violation error occurs. The delete marked record is then
2330  updated to an existing record, and we must write an undo log record on
2331  the delete marked record.
2332  @retval DB_SUCCESS on success
2333  @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2334  @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2335  @return error code */
row_ins_clust_index_entry_low(uint32_t flags,ulint mode,dict_index_t * index,ulint n_uniq,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)2336 dberr_t row_ins_clust_index_entry_low(
2337     uint32_t flags,      /*!< in: undo logging and locking flags */
2338     ulint mode,          /*!< in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2339                          depending on whether we wish optimistic or
2340                          pessimistic descent down the index tree */
2341     dict_index_t *index, /*!< in: clustered index */
2342     ulint n_uniq,        /*!< in: 0 or index->n_uniq */
2343     dtuple_t *entry,     /*!< in/out: index entry to insert */
2344     que_thr_t *thr,      /*!< in: query thread, or NULL if
2345                          flags & (BTR_NO_LOCKING_FLAG
2346                          | BTR_NO_UNDO_LOG_FLAG) and a duplicate
2347                          can't occur */
2348     bool dup_chk_only)
2349 /*!< in: if true, just do duplicate check
2350 and return. don't execute actual insert. */
2351 {
2352   btr_pcur_t pcur;
2353   btr_cur_t *cursor;
2354   dberr_t err = DB_SUCCESS;
2355   big_rec_t *big_rec = nullptr;
2356   mtr_t mtr;
2357   mem_heap_t *offsets_heap = nullptr;
2358   ulint offsets_[REC_OFFS_NORMAL_SIZE];
2359   ulint *offsets = offsets_;
2360   rec_offs_init(offsets_);
2361 
2362   DBUG_TRACE;
2363 
2364 #ifdef UNIV_DEBUG
2365   mtr_t temp_mtr;
2366   temp_mtr.start();
2367   mtr_s_lock(dict_index_get_lock(index), &temp_mtr);
2368 
2369   if (btr_height_get(index, &temp_mtr) >= BTR_MAX_NODE_LEVEL &&
2370       btr_cur_limit_optimistic_insert_debug > 1 &&
2371       btr_cur_limit_optimistic_insert_debug < 5) {
2372     ib::error(ER_IB_MSG_BTREE_LEVEL_LIMIT_EXCEEDED, index->name());
2373     temp_mtr.commit();
2374     return (DB_BTREE_LEVEL_LIMIT_EXCEEDED);
2375   }
2376 
2377   temp_mtr.commit();
2378 #endif
2379 
2380   ut_ad(index->is_clustered());
2381   ut_ad(!dict_index_is_unique(index) ||
2382         n_uniq == dict_index_get_n_unique(index));
2383   ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
2384   ut_ad((flags & (BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG)) ||
2385         !thr_get_trx(thr)->in_rollback);
2386   ut_ad(thr != nullptr || !dup_chk_only);
2387 
2388   mtr.start();
2389 
2390   if (index->table->is_temporary()) {
2391     /* Disable REDO logging as the lifetime of temp-tables is
2392     limited to server or connection lifetime and so REDO
2393     information is not needed on restart for recovery.
2394     Disable locking as temp-tables are local to a connection. */
2395 
2396     ut_ad(flags & BTR_NO_LOCKING_FLAG);
2397     ut_ad(!index->table->is_intrinsic() || (flags & BTR_NO_UNDO_LOG_FLAG));
2398 
2399     mtr.set_log_mode(MTR_LOG_NO_REDO);
2400   }
2401 
2402   if (mode == BTR_MODIFY_LEAF && dict_index_is_online_ddl(index)) {
2403     mode = BTR_MODIFY_LEAF | BTR_ALREADY_S_LATCHED;
2404     mtr_s_lock(dict_index_get_lock(index), &mtr);
2405   }
2406 
2407   /* Note that we use PAGE_CUR_LE as the search mode, because then
2408   the function will return in both low_match and up_match of the
2409   cursor sensible values */
2410   btr_pcur_open(index, entry, PAGE_CUR_LE, mode, &pcur, &mtr);
2411   cursor = btr_pcur_get_btr_cur(&pcur);
2412   cursor->thr = thr;
2413 
2414   ut_ad(!index->table->is_intrinsic() ||
2415         cursor->page_cur.block->made_dirty_with_no_latch);
2416 
2417 #ifdef UNIV_DEBUG
2418   {
2419     page_t *page = btr_cur_get_page(cursor);
2420     rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));
2421 
2422     ut_ad(page_rec_is_supremum(first_rec) ||
2423           rec_n_fields_is_sane(index, first_rec, entry));
2424   }
2425 #endif /* UNIV_DEBUG */
2426 
2427   /* Write logs for AUTOINC right after index lock has been got and
2428   before any further resource acquisitions to prevent deadlock.
2429   No need to log for temporary tables and intermediate tables */
2430   if (!index->table->is_temporary() && !index->table->skip_alter_undo &&
2431       dict_table_has_autoinc_col(index->table)) {
2432     ib_uint64_t counter =
2433         row_get_autoinc_counter(entry, index->table->autoinc_field_no);
2434 
2435     if (counter != 0) {
2436       /* Always log the counter change first, so it won't
2437       be affected by any follow-up failure. */
2438       dict_table_autoinc_log(index->table, counter, &mtr);
2439     }
2440   }
2441 
2442   /* Allowing duplicates in clustered index is currently enabled
2443   only for intrinsic table and caller understand the limited
2444   operation that can be done in this case. */
2445   ut_ad(!index->allow_duplicates ||
2446         (index->allow_duplicates && index->table->is_intrinsic()));
2447 
2448   if (!index->allow_duplicates && n_uniq &&
2449       (cursor->up_match >= n_uniq || cursor->low_match >= n_uniq)) {
2450     if (flags == (BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG |
2451                   BTR_KEEP_SYS_FLAG)) {
2452       /* Set no locks when applying log
2453       in online table rebuild. Only check for duplicates. */
2454       err = row_ins_duplicate_error_in_clust_online(n_uniq, entry, cursor,
2455                                                     &offsets, &offsets_heap);
2456 
2457       switch (err) {
2458         case DB_SUCCESS:
2459           break;
2460         default:
2461           ut_ad(0);
2462           /* fall through */
2463         case DB_SUCCESS_LOCKED_REC:
2464         case DB_DUPLICATE_KEY:
2465           if (thr != nullptr) {
2466             thr_get_trx(thr)->error_index = cursor->index;
2467           }
2468       }
2469     } else {
2470       /* Note that the following may return also
2471       DB_LOCK_WAIT */
2472 
2473       err = row_ins_duplicate_error_in_clust(flags, cursor, entry, thr, &mtr);
2474     }
2475 
2476     if (err != DB_SUCCESS) {
2477     err_exit:
2478       mtr.commit();
2479       goto func_exit;
2480     }
2481   }
2482 
2483   if (dup_chk_only) {
2484     mtr.commit();
2485     goto func_exit;
2486   }
2487   /* Note: Allowing duplicates would qualify for modification of
2488   an existing record as the new entry is exactly same as old entry.
2489   Avoid this check if allow duplicates is enabled. */
2490   if (!index->allow_duplicates && row_ins_must_modify_rec(cursor)) {
2491     /* There is already an index entry with a long enough common
2492     prefix, we must convert the insert into a modify of an
2493     existing record */
2494     mem_heap_t *entry_heap = mem_heap_create(1024);
2495 
2496     /* If the existing record is being modified and the new record
2497     doesn't fit the provided slot then existing record is added
2498     to free list and new record is inserted. This also means
2499     cursor that we have cached for SELECT is now invalid. */
2500     if (index->last_sel_cur) {
2501       index->last_sel_cur->invalid = true;
2502     }
2503 
2504     ut_ad(thr != nullptr);
2505     err = row_ins_clust_index_entry_by_modify(&pcur, flags, mode, &offsets,
2506                                               &offsets_heap, entry_heap, entry,
2507                                               thr, &mtr);
2508 
2509     if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2510       row_log_table_insert(btr_cur_get_rec(cursor), entry, index, offsets);
2511     }
2512 
2513     mtr.commit();
2514     mem_heap_free(entry_heap);
2515   } else {
2516     rec_t *insert_rec;
2517 
2518     if (mode != BTR_MODIFY_TREE) {
2519       ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
2520       err = btr_cur_optimistic_insert(flags, cursor, &offsets, &offsets_heap,
2521                                       entry, &insert_rec, &big_rec, thr, &mtr);
2522     } else {
2523       if (buf_LRU_buf_pool_running_out()) {
2524         err = DB_LOCK_TABLE_FULL;
2525         goto err_exit;
2526       }
2527 
2528       DEBUG_SYNC_C("before_insert_pessimitic_row_ins_clust");
2529 
2530       err = btr_cur_optimistic_insert(flags, cursor, &offsets, &offsets_heap,
2531                                       entry, &insert_rec, &big_rec, thr, &mtr);
2532 
2533       if (err == DB_FAIL) {
2534         err =
2535             btr_cur_pessimistic_insert(flags, cursor, &offsets, &offsets_heap,
2536                                        entry, &insert_rec, &big_rec, thr, &mtr);
2537 
2538         if (index->table->is_intrinsic() && err == DB_SUCCESS) {
2539           row_ins_temp_prebuilt_tree_modified(index->table);
2540         }
2541       }
2542     }
2543 
2544     if (big_rec != nullptr) {
2545       mtr.commit();
2546 
2547       /* Online table rebuild could read (and
2548       ignore) the incomplete record at this point.
2549       If online rebuild is in progress, the
2550       row_ins_index_entry_big_rec() will write log. */
2551 
2552       DBUG_EXECUTE_IF("row_ins_extern_checkpoint",
2553                       log_make_latest_checkpoint(););
2554       err = row_ins_index_entry_big_rec(thr_get_trx(thr), entry, big_rec,
2555                                         offsets, &offsets_heap, index,
2556                                         thr_get_trx(thr)->mysql_thd);
2557       dtuple_convert_back_big_rec(entry, big_rec);
2558     } else {
2559       if (err == DB_SUCCESS && dict_index_is_online_ddl(index)) {
2560         row_log_table_insert(insert_rec, entry, index, offsets);
2561       }
2562 
2563       mtr.commit();
2564     }
2565   }
2566 
2567 func_exit:
2568   if (offsets_heap != nullptr) {
2569     mem_heap_free(offsets_heap);
2570   }
2571 
2572   btr_pcur_close(&pcur);
2573 
2574   DBUG_EXECUTE_IF(
2575       "ib_sdi", if (dict_table_is_sdi(index->table->id)) {
2576         ib::info(ER_IB_MSG_959)
2577             << "ib_sdi: row_ins_clust_index_entry_low: " << index->name << " "
2578             << index->table->name << " return status: " << err;
2579       });
2580 
2581   return err;
2582 }
2583 
2584 /** This is a specialized function meant for direct insertion to auto-generated
2585 clustered index based on cached position from last successful insert. To be
2586 used when data is sorted.
2587 @param[in]	mode	BTR_MODIFY_LEAF or BTR_MODIFY_TREE.
2588                         depending on whether we wish optimistic or pessimistic
2589                         descent down the index tree
2590 @param[in,out]	index	clustered index
2591 @param[in,out]	entry	index entry to insert
2592 @param[in]	thr	query thread
2593 @return error code */
row_ins_sorted_clust_index_entry(ulint mode,dict_index_t * index,dtuple_t * entry,que_thr_t * thr)2594 static dberr_t row_ins_sorted_clust_index_entry(ulint mode, dict_index_t *index,
2595                                                 dtuple_t *entry,
2596                                                 que_thr_t *thr) {
2597   dberr_t err;
2598   mtr_t *mtr;
2599   const bool commit_mtr = mode == BTR_MODIFY_TREE;
2600 
2601   mem_heap_t *offsets_heap = nullptr;
2602   ulint offsets_[REC_OFFS_NORMAL_SIZE];
2603   ulint *offsets = offsets_;
2604   rec_offs_init(offsets_);
2605 
2606   DBUG_TRACE;
2607 
2608   ut_ad(index->last_ins_cur != nullptr);
2609   ut_ad(index->is_clustered());
2610   ut_ad(index->table->is_intrinsic());
2611   ut_ad(dict_index_is_auto_gen_clust(index));
2612 
2613   btr_cur_t cursor;
2614   cursor.thr = thr;
2615   mtr = &index->last_ins_cur->mtr;
2616 
2617   /* Search for position if tree needs to be split or if last position
2618   is not cached. */
2619   if (mode == BTR_MODIFY_TREE || index->last_ins_cur->rec == nullptr ||
2620       index->last_ins_cur->disable_caching) {
2621     /* Commit the previous mtr. */
2622     index->last_ins_cur->release();
2623 
2624     mtr_start(mtr);
2625     mtr_set_log_mode(mtr, MTR_LOG_NO_REDO);
2626 
2627     btr_cur_search_to_nth_level_with_no_latch(index, 0, entry, PAGE_CUR_LE,
2628                                               &cursor, __FILE__, __LINE__, mtr);
2629     ut_ad(cursor.page_cur.block != nullptr);
2630     ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2631   } else {
2632     cursor.index = index;
2633 
2634     cursor.page_cur.index = index;
2635 
2636     cursor.page_cur.rec = index->last_ins_cur->rec;
2637 
2638     cursor.page_cur.block = index->last_ins_cur->block;
2639   }
2640 
2641   const ulint flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
2642 
2643   for (;;) {
2644     rec_t *insert_rec;
2645     big_rec_t *big_rec = nullptr;
2646 
2647     if (mode != BTR_MODIFY_TREE) {
2648       ut_ad((mode & ~BTR_ALREADY_S_LATCHED) == BTR_MODIFY_LEAF);
2649 
2650       err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
2651                                       entry, &insert_rec, &big_rec, thr, mtr);
2652       if (err != DB_SUCCESS) {
2653         break;
2654       }
2655     } else {
2656       /* TODO: Check if this is needed for intrinsic table. */
2657       if (buf_LRU_buf_pool_running_out()) {
2658         err = DB_LOCK_TABLE_FULL;
2659         break;
2660       }
2661 
2662       err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
2663                                       entry, &insert_rec, &big_rec, thr, mtr);
2664 
2665       if (err == DB_FAIL) {
2666         err =
2667             btr_cur_pessimistic_insert(flags, &cursor, &offsets, &offsets_heap,
2668                                        entry, &insert_rec, &big_rec, thr, mtr);
2669         if (index->table->is_intrinsic() && err == DB_SUCCESS) {
2670           row_ins_temp_prebuilt_tree_modified(index->table);
2671         }
2672       }
2673     }
2674 
2675     if (big_rec != nullptr) {
2676       /* If index involves big-record optimization is
2677       turned-off. */
2678       index->last_ins_cur->release();
2679       index->last_ins_cur->disable_caching = true;
2680 
2681       err = row_ins_index_entry_big_rec(thr_get_trx(thr), entry, big_rec,
2682                                         offsets, &offsets_heap, index,
2683                                         thr_get_trx(thr)->mysql_thd);
2684 
2685       dtuple_convert_back_big_rec(entry, big_rec);
2686 
2687     } else if (err == DB_SUCCESS) {
2688       if (!commit_mtr && !index->last_ins_cur->disable_caching) {
2689         index->last_ins_cur->rec = insert_rec;
2690 
2691         index->last_ins_cur->block = cursor.page_cur.block;
2692       } else {
2693         index->last_ins_cur->release();
2694       }
2695     }
2696 
2697     break;
2698   }
2699 
2700   if (err != DB_SUCCESS) {
2701     index->last_ins_cur->release();
2702   }
2703 
2704   if (offsets_heap != nullptr) {
2705     mem_heap_free(offsets_heap);
2706   }
2707 
2708   return err;
2709 }
2710 
2711 /** Start a mini-transaction and check if the index will be dropped.
2712 @param[in,out]	mtr		mini-transaction
2713 @param[in,out]	index		secondary index
2714 @param[in]	check		whether to check
2715 @param[in]	search_mode	flags
2716 @return true if the index is to be dropped */
row_ins_sec_mtr_start_and_check_if_aborted(mtr_t * mtr,dict_index_t * index,bool check,ulint search_mode)2717 static MY_ATTRIBUTE((warn_unused_result)) bool row_ins_sec_mtr_start_and_check_if_aborted(
2718     mtr_t *mtr, dict_index_t *index, bool check, ulint search_mode) {
2719   ut_ad(!index->is_clustered());
2720 
2721   const mtr_log_t log_mode = mtr->get_log_mode();
2722 
2723   mtr_start(mtr);
2724 
2725   mtr->set_log_mode(log_mode);
2726 
2727   if (!check) {
2728     return (false);
2729   }
2730 
2731   if (search_mode & BTR_ALREADY_S_LATCHED) {
2732     mtr_s_lock(dict_index_get_lock(index), mtr);
2733   } else {
2734     mtr_sx_lock(dict_index_get_lock(index), mtr);
2735   }
2736 
2737   switch (index->online_status) {
2738     case ONLINE_INDEX_ABORTED:
2739     case ONLINE_INDEX_ABORTED_DROPPED:
2740       ut_ad(!index->is_committed());
2741       return (true);
2742     case ONLINE_INDEX_COMPLETE:
2743       return (false);
2744     case ONLINE_INDEX_CREATION:
2745       break;
2746   }
2747 
2748   ut_error;
2749 }
2750 
2751 /** Tries to insert an entry into a secondary index. If a record with exactly
2752 the same fields is found, the other record is necessarily marked deleted.
2753 It is then unmarked. Otherwise, the entry is just inserted to the index.
2754 @param[in]	flags		undo logging and locking flags
2755 @param[in]	mode		BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
2756                                 depending on whether we wish optimistic or
2757                                 pessimistic descent down the index tree
2758 @param[in]	index		secondary index
2759 @param[in,out]	offsets_heap	memory heap that can be emptied
2760 @param[in,out]	heap		memory heap
2761 @param[in,out]	entry		index entry to insert
2762 @param[in]	trx_id		PAGE_MAX_TRX_ID during row_log_table_apply(),
2763                                 or trx_id when undo log is disabled during
2764                                 alter copy operation or 0
2765 @param[in]	thr		query thread
2766 @param[in]	dup_chk_only	TRUE, just do duplicate check and return.
2767                                 don't execute actual insert
2768 @retval DB_SUCCESS on success
2769 @retval DB_LOCK_WAIT on lock wait when !(flags & BTR_NO_LOCKING_FLAG)
2770 @retval DB_FAIL if retry with BTR_MODIFY_TREE is needed
2771 @return error code */
row_ins_sec_index_entry_low(uint32_t flags,ulint mode,dict_index_t * index,mem_heap_t * offsets_heap,mem_heap_t * heap,dtuple_t * entry,trx_id_t trx_id,que_thr_t * thr,bool dup_chk_only)2772 dberr_t row_ins_sec_index_entry_low(uint32_t flags, ulint mode,
2773                                     dict_index_t *index,
2774                                     mem_heap_t *offsets_heap, mem_heap_t *heap,
2775                                     dtuple_t *entry, trx_id_t trx_id,
2776                                     que_thr_t *thr, bool dup_chk_only) {
2777   DBUG_TRACE;
2778 
2779   btr_cur_t cursor;
2780   ulint search_mode = mode;
2781   dberr_t err = DB_SUCCESS;
2782   ulint n_unique;
2783   mtr_t mtr;
2784   ulint offsets_[REC_OFFS_NORMAL_SIZE];
2785   ulint *offsets = offsets_;
2786   rec_offs_init(offsets_);
2787   rtr_info_t rtr_info;
2788 
2789   ut_ad(!index->is_clustered());
2790   ut_ad(mode == BTR_MODIFY_LEAF || mode == BTR_MODIFY_TREE);
2791 
2792   cursor.thr = thr;
2793   cursor.rtr_info = nullptr;
2794   ut_ad(thr_get_trx(thr)->id != 0 || index->table->is_intrinsic());
2795 
2796   mtr_start(&mtr);
2797 
2798   if (index->table->is_temporary()) {
2799     /* Disable REDO logging as the lifetime of temp-tables is
2800     limited to server or connection lifetime and so REDO
2801     information is not needed on restart for recovery.
2802     Disable locking as temp-tables are local to a connection. */
2803 
2804     ut_ad(flags & BTR_NO_LOCKING_FLAG);
2805     ut_ad(!index->table->is_intrinsic() || (flags & BTR_NO_UNDO_LOG_FLAG));
2806 
2807     mtr.set_log_mode(MTR_LOG_NO_REDO);
2808   } else if (!dict_index_is_spatial(index)) {
2809     /* Enable insert buffering if it's neither temp-table
2810     nor spatial index. */
2811     search_mode |= BTR_INSERT;
2812   }
2813 
2814   /* Ensure that we acquire index->lock when inserting into an
2815   index with index->online_status == ONLINE_INDEX_COMPLETE, but
2816   could still be subject to rollback_inplace_alter_table().
2817   This prevents a concurrent change of index->online_status.
2818   The memory object cannot be freed as long as we have an open
2819   reference to the table, or index->table->n_ref_count > 0. */
2820   bool check = !index->is_committed();
2821 
2822   DBUG_EXECUTE_IF("idx_mimic_not_committed", {
2823     check = true;
2824     mode = BTR_MODIFY_TREE;
2825   });
2826 
2827   if (check) {
2828     DEBUG_SYNC_C("row_ins_sec_index_enter");
2829     if (mode == BTR_MODIFY_LEAF) {
2830       search_mode |= BTR_ALREADY_S_LATCHED;
2831       mtr_s_lock(dict_index_get_lock(index), &mtr);
2832     } else {
2833       mtr_sx_lock(dict_index_get_lock(index), &mtr);
2834     }
2835 
2836     if (row_log_online_op_try(index, entry, thr_get_trx(thr)->id)) {
2837       goto func_exit;
2838     }
2839   }
2840 
2841   /* Note that we use PAGE_CUR_LE as the search mode, because then
2842   the function will return in both low_match and up_match of the
2843   cursor sensible values */
2844 
2845   if (!thr_get_trx(thr)->check_unique_secondary) {
2846     search_mode |= BTR_IGNORE_SEC_UNIQUE;
2847   }
2848 
2849   if (dict_index_is_spatial(index)) {
2850     cursor.index = index;
2851     rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2852     rtr_info_update_btr(&cursor, &rtr_info);
2853 
2854     btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_RTREE_INSERT,
2855                                 search_mode, &cursor, 0, __FILE__, __LINE__,
2856                                 &mtr);
2857 
2858     if (mode == BTR_MODIFY_LEAF && rtr_info.mbr_adj) {
2859       mtr_commit(&mtr);
2860       rtr_clean_rtr_info(&rtr_info, true);
2861       rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2862       rtr_info_update_btr(&cursor, &rtr_info);
2863 
2864       mtr_start(&mtr);
2865 
2866       search_mode &= ~BTR_MODIFY_LEAF;
2867 
2868       search_mode |= BTR_MODIFY_TREE;
2869 
2870       btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_RTREE_INSERT,
2871                                   search_mode, &cursor, 0, __FILE__, __LINE__,
2872                                   &mtr);
2873       mode = BTR_MODIFY_TREE;
2874     }
2875 
2876     DBUG_EXECUTE_IF("rtree_test_check_count", { goto func_exit; });
2877 
2878   } else {
2879     if (index->table->is_intrinsic()) {
2880       btr_cur_search_to_nth_level_with_no_latch(
2881           index, 0, entry, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, &mtr);
2882       ut_ad(cursor.page_cur.block != nullptr);
2883       ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2884     } else {
2885       btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE, search_mode,
2886                                   &cursor, 0, __FILE__, __LINE__, &mtr);
2887     }
2888   }
2889 
2890   if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
2891     ut_ad(!dict_index_is_spatial(index));
2892     /* The insert was buffered during the search: we are done */
2893     goto func_exit;
2894   }
2895 
2896 #ifdef UNIV_DEBUG
2897   {
2898     page_t *page = btr_cur_get_page(&cursor);
2899     rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));
2900 
2901     ut_ad(page_rec_is_supremum(first_rec) ||
2902           rec_n_fields_is_sane(index, first_rec, entry));
2903   }
2904 #endif /* UNIV_DEBUG */
2905 
2906   n_unique = dict_index_get_n_unique(index);
2907 
2908   if (dict_index_is_unique(index) &&
2909       (cursor.low_match >= n_unique || cursor.up_match >= n_unique)) {
2910     mtr_commit(&mtr);
2911 
2912     DEBUG_SYNC_C("row_ins_sec_index_unique");
2913 
2914     if (row_ins_sec_mtr_start_and_check_if_aborted(&mtr, index, check,
2915                                                    search_mode)) {
2916       goto func_exit;
2917     }
2918 
2919     err = row_ins_scan_sec_index_for_duplicate(flags, index, entry, thr, check,
2920                                                &mtr, offsets_heap);
2921 
2922     mtr_commit(&mtr);
2923 
2924     switch (err) {
2925       case DB_SUCCESS:
2926         break;
2927       case DB_DUPLICATE_KEY:
2928         if (!index->is_committed()) {
2929           ut_ad(!thr_get_trx(thr)->dict_operation_lock_mode);
2930 
2931           dict_set_corrupted(index);
2932           /* Do not return any error to the
2933           caller. The duplicate will be reported
2934           by ALTER TABLE or CREATE UNIQUE INDEX.
2935           Unfortunately we cannot report the
2936           duplicate key value to the DDL thread,
2937           because the altered_table object is
2938           private to its call stack. */
2939           err = DB_SUCCESS;
2940         }
2941         /* fall through */
2942       default:
2943         if (dict_index_is_spatial(index)) {
2944           rtr_clean_rtr_info(&rtr_info, true);
2945         }
2946         return err;
2947     }
2948 
2949     if (row_ins_sec_mtr_start_and_check_if_aborted(&mtr, index, check,
2950                                                    search_mode)) {
2951       goto func_exit;
2952     }
2953 
2954     DEBUG_SYNC_C("row_ins_sec_index_entry_dup_locks_created");
2955 
2956     /* We did not find a duplicate and we have now
2957     locked with s-locks the necessary records to
2958     prevent any insertion of a duplicate by another
2959     transaction. Let us now reposition the cursor and
2960     continue the insertion. */
2961     if (index->table->is_intrinsic()) {
2962       btr_cur_search_to_nth_level_with_no_latch(
2963           index, 0, entry, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, &mtr);
2964       ut_ad(cursor.page_cur.block != nullptr);
2965       ut_ad(cursor.page_cur.block->made_dirty_with_no_latch);
2966     } else {
2967       btr_cur_search_to_nth_level(
2968           index, 0, entry, PAGE_CUR_LE,
2969           (search_mode & ~(BTR_INSERT | BTR_IGNORE_SEC_UNIQUE)), &cursor, 0,
2970           __FILE__, __LINE__, &mtr);
2971     }
2972   }
2973 
2974   if (dup_chk_only) {
2975     goto func_exit;
2976   }
2977 
2978   if (row_ins_must_modify_rec(&cursor)) {
2979     /* If the existing record is being modified and the new record
2980     is doesn't fit the provided slot then existing record is added
2981     to free list and new record is inserted. This also means
2982     cursor that we have cached for SELECT is now invalid. */
2983     if (index->last_sel_cur) {
2984       index->last_sel_cur->invalid = true;
2985     }
2986 
2987     /* There is already an index entry with a long enough common
2988     prefix, we must convert the insert into a modify of an
2989     existing record */
2990     offsets = rec_get_offsets(btr_cur_get_rec(&cursor), index, offsets,
2991                               ULINT_UNDEFINED, &offsets_heap);
2992 
2993     err = row_ins_sec_index_entry_by_modify(
2994         flags, mode, &cursor, &offsets, offsets_heap, heap, entry, thr, &mtr);
2995 
2996     if (err == DB_SUCCESS && dict_index_is_spatial(index) && rtr_info.mbr_adj) {
2997       err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
2998     }
2999   } else {
3000     rec_t *insert_rec;
3001     big_rec_t *big_rec;
3002 
3003     if (mode == BTR_MODIFY_LEAF) {
3004       err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3005                                       entry, &insert_rec, &big_rec, thr, &mtr);
3006       if (err == DB_SUCCESS && dict_index_is_spatial(index) &&
3007           rtr_info.mbr_adj) {
3008         err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3009       }
3010     } else {
3011       ut_ad(mode == BTR_MODIFY_TREE);
3012       if (buf_LRU_buf_pool_running_out()) {
3013         err = DB_LOCK_TABLE_FULL;
3014         goto func_exit;
3015       }
3016 
3017       err = btr_cur_optimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3018                                       entry, &insert_rec, &big_rec, thr, &mtr);
3019       if (err == DB_FAIL) {
3020         err =
3021             btr_cur_pessimistic_insert(flags, &cursor, &offsets, &offsets_heap,
3022                                        entry, &insert_rec, &big_rec, thr, &mtr);
3023       }
3024       if (err == DB_SUCCESS && dict_index_is_spatial(index) &&
3025           rtr_info.mbr_adj) {
3026         err = rtr_ins_enlarge_mbr(&cursor, thr, &mtr);
3027       }
3028     }
3029 
3030     if (err == DB_SUCCESS && trx_id) {
3031       page_update_max_trx_id(btr_cur_get_block(&cursor),
3032                              btr_cur_get_page_zip(&cursor), trx_id, &mtr);
3033     }
3034 
3035     ut_ad(!big_rec);
3036   }
3037 
3038 func_exit:
3039   if (dict_index_is_spatial(index)) {
3040     rtr_clean_rtr_info(&rtr_info, true);
3041   }
3042 
3043   mtr_commit(&mtr);
3044   return err;
3045 }
3046 
3047 /** Inserts an entry into a clustered index. Tries first optimistic,
3048  then pessimistic descent down the tree. If the entry matches enough
3049  to a delete marked record, performs the insert by updating or delete
3050  unmarking the delete marked record.
3051  @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_clust_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3052 dberr_t row_ins_clust_index_entry(
3053     dict_index_t *index, /*!< in: clustered index */
3054     dtuple_t *entry,     /*!< in/out: index entry to insert */
3055     que_thr_t *thr,      /*!< in: query thread */
3056     bool dup_chk_only)
3057 /*!< in: if true, just do duplicate check
3058 and return. don't execute actual insert. */
3059 {
3060   dberr_t err;
3061   ulint n_uniq;
3062 
3063   DBUG_TRACE;
3064 
3065   if (!index->table->foreign_set.empty()) {
3066     err = row_ins_check_foreign_constraints(index->table, index, entry, thr);
3067     if (err != DB_SUCCESS) {
3068       return err;
3069     }
3070   }
3071 
3072   n_uniq = dict_index_is_unique(index) ? index->n_uniq : 0;
3073 
3074   /* Try first optimistic descent to the B-tree */
3075   uint32_t flags;
3076 
3077   if (!index->table->is_intrinsic()) {
3078     log_free_check();
3079     flags = index->table->is_temporary() ? BTR_NO_LOCKING_FLAG : 0;
3080 
3081     /* For intermediate table of copy alter operation,
3082     skip undo logging and record lock checking for
3083     insertion operation. */
3084     if (index->table->skip_alter_undo) {
3085       flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3086     }
3087 
3088   } else {
3089     flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3090   }
3091 
3092   if (index->table->is_intrinsic() && dict_index_is_auto_gen_clust(index)) {
3093     /* Check if the memory allocated for intrinsic cache*/
3094     if (!index->last_ins_cur) {
3095       dict_allocate_mem_intrinsic_cache(index);
3096     }
3097     err = row_ins_sorted_clust_index_entry(BTR_MODIFY_LEAF, index, entry, thr);
3098   } else {
3099     err = row_ins_clust_index_entry_low(flags, BTR_MODIFY_LEAF, index, n_uniq,
3100                                         entry, thr, dup_chk_only);
3101   }
3102 
3103   DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3104                       "after_row_ins_clust_index_entry_leaf");
3105 
3106   if (err != DB_FAIL) {
3107     DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
3108     return err;
3109   }
3110 
3111   /* Try then pessimistic descent to the B-tree */
3112   if (!index->table->is_intrinsic()) {
3113     log_free_check();
3114   } else if (!index->last_sel_cur) {
3115     dict_allocate_mem_intrinsic_cache(index);
3116     index->last_sel_cur->invalid = true;
3117   } else {
3118     index->last_sel_cur->invalid = true;
3119   }
3120 
3121   if (index->table->is_intrinsic() && dict_index_is_auto_gen_clust(index)) {
3122     err = row_ins_sorted_clust_index_entry(BTR_MODIFY_TREE, index, entry, thr);
3123   } else {
3124     err = row_ins_clust_index_entry_low(flags, BTR_MODIFY_TREE, index, n_uniq,
3125                                         entry, thr, dup_chk_only);
3126   }
3127 
3128   return err;
3129 }
3130 
3131 /** Inserts an entry into a secondary index. Tries first optimistic,
3132  then pessimistic descent down the tree. If the entry matches enough
3133  to a delete marked record, performs the insert by updating or delete
3134  unmarking the delete marked record.
3135  @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_sec_index_entry(dict_index_t * index,dtuple_t * entry,que_thr_t * thr,bool dup_chk_only)3136 dberr_t row_ins_sec_index_entry(
3137     dict_index_t *index, /*!< in: secondary index */
3138     dtuple_t *entry,     /*!< in/out: index entry to insert */
3139     que_thr_t *thr,      /*!< in: query thread */
3140     bool dup_chk_only)
3141 /*!< in: if true, just do duplicate check
3142 and return. don't execute actual insert. */
3143 {
3144   dberr_t err;
3145   mem_heap_t *offsets_heap;
3146   mem_heap_t *heap;
3147   trx_id_t trx_id = 0;
3148 
3149   DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
3150     DBUG_SET("-d,row_ins_sec_index_entry_timeout");
3151     return (DB_LOCK_WAIT);
3152   });
3153 
3154   DBUG_EXECUTE_IF("row_ins_sec_index_entry_lock_wait", {
3155     static uint16_t count = 0;
3156     if (index->is_multi_value()) {
3157       ++count;
3158     }
3159     if (count == 2) {
3160       count = 0;
3161       return (DB_LOCK_WAIT);
3162     }
3163   });
3164 
3165   if (!index->table->foreign_set.empty()) {
3166     err = row_ins_check_foreign_constraints(index->table, index, entry, thr);
3167     if (err != DB_SUCCESS) {
3168       return (err);
3169     }
3170   }
3171 
3172   offsets_heap = mem_heap_create(1024);
3173   heap = mem_heap_create(1024);
3174 
3175   /* Try first optimistic descent to the B-tree */
3176 
3177   uint32_t flags;
3178 
3179   if (!index->table->is_intrinsic()) {
3180     log_free_check();
3181     ut_ad(thr_get_trx(thr)->id != 0);
3182 
3183     flags = index->table->is_temporary() ? BTR_NO_LOCKING_FLAG : 0;
3184     /* For intermediate table during copy alter table,
3185     skip the undo log and record lock checking for
3186     insertion operation. */
3187     if (index->table->skip_alter_undo) {
3188       trx_id = thr_get_trx(thr)->id;
3189       flags |= BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG;
3190     }
3191 
3192   } else {
3193     flags = BTR_NO_LOCKING_FLAG | BTR_NO_UNDO_LOG_FLAG;
3194   }
3195 
3196   err = row_ins_sec_index_entry_low(flags, BTR_MODIFY_LEAF, index, offsets_heap,
3197                                     heap, entry, trx_id, thr, dup_chk_only);
3198   if (err == DB_FAIL) {
3199     mem_heap_empty(heap);
3200 
3201     /* Try then pessimistic descent to the B-tree */
3202 
3203     if (!index->table->is_intrinsic()) {
3204       log_free_check();
3205     } else if (!index->last_sel_cur) {
3206       dict_allocate_mem_intrinsic_cache(index);
3207       index->last_sel_cur->invalid = true;
3208     } else {
3209       index->last_sel_cur->invalid = true;
3210     }
3211 
3212     err =
3213         row_ins_sec_index_entry_low(flags, BTR_MODIFY_TREE, index, offsets_heap,
3214                                     heap, entry, 0, thr, dup_chk_only);
3215   }
3216 
3217   mem_heap_free(heap);
3218   mem_heap_free(offsets_heap);
3219   return (err);
3220 }
3221 
3222 /** Inserts an entry into a secondary index, which is created for
3223 multi-value field. For each value to be inserted, it tries first optimistic,
3224 then pessimistic descent down the tree. If the entry matches enough
3225 to a delete marked record, performs the insert by updating or delete
3226 unmarking the delete marked record.
3227 @param[in]      index           secondary index which is for multi-value field
3228 @param[in,out]  entry           index entry to insert
3229 @param[in,out]  multi_val_pos   the start position to insert next multi-value
3230                                 data, and the returned value should be either
3231                                 0 if all are done, or the position where the
3232                                 insert failed. So return value of 0 could be
3233                                 a bit ambiguous, however the return error
3234                                 can help to see which case it is
3235 @param[in]      thr             query thread
3236 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_sec_index_multi_value_entry(dict_index_t * index,dtuple_t * entry,uint32_t & multi_val_pos,que_thr_t * thr)3237 static dberr_t row_ins_sec_index_multi_value_entry(dict_index_t *index,
3238                                                    dtuple_t *entry,
3239                                                    uint32_t &multi_val_pos,
3240                                                    que_thr_t *thr) {
3241   ut_d(trx_t *trx = thr_get_trx(thr));
3242 
3243   ut_ad(trx->id != 0);
3244   ut_ad(!index->table->is_intrinsic());
3245   ut_ad(index->is_committed());
3246   ut_ad(!dict_index_is_online_ddl(index));
3247   ut_ad(index->is_multi_value());
3248 
3249   dberr_t err = DB_SUCCESS;
3250   Multi_value_entry_builder_insert mv_entry_builder(index, entry);
3251 
3252   for (dtuple_t *mv_entry = mv_entry_builder.begin(multi_val_pos);
3253        mv_entry != nullptr; mv_entry = mv_entry_builder.next()) {
3254     err = row_ins_sec_index_entry(index, mv_entry, thr, false);
3255     if (err != DB_SUCCESS) {
3256       multi_val_pos = mv_entry_builder.last_multi_value_position();
3257       return (err);
3258     }
3259   }
3260 
3261   multi_val_pos = 0;
3262 
3263   return (err);
3264 }
3265 
3266 /** Inserts an index entry to index. Tries first optimistic, then pessimistic
3267 descent down the tree. If the entry matches enough to a delete marked record,
3268 performs the insert by updating or delete unmarking the delete marked
3269 record.
3270 @param[in]	index		index to insert the entry
3271 @param[in,out]	entry		entry to insert
3272 @param[in,out]	multi_val_pos	if multi-value index, the start position
3273                                 to insert next multi-value data,
3274                                 and the returned value should be either
3275                                 0 if all are done, or the position where the
3276                                 insert failed. So return value of 0 could be
3277                                 a bit ambiguous, however the return error
3278                                 can help to see which case it is
3279 @param[in]	thr		query thread
3280 @return DB_SUCCESS, DB_LOCK_WAIT, DB_DUPLICATE_KEY, or some other error code */
row_ins_index_entry(dict_index_t * index,dtuple_t * entry,uint32_t & multi_val_pos,que_thr_t * thr)3281 static dberr_t row_ins_index_entry(dict_index_t *index, dtuple_t *entry,
3282                                    uint32_t &multi_val_pos, que_thr_t *thr) {
3283   ut_ad(thr_get_trx(thr)->id != 0);
3284 
3285   DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
3286     DBUG_SET("-d,row_ins_index_entry_timeout");
3287     return (DB_LOCK_WAIT);
3288   });
3289 
3290   if (index->is_clustered()) {
3291     return (row_ins_clust_index_entry(index, entry, thr, false));
3292   } else if (index->is_multi_value()) {
3293     return (
3294         row_ins_sec_index_multi_value_entry(index, entry, multi_val_pos, thr));
3295   } else {
3296     return (row_ins_sec_index_entry(index, entry, thr, false));
3297   }
3298 }
3299 
3300 /** This function generate MBR (Minimum Bounding Box) for spatial objects
3301  and set it to spatial index field. */
row_ins_spatial_index_entry_set_mbr_field(dfield_t * field,const dfield_t * row_field,uint32_t * srid,const dd::Spatial_reference_system * srs)3302 static void row_ins_spatial_index_entry_set_mbr_field(
3303     dfield_t *field,           /*!< in/out: mbr field */
3304     const dfield_t *row_field, /*!< in: row field */
3305     uint32_t *srid,            /*!< in/out: spatial reference id */
3306     const dd::Spatial_reference_system *srs) /*!< in: SRS of row_field */
3307 {
3308   uchar *dptr = nullptr;
3309   ulint dlen = 0;
3310   double mbr[SPDIMS * 2];
3311 
3312   /* This must be a GEOMETRY datatype */
3313   ut_ad(DATA_GEOMETRY_MTYPE(field->type.mtype));
3314 
3315   dptr = static_cast<uchar *>(dfield_get_data(row_field));
3316   dlen = dfield_get_len(row_field);
3317 
3318   /* obtain the MBR */
3319   get_mbr_from_store(srs, dptr, static_cast<uint>(dlen), SPDIMS, mbr, srid);
3320 
3321   /* Set mbr as index entry data */
3322   dfield_write_mbr(field, mbr);
3323 }
3324 
3325 /** Sets the values of the dtuple fields in entry from the values of appropriate
3326 columns in row.
3327 @param[in]	index	index handler
3328 @param[out]	entry	index entry to make
3329 @param[in]	row	row
3330 
3331 @return DB_SUCCESS if the set is successful */
row_ins_index_entry_set_vals(const dict_index_t * index,dtuple_t * entry,const dtuple_t * row)3332 dberr_t row_ins_index_entry_set_vals(const dict_index_t *index, dtuple_t *entry,
3333                                      const dtuple_t *row) {
3334   ulint n_fields;
3335   ulint i;
3336   ulint num_v = dtuple_get_n_v_fields(entry);
3337 
3338   n_fields = dtuple_get_n_fields(entry);
3339 
3340   for (i = 0; i < n_fields + num_v; i++) {
3341     dict_field_t *ind_field = nullptr;
3342     dfield_t *field;
3343     const dfield_t *row_field;
3344     ulint len;
3345     dict_col_t *col;
3346 
3347     if (i >= n_fields) {
3348       /* This is virtual field */
3349       field = dtuple_get_nth_v_field(entry, i - n_fields);
3350       col = &dict_table_get_nth_v_col(index->table, i - n_fields)->m_col;
3351     } else {
3352       field = dtuple_get_nth_field(entry, i);
3353       ind_field = index->get_field(i);
3354       col = ind_field->col;
3355     }
3356 
3357     if (col->is_virtual()) {
3358       const dict_v_col_t *v_col = reinterpret_cast<const dict_v_col_t *>(col);
3359       ut_ad(dtuple_get_n_fields(row) == index->table->get_n_cols());
3360       row_field = dtuple_get_nth_v_field(row, v_col->v_pos);
3361     } else {
3362       row_field = dtuple_get_nth_field(row, ind_field->col->ind);
3363     }
3364 
3365     len = dfield_get_len(row_field);
3366 
3367     /* Check column prefix indexes */
3368     if (ind_field != nullptr && ind_field->prefix_len > 0 &&
3369         dfield_get_len(row_field) != UNIV_SQL_NULL) {
3370       const dict_col_t *col = ind_field->col;
3371 
3372       len = dtype_get_at_most_n_mbchars(
3373           col->prtype, col->mbminmaxlen, ind_field->prefix_len, len,
3374           static_cast<const char *>(dfield_get_data(row_field)));
3375 
3376       ut_ad(!dfield_is_ext(row_field));
3377     }
3378 
3379     /* Handle spatial index. For the first field, replace
3380     the data with its MBR (Minimum Bounding Box). */
3381     if ((i == 0) && dict_index_is_spatial(index)) {
3382       if (!row_field->data || row_field->len < GEO_DATA_HEADER_SIZE) {
3383         return (DB_CANT_CREATE_GEOMETRY_OBJECT);
3384       }
3385 
3386       uint32_t srid;
3387       row_ins_spatial_index_entry_set_mbr_field(field, row_field, &srid,
3388                                                 index->rtr_srs.get());
3389 
3390       if (index->srid_is_valid && index->srid != srid) {
3391         return DB_CANT_CREATE_GEOMETRY_OBJECT;
3392       }
3393 
3394       continue;
3395     }
3396 
3397     dfield_set_data(field, dfield_get_data(row_field), len);
3398     if (dfield_is_ext(row_field)) {
3399       ut_ad(index->is_clustered());
3400       dfield_set_ext(field);
3401     }
3402   }
3403 
3404   return (DB_SUCCESS);
3405 }
3406 
3407 /** Inserts a single index entry to the table.
3408  @return DB_SUCCESS if operation successfully completed, else error
3409  code or DB_LOCK_WAIT */
3410 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins_index_entry_step(ins_node_t * node,que_thr_t * thr)3411     row_ins_index_entry_step(ins_node_t *node, /*!< in: row insert node */
3412                              que_thr_t *thr)   /*!< in: query thread */
3413 {
3414   dberr_t err;
3415 
3416   DBUG_TRACE;
3417 
3418   ut_ad(dtuple_check_typed(node->row));
3419 
3420   err = row_ins_index_entry_set_vals(node->index, node->entry, node->row);
3421 
3422   if (err != DB_SUCCESS) {
3423     return err;
3424   }
3425 
3426   ut_ad(dtuple_check_typed(node->entry));
3427 
3428   err = row_ins_index_entry(node->index, node->entry, node->ins_multi_val_pos,
3429                             thr);
3430 
3431   DEBUG_SYNC_C_IF_THD(thr_get_trx(thr)->mysql_thd,
3432                       "after_row_ins_index_entry_step");
3433 
3434   return err;
3435 }
3436 
3437 /** Allocates a row id for row and inits the node->index field. */
3438 UNIV_INLINE
row_ins_alloc_row_id_step(ins_node_t * node)3439 void row_ins_alloc_row_id_step(ins_node_t *node) /*!< in: row insert node */
3440 {
3441   row_id_t row_id;
3442 
3443   ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
3444 
3445   if (dict_index_is_unique(node->table->first_index())) {
3446     /* No row id is stored if the clustered index is unique */
3447 
3448     return;
3449   }
3450 
3451   /* Fill in row id value to row */
3452 
3453   row_id = dict_sys_get_new_row_id();
3454 
3455   dict_sys_write_row_id(node->row_id_buf, row_id);
3456 }
3457 
3458 /** Gets a row to insert from the values list. */
3459 UNIV_INLINE
row_ins_get_row_from_values(ins_node_t * node)3460 void row_ins_get_row_from_values(ins_node_t *node) /*!< in: row insert node */
3461 {
3462   que_node_t *list_node;
3463   dfield_t *dfield;
3464   dtuple_t *row;
3465   ulint i;
3466 
3467   /* The field values are copied in the buffers of the select node and
3468   it is safe to use them until we fetch from select again: therefore
3469   we can just copy the pointers */
3470 
3471   row = node->row;
3472 
3473   i = 0;
3474   list_node = node->values_list;
3475 
3476   while (list_node) {
3477     eval_exp(list_node);
3478 
3479     dfield = dtuple_get_nth_field(row, i);
3480     dfield_copy_data(dfield, que_node_get_val(list_node));
3481 
3482     i++;
3483     list_node = que_node_get_next(list_node);
3484   }
3485 }
3486 
3487 /** Gets a row to insert from the select list. */
3488 UNIV_INLINE
row_ins_get_row_from_select(ins_node_t * node)3489 void row_ins_get_row_from_select(ins_node_t *node) /*!< in: row insert node */
3490 {
3491   que_node_t *list_node;
3492   dfield_t *dfield;
3493   dtuple_t *row;
3494   ulint i;
3495 
3496   /* The field values are copied in the buffers of the select node and
3497   it is safe to use them until we fetch from select again: therefore
3498   we can just copy the pointers */
3499 
3500   row = node->row;
3501 
3502   i = 0;
3503   list_node = node->select->select_list;
3504 
3505   while (list_node) {
3506     dfield = dtuple_get_nth_field(row, i);
3507     dfield_copy_data(dfield, que_node_get_val(list_node));
3508 
3509     i++;
3510     list_node = que_node_get_next(list_node);
3511   }
3512 }
3513 
3514 /** Inserts a row to a table.
3515  @return DB_SUCCESS if operation successfully completed, else error
3516  code or DB_LOCK_WAIT */
3517 static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_ins(ins_node_t * node,que_thr_t * thr)3518     row_ins(ins_node_t *node, /*!< in: row insert node */
3519             que_thr_t *thr)   /*!< in: query thread */
3520 {
3521   dberr_t err;
3522 
3523   DBUG_TRACE;
3524 
3525   DBUG_PRINT("row_ins", ("table: %s", node->table->name.m_name));
3526 
3527   if (node->state == INS_NODE_ALLOC_ROW_ID) {
3528     row_ins_alloc_row_id_step(node);
3529 
3530     node->index = node->table->first_index();
3531     node->entry = UT_LIST_GET_FIRST(node->entry_list);
3532 
3533     if (node->ins_type == INS_SEARCHED) {
3534       row_ins_get_row_from_select(node);
3535 
3536     } else if (node->ins_type == INS_VALUES) {
3537       row_ins_get_row_from_values(node);
3538     }
3539 
3540     node->state = INS_NODE_INSERT_ENTRIES;
3541   }
3542 
3543   ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
3544 
3545   while (node->index != nullptr) {
3546     if (node->index->type != DICT_FTS) {
3547       err = row_ins_index_entry_step(node, thr);
3548 
3549       switch (err) {
3550         case DB_SUCCESS:
3551           break;
3552         case DB_DUPLICATE_KEY:
3553           thr_get_trx(thr)->error_state = DB_DUPLICATE_KEY;
3554           thr_get_trx(thr)->error_index = node->index;
3555         // fall through
3556         default:
3557           return err;
3558       }
3559     }
3560 
3561     node->index = node->index->next();
3562     node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3563 
3564     DBUG_EXECUTE_IF("row_ins_skip_sec", node->index = nullptr;
3565                     node->entry = nullptr; break;);
3566 
3567     /* Skip corrupted secondary index and its entry */
3568     while (node->index && node->index->is_corrupted()) {
3569       node->index = node->index->next();
3570       node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
3571     }
3572   }
3573 
3574   ut_ad(node->entry == nullptr);
3575 
3576   thr_get_trx(thr)->error_index = nullptr;
3577   node->state = INS_NODE_ALLOC_ROW_ID;
3578 
3579   return DB_SUCCESS;
3580 }
3581 
3582 /** Inserts a row to a table. This is a high-level function used in SQL
3583  execution graphs.
3584  @return query thread to run next or NULL */
row_ins_step(que_thr_t * thr)3585 que_thr_t *row_ins_step(que_thr_t *thr) /*!< in: query thread */
3586 {
3587   ins_node_t *node;
3588   que_node_t *parent;
3589   sel_node_t *sel_node;
3590   trx_t *trx;
3591   dberr_t err;
3592 
3593   ut_ad(thr);
3594 
3595   DEBUG_SYNC_C("innodb_row_ins_step_enter");
3596 
3597   trx = thr_get_trx(thr);
3598 
3599   trx_start_if_not_started_xa(trx, true);
3600 
3601   node = static_cast<ins_node_t *>(thr->run_node);
3602 
3603   ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
3604   ut_ad(!node->table->is_intrinsic());
3605 
3606   parent = que_node_get_parent(node);
3607   sel_node = node->select;
3608 
3609   if (thr->prev_node == parent) {
3610     node->state = INS_NODE_SET_IX_LOCK;
3611   }
3612 
3613   /* If this is the first time this node is executed (or when
3614   execution resumes after wait for the table IX lock), set an
3615   IX lock on the table and reset the possible select node. MySQL's
3616   partitioned table code may also call an insert within the same
3617   SQL statement AFTER it has used this table handle to do a search.
3618   This happens, for example, when a row update moves it to another
3619   partition. In that case, we have already set the IX lock on the
3620   table during the search operation, and there is no need to set
3621   it again here. But we must write trx->id to node->trx_id_buf. */
3622 
3623   memset(node->trx_id_buf, 0, DATA_TRX_ID_LEN);
3624   trx_write_trx_id(node->trx_id_buf, trx->id);
3625 
3626   if (node->state == INS_NODE_SET_IX_LOCK) {
3627     node->state = INS_NODE_ALLOC_ROW_ID;
3628 
3629     /* It may be that the current session has not yet started
3630     its transaction, or it has been committed: */
3631 
3632     if (trx->id == node->trx_id) {
3633       /* No need to do IX-locking */
3634 
3635       goto same_trx;
3636     }
3637 
3638     err = lock_table(0, node->table, LOCK_IX, thr);
3639 
3640     DBUG_EXECUTE_IF("ib_row_ins_ix_lock_wait", err = DB_LOCK_WAIT;);
3641 
3642     if (err != DB_SUCCESS) {
3643       goto error_handling;
3644     }
3645 
3646     node->trx_id = trx->id;
3647   same_trx:
3648     if (node->ins_type == INS_SEARCHED) {
3649       /* Reset the cursor */
3650       sel_node->state = SEL_NODE_OPEN;
3651 
3652       /* Fetch a row to insert */
3653 
3654       thr->run_node = sel_node;
3655 
3656       return (thr);
3657     }
3658   }
3659 
3660   if ((node->ins_type == INS_SEARCHED) && (sel_node->state != SEL_NODE_FETCH)) {
3661     ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
3662 
3663     /* No more rows to insert */
3664     thr->run_node = parent;
3665 
3666     return (thr);
3667   }
3668 
3669   /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
3670 
3671   err = row_ins(node, thr);
3672 
3673 error_handling:
3674   trx->error_state = err;
3675 
3676   if (err != DB_SUCCESS) {
3677     /* err == DB_LOCK_WAIT or SQL error detected */
3678     return (nullptr);
3679   }
3680 
3681   /* DO THE TRIGGER ACTIONS HERE */
3682 
3683   if (node->ins_type == INS_SEARCHED) {
3684     /* Fetch a row to insert */
3685 
3686     thr->run_node = sel_node;
3687   } else {
3688     thr->run_node = que_node_get_parent(node);
3689   }
3690 
3691   return (thr);
3692 }
3693