1 /*****************************************************************************
2
3 Copyright (c) 2005, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0merge.cc
29 New index creation routines using a merge sort
30
31 Created 12/4/2005 Jan Lindstrom
32 Completed by Sunny Bains and Marko Makela
33 *******************************************************/
34
35 #include <math.h>
36
37 #include "ha_prototypes.h"
38
39 #include "row0merge.h"
40 #include "row0ext.h"
41 #include "row0log.h"
42 #include "row0ins.h"
43 #include "row0sel.h"
44 #include "dict0crea.h"
45 #include "trx0purge.h"
46 #include "lock0lock.h"
47 #include "pars0pars.h"
48 #include "ut0sort.h"
49 #include "row0ftsort.h"
50 #include "row0import.h"
51 #include "handler0alter.h"
52 #include "btr0bulk.h"
53 #include "fsp0sysspace.h"
54 #include "ut0new.h"
55 #include "ut0stage.h"
56
57 /* Ignore posix_fadvise() on those platforms where it does not exist */
58 #if defined _WIN32
59 # define posix_fadvise(fd, offset, len, advice) /* nothing */
60 #endif /* _WIN32 */
61
62 /* Whether to disable file system cache */
63 char srv_disable_sort_file_cache;
64
/** Class that caches index row tuples made from a single cluster
index page scan, and then insert into corresponding index tree */
class index_tuple_info_t {
public:
	/** constructor
	@param[in]	heap	memory heap
	@param[in]	index	index to be created */
	index_tuple_info_t(
		mem_heap_t*	heap,
		dict_index_t*	index) UNIV_NOTHROW
	{
		m_heap = heap;
		m_index = index;
		m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
	}

	/** destructor */
	~index_tuple_info_t()
	{
		UT_DELETE(m_dtuple_vec);
	}

	/** Get the index object
	@return the index object */
	dict_index_t*	get_index() UNIV_NOTHROW
	{
		return(m_index);
	}

	/** Caches an index row into index tuple vector
	@param[in]	row	table row
	@param[in]	ext	externally stored column
	prefixes, or NULL */
	void add(
		const dtuple_t*		row,
		const row_ext_t*	ext) UNIV_NOTHROW
	{
		dtuple_t*	dtuple;

		/* Build the index entry from the clustered index row.
		The entry is allocated from m_heap, so it stays valid
		until the caller resets that heap. */
		dtuple = row_build_index_entry(row, ext, m_index, m_heap);

		ut_ad(dtuple);

		m_dtuple_vec->push_back(dtuple);
	}

	/** Insert spatial index rows cached in vector into spatial index
	@param[in]	trx_id		transaction id
	@param[in,out]	row_heap	memory heap
	@param[in]	pcur		cluster index scanning cursor
	@param[in,out]	scan_mtr	mini-transaction for pcur
	@param[out]	mtr_committed	whether scan_mtr got committed
	@return DB_SUCCESS if successful, else error number
	NOTE(review): on failure the loop still processes the remaining
	cached tuples and the error of the LAST tuple is returned —
	an earlier error can be overwritten by a later DB_SUCCESS. */
	dberr_t insert(
		trx_id_t		trx_id,
		mem_heap_t*		row_heap,
		btr_pcur_t*		pcur,
		mtr_t*			scan_mtr,
		bool*			mtr_committed)
	{
		big_rec_t*	big_rec;
		rec_t*		rec;
		btr_cur_t	ins_cur;
		mtr_t		mtr;
		rtr_info_t	rtr_info;
		ulint*		ins_offsets = NULL;
		dberr_t		error = DB_SUCCESS;
		dtuple_t*	dtuple;
		ulint		count = 0;
		/* Bulk-load style insert: no undo logging, no locking,
		keep system columns as supplied, index is being created. */
		const ulint	flag = BTR_NO_UNDO_LOG_FLAG
				       | BTR_NO_LOCKING_FLAG
				       | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;

		ut_ad(dict_index_is_spatial(m_index));

		DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
				log_sys->check_flush_or_checkpoint = true;
		);

		for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
		     it != m_dtuple_vec->end();
		     ++it) {
			dtuple = *it;
			ut_ad(dtuple);

			if (log_sys->check_flush_or_checkpoint) {
				/* Before waiting for redo log space we
				must release the scan mini-transaction;
				save the cursor position so the caller
				can restore it afterwards. */
				if (!(*mtr_committed)) {
					btr_pcur_move_to_prev_on_page(pcur);
					btr_pcur_store_position(pcur, scan_mtr);
					mtr_commit(scan_mtr);
					*mtr_committed = true;
				}

				log_free_check();
			}

			mtr.start();
			mtr.set_named_space(m_index->space);

			ins_cur.index = m_index;
			rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
					  false);
			rtr_info_update_btr(&ins_cur, &rtr_info);

			/* Optimistic path: position on the leaf page. */
			btr_cur_search_to_nth_level(m_index, 0, dtuple,
						    PAGE_CUR_RTREE_INSERT,
						    BTR_MODIFY_LEAF, &ins_cur,
						    0, __FILE__, __LINE__,
						    &mtr);

			/* It need to update MBR in parent entry,
			so change search mode to BTR_MODIFY_TREE */
			if (rtr_info.mbr_adj) {
				mtr_commit(&mtr);
				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false, &ins_cur,
						  m_index, false);
				rtr_info_update_btr(&ins_cur, &rtr_info);
				mtr_start(&mtr);
				mtr.set_named_space(m_index->space);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE, &ins_cur, 0,
					__FILE__, __LINE__, &mtr);
			}

			error = btr_cur_optimistic_insert(
				flag, &ins_cur, &ins_offsets, &row_heap,
				dtuple, &rec, &big_rec, 0, NULL, &mtr);

			if (error == DB_FAIL) {
				/* Leaf page was full: restart the
				mini-transaction and retry with a
				pessimistic (tree-modifying) insert. */
				ut_ad(!big_rec);
				mtr.commit();
				mtr.start();
				mtr.set_named_space(m_index->space);

				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false,
						  &ins_cur, m_index, false);

				rtr_info_update_btr(&ins_cur, &rtr_info);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE,
					&ins_cur, 0,
					__FILE__, __LINE__, &mtr);


				error = btr_cur_pessimistic_insert(
					flag, &ins_cur, &ins_offsets,
					&row_heap, dtuple, &rec,
					&big_rec, 0, NULL, &mtr);
			}

			DBUG_EXECUTE_IF(
				"row_merge_ins_spatial_fail",
				error = DB_FAIL;
			);

			if (error == DB_SUCCESS) {
				/* Enlarge the parent MBR if the insert
				required it, then stamp the page with the
				inserting transaction id. */
				if (rtr_info.mbr_adj) {
					error = rtr_ins_enlarge_mbr(
						&ins_cur, NULL, &mtr);
				}

				if (error == DB_SUCCESS) {
					page_update_max_trx_id(
						btr_cur_get_block(&ins_cur),
						btr_cur_get_page_zip(&ins_cur),
						trx_id, &mtr);
				}
			}

			mtr_commit(&mtr);

			rtr_clean_rtr_info(&rtr_info, true);
			count++;
		}

		m_dtuple_vec->clear();

		return(error);
	}

private:
	/** Cache index rows made from a cluster index scan. Usually
	for rows on single cluster index page */
	typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
		idx_tuple_vec;

	/** vector used to cache index rows made from cluster index scan */
	idx_tuple_vec*		m_dtuple_vec;

	/** the index being built */
	dict_index_t*		m_index;

	/** memory heap for creating index tuples */
	mem_heap_t*		m_heap;
};
266
267 /* Maximum pending doc memory limit in bytes for a fts tokenization thread */
268 #define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
269
270 /** Insert sorted data tuples to the index.
271 @param[in] trx_id transaction identifier
272 @param[in] index index to be inserted
273 @param[in] old_table old table
274 @param[in] fd file descriptor
275 @param[in,out] block file buffer
276 @param[in] row_buf row_buf the sorted data tuples,
277 or NULL if fd, block will be used instead
278 @param[in,out] btr_bulk btr bulk instance
279 @param[in,out] stage performance schema accounting object, used by
280 ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
281 and then stage->inc() will be called for each record that is processed.
282 @return DB_SUCCESS or error number */
283 static MY_ATTRIBUTE((warn_unused_result))
284 dberr_t
285 row_merge_insert_index_tuples(
286 trx_id_t trx_id,
287 dict_index_t* index,
288 const dict_table_t* old_table,
289 int fd,
290 row_merge_block_t* block,
291 const row_merge_buf_t* row_buf,
292 BtrBulk* btr_bulk,
293 ut_stage_alter_t* stage = NULL);
294
295 /******************************************************//**
296 Encode an index record. */
297 static
298 void
row_merge_buf_encode(byte ** b,const dict_index_t * index,const mtuple_t * entry,ulint n_fields)299 row_merge_buf_encode(
300 /*=================*/
301 byte** b, /*!< in/out: pointer to
302 current end of output buffer */
303 const dict_index_t* index, /*!< in: index */
304 const mtuple_t* entry, /*!< in: index fields
305 of the record to encode */
306 ulint n_fields) /*!< in: number of fields
307 in the entry */
308 {
309 ulint size;
310 ulint extra_size;
311
312 size = rec_get_converted_size_temp(
313 index, entry->fields, n_fields, NULL, &extra_size);
314 ut_ad(size >= extra_size);
315
316 /* Encode extra_size + 1 */
317 if (extra_size + 1 < 0x80) {
318 *(*b)++ = (byte) (extra_size + 1);
319 } else {
320 ut_ad((extra_size + 1) < 0x8000);
321 *(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
322 *(*b)++ = (byte) (extra_size + 1);
323 }
324
325 rec_convert_dtuple_to_temp(*b + extra_size, index,
326 entry->fields, n_fields, NULL);
327
328 *b += size;
329 }
330
331 /******************************************************//**
332 Allocate a sort buffer.
333 @return own: sort buffer */
334 static MY_ATTRIBUTE((malloc))
335 row_merge_buf_t*
row_merge_buf_create_low(mem_heap_t * heap,dict_index_t * index,ulint max_tuples,ulint buf_size)336 row_merge_buf_create_low(
337 /*=====================*/
338 mem_heap_t* heap, /*!< in: heap where allocated */
339 dict_index_t* index, /*!< in: secondary index */
340 ulint max_tuples, /*!< in: maximum number of
341 data tuples */
342 ulint buf_size) /*!< in: size of the buffer,
343 in bytes */
344 {
345 row_merge_buf_t* buf;
346
347 ut_ad(max_tuples > 0);
348
349 ut_ad(max_tuples <= srv_sort_buf_size);
350
351 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
352 buf->heap = heap;
353 buf->index = index;
354 buf->max_tuples = max_tuples;
355 buf->tuples = static_cast<mtuple_t*>(
356 ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
357 buf->tmp_tuples = buf->tuples + max_tuples;
358
359 return(buf);
360 }
361
362 /******************************************************//**
363 Allocate a sort buffer.
364 @return own: sort buffer */
365 row_merge_buf_t*
row_merge_buf_create(dict_index_t * index)366 row_merge_buf_create(
367 /*=================*/
368 dict_index_t* index) /*!< in: secondary index */
369 {
370 row_merge_buf_t* buf;
371 ulint max_tuples;
372 ulint buf_size;
373 mem_heap_t* heap;
374
375 max_tuples = static_cast<ulint>(srv_sort_buf_size)
376 / ut_max(static_cast<ulint>(1),
377 dict_index_get_min_size(index));
378
379 buf_size = (sizeof *buf);
380
381 heap = mem_heap_create(buf_size);
382
383 buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
384
385 return(buf);
386 }
387
388 /******************************************************//**
389 Empty a sort buffer.
390 @return sort buffer */
391 row_merge_buf_t*
row_merge_buf_empty(row_merge_buf_t * buf)392 row_merge_buf_empty(
393 /*================*/
394 row_merge_buf_t* buf) /*!< in,own: sort buffer */
395 {
396 ulint buf_size = sizeof *buf;
397 ulint max_tuples = buf->max_tuples;
398 mem_heap_t* heap = buf->heap;
399 dict_index_t* index = buf->index;
400 mtuple_t* tuples = buf->tuples;
401
402 mem_heap_empty(heap);
403
404 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
405 buf->heap = heap;
406 buf->index = index;
407 buf->max_tuples = max_tuples;
408 buf->tuples = tuples;
409 buf->tmp_tuples = buf->tuples + max_tuples;
410
411 return(buf);
412 }
413
414 /******************************************************//**
415 Deallocate a sort buffer. */
416 void
row_merge_buf_free(row_merge_buf_t * buf)417 row_merge_buf_free(
418 /*===============*/
419 row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */
420 {
421 ut_free(buf->tuples);
422 mem_heap_free(buf->heap);
423 }
424
/** Convert the field data from compact to redundant format.
@param[in]	row_field	field to copy from
@param[out]	field		field to copy to
@param[in]	len		length of the field data
@param[in]	page_size	compressed BLOB page size,
				zero for uncompressed BLOBs
@param[in,out]	heap		memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert(). */
static
void
row_merge_buf_redundant_convert(
	const dfield_t*		row_field,
	dfield_t*		field,
	ulint			len,
	const page_size_t&	page_size,
	mem_heap_t*		heap)
{
	/* Only variable-width multi-byte character columns need this
	conversion: in REDUNDANT format CHAR is stored at its full
	fixed length. */
	ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
	ut_ad(DATA_MBMAXLEN(field->type.mbminmaxlen) > 1);

	byte*		buf = (byte*) mem_heap_alloc(heap, len);
	ulint		field_len = row_field->len;
	ut_ad(field_len <= len);

	if (row_field->ext) {
		const byte*	field_data = static_cast<byte*>(
			dfield_get_data(row_field));
		ulint		ext_len;

		/* The stored prefix ends in a BLOB pointer, which must
		not be the all-zero "unset" reference. */
		ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
		ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
			    field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

		/* Fetch the externally stored column value into heap. */
		byte*	data = btr_copy_externally_stored_field(
			&ext_len, field_data, page_size, field_len, heap);

		ut_ad(ext_len < len);

		memcpy(buf, data, ext_len);
		field_len = ext_len;
	} else {
		memcpy(buf, row_field->data, field_len);
	}

	/* Pad with 0x20 (space) up to the fixed REDUNDANT length. */
	memset(buf + field_len, 0x20, len - field_len);

	dfield_set_data(field, buf, len);
}
475
/** Insert a data tuple into a sort buffer.
@param[in,out]	buf		sort buffer
@param[in]	fts_index	fts index to be created
@param[in]	old_table	original table
@param[in]	new_table	new table
@param[in,out]	psort_info	parallel sort info
@param[in]	row		table row
@param[in]	ext		cache of externally stored
				column prefixes, or NULL
@param[in,out]	doc_id		Doc ID if we are creating
				FTS index
@param[in,out]	conv_heap	memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert()
@param[in,out]	err		set if error occurs
@param[in,out]	v_heap		heap memory to process data for virtual column
@param[in,out]	my_table	mysql table object
@param[in]	trx		transaction object
@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
	row_merge_buf_t*	buf,
	dict_index_t*		fts_index,
	const dict_table_t*	old_table,
	const dict_table_t*	new_table,
	fts_psort_t*		psort_info,
	const dtuple_t*		row,
	const row_ext_t*	ext,
	doc_id_t*		doc_id,
	mem_heap_t*		conv_heap,
	dberr_t*		err,
	mem_heap_t**		v_heap,
	TABLE*			my_table,
	trx_t*			trx)
{
	ulint			i;
	const dict_index_t*	index;
	mtuple_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	ulint			bucket = 0;
	doc_id_t		write_doc_id;
	ulint			n_row_added = 0;
	DBUG_ENTER("row_merge_buf_add");

	/* Out of tuple slots: caller must flush the buffer first. */
	if (buf->n_tuples >= buf->max_tuples) {
		DBUG_RETURN(0);
	}

	DBUG_EXECUTE_IF(
		"ib_row_merge_buf_add_two",
		if (buf->n_tuples >= 2) DBUG_RETURN(0););

	UNIV_PREFETCH_R(row->fields);

	/* If we are building FTS index, buf->index points to
	the 'fts_sort_idx', and real FTS index is stored in
	fts_index */
	index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;

	/* create spatial index should not come here */
	ut_ad(!dict_index_is_spatial(index));

	n_fields = dict_index_get_n_fields(index);

	/* Build the next entry in place in the sort buffer. */
	entry = &buf->tuples[buf->n_tuples];
	field = entry->fields = static_cast<dfield_t*>(
		mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));

	data_size = 0;
	/* Start with the size of the NULL-flags bitmap. */
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);

	ifield = dict_index_get_nth_field(index, 0);

	for (i = 0; i < n_fields; i++, field++, ifield++) {
		ulint			len;
		const dict_col_t*	col;
		const dict_v_col_t*	v_col = NULL;
		ulint			col_no;
		ulint			fixed_len;
		const dfield_t*		row_field;

		col = ifield->col;
		if (dict_col_is_virtual(col)) {
			v_col = reinterpret_cast<const dict_v_col_t*>(col);
		}

		col_no = dict_col_get_no(col);

		/* Process the Doc ID column */
		if (*doc_id > 0
		    && col_no == index->table->fts->doc_col
		    && !dict_col_is_virtual(col)) {
			fts_write_doc_id((byte*) &write_doc_id, *doc_id);

			/* Note: field->data now points to a value on the
			stack: &write_doc_id after dfield_set_data(). Because
			there is only one doc_id per row, it shouldn't matter.
			We allocate a new buffer before we leave the function
			later below. */

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));

			field->type.mtype = ifield->col->mtype;
			field->type.prtype = ifield->col->prtype;
			field->type.mbminmaxlen = DATA_MBMINMAXLEN(0, 0);
			field->type.len = ifield->col->len;
		} else {
			/* Use callback to get the virtual column value */
			if (dict_col_is_virtual(col)) {
				dict_index_t*	clust_index
					= dict_table_get_first_index(new_table);

				row_field = innobase_get_computed_value(
					row, v_col, clust_index,
					v_heap, NULL, ifield, trx->mysql_thd,
					my_table, old_table, NULL, NULL);

				if (row_field == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					DBUG_RETURN(0);
				}
				dfield_copy(field, row_field);
			} else {
				row_field = dtuple_get_nth_field(row, col_no);
				dfield_copy(field, row_field);
			}


			/* Tokenize and process data for FTS */
			if (index->type & DICT_FTS) {
				fts_doc_item_t*	doc_item;
				byte*		value;
				void*		ptr;
				const ulint	max_trial_count = 10000;
				ulint		trial_count = 0;

				/* fetch Doc ID if it already exists
				in the row, and not supplied by the
				caller. Even if the value column is
				NULL, we still need to get the Doc
				ID so to maintain the correct max
				Doc ID */
				if (*doc_id == 0) {
					const dfield_t*	doc_field;
					doc_field = dtuple_get_nth_field(
						row,
						index->table->fts->doc_col);
					*doc_id = (doc_id_t) mach_read_from_8(
						static_cast<byte*>(
						dfield_get_data(doc_field)));

					if (*doc_id == 0) {
						ib::warn() << "FTS Doc ID is"
							" zero. Record"
							" skipped";
						DBUG_RETURN(0);
					}
				}

				if (dfield_is_null(field)) {
					n_row_added = 1;
					continue;
				}

				/* Copy the value into a single malloc'd
				block holding the doc item header followed
				by the data; ownership passes to the
				tokenizer thread via fts_doc_list. */
				ptr = ut_malloc_nokey(sizeof(*doc_item)
						      + field->len);

				doc_item = static_cast<fts_doc_item_t*>(ptr);
				value = static_cast<byte*>(ptr)
					+ sizeof(*doc_item);
				memcpy(value, field->data, field->len);
				field->data = value;

				doc_item->field = field;
				doc_item->doc_id = *doc_id;

				bucket = *doc_id % fts_sort_pll_degree;

				/* Add doc item to fts_doc_list */
				mutex_enter(&psort_info[bucket].mutex);

				if (psort_info[bucket].error == DB_SUCCESS) {
					UT_LIST_ADD_LAST(
						psort_info[bucket].fts_doc_list,
						doc_item);
					psort_info[bucket].memory_used +=
						sizeof(*doc_item) + field->len;
				} else {
					ut_free(doc_item);
				}

				mutex_exit(&psort_info[bucket].mutex);

				/* Sleep when memory used exceeds limit*/
				while (psort_info[bucket].memory_used
				       > FTS_PENDING_DOC_MEMORY_LIMIT
				       && trial_count++ < max_trial_count) {
					os_thread_sleep(1000);
				}

				n_row_added = 1;
				continue;
			}

			/* DATA_MYSQL (CHAR) whose stored length differs
			from the column length needs re-padding when
			converting to ROW_FORMAT=REDUNDANT. */
			if (field->len != UNIV_SQL_NULL
			    && col->mtype == DATA_MYSQL
			    && col->len != field->len) {
				if (conv_heap != NULL) {
					row_merge_buf_redundant_convert(
						row_field, field, col->len,
						dict_table_page_size(old_table),
						conv_heap);
				} else {
					/* Field length mismatch should not
					happen when rebuilding redundant row
					format table. */
					ut_ad(dict_table_is_comp(index->table));
				}
			}
		}

		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (!ext) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else if (!dict_col_is_virtual(col)) {
			/* Only non-virtual column are stored externally */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminmaxlen,
				ifield->prefix_len,
				len,
				static_cast<char*>(dfield_get_data(field)));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len
		      || DATA_LARGE_MTYPE(col->mtype)
		      || (col->mtype == DATA_POINT
			  && len == DATA_MBR_LEN));

		fixed_len = ifield->fixed_len;
		if (fixed_len && !dict_table_is_comp(index->table)
		    && DATA_MBMINLEN(col->mbminmaxlen)
		    != DATA_MBMAXLEN(col->mbminmaxlen)) {
			/* CHAR in ROW_FORMAT=REDUNDANT is always
			fixed-length, but in the temporary file it is
			variable-length for variable-length character
			sets. */
			fixed_len = 0;
		}

		/* Account for the per-field length bytes of the
		temporary-file record format. */
		if (fixed_len) {
#ifdef UNIV_DEBUG
			ulint	mbminlen = DATA_MBMINLEN(col->mbminmaxlen);
			ulint	mbmaxlen = DATA_MBMAXLEN(col->mbminmaxlen);

			/* len should be between size calcualted base on
			mbmaxlen and mbminlen */
			ut_ad(len <= fixed_len);
			ut_ad(!mbmaxlen || len >= mbminlen
			      * (fixed_len / mbmaxlen));

			ut_ad(!dfield_is_ext(field));
#endif /* UNIV_DEBUG */
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (!DATA_BIG_COL(col))) {
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself. If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

	/* If this is FTS index, we already populated the sort buffer, return
	here */
	if (index->type & DICT_FTS) {
		DBUG_RETURN(n_row_added);
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_temp(
			index, entry->fields, n_fields, NULL, &extra);

		ut_ad(data_size + extra_size == size);
		ut_ad(extra_size == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Record size can exceed page size while converting to
	redundant row format. But there is assert
	ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size().
	It may hit the assert before attempting to insert the row. */
	if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
		*err = DB_TOO_BIG_RECORD;
	}

	ut_ad(data_size < srv_sort_buf_size);

	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= srv_sort_buf_size - 1) {
		DBUG_RETURN(0);
	}

	buf->total_size += data_size;
	buf->n_tuples++;
	n_row_added++;

	field = entry->fields;

	/* Copy the data fields. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	if (conv_heap != NULL) {
		mem_heap_empty(conv_heap);
	}

	DBUG_RETURN(n_row_added);
}
847
848 /*************************************************************//**
849 Report a duplicate key. */
850 void
row_merge_dup_report(row_merge_dup_t * dup,const dfield_t * entry)851 row_merge_dup_report(
852 /*=================*/
853 row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */
854 const dfield_t* entry) /*!< in: duplicate index entry */
855 {
856 if (!dup->n_dup++) {
857 /* Only report the first duplicate record,
858 but count all duplicate records. */
859 innobase_fields_to_mysql(dup->table, dup->index, entry);
860 }
861 }
862
863 /*************************************************************//**
864 Compare two tuples.
865 @return positive, 0, negative if a is greater, equal, less, than b,
866 respectively */
867 static MY_ATTRIBUTE((warn_unused_result))
868 int
row_merge_tuple_cmp(ulint n_uniq,ulint n_field,const mtuple_t & a,const mtuple_t & b,row_merge_dup_t * dup)869 row_merge_tuple_cmp(
870 /*================*/
871 ulint n_uniq, /*!< in: number of unique fields */
872 ulint n_field,/*!< in: number of fields */
873 const mtuple_t& a, /*!< in: first tuple to be compared */
874 const mtuple_t& b, /*!< in: second tuple to be compared */
875 row_merge_dup_t* dup) /*!< in/out: for reporting duplicates,
876 NULL if non-unique index */
877 {
878 int cmp;
879 const dfield_t* af = a.fields;
880 const dfield_t* bf = b.fields;
881 ulint n = n_uniq;
882
883 ut_ad(n_uniq > 0);
884 ut_ad(n_uniq <= n_field);
885
886 /* Compare the fields of the tuples until a difference is
887 found or we run out of fields to compare. If !cmp at the
888 end, the tuples are equal. */
889 do {
890 cmp = cmp_dfield_dfield(af++, bf++);
891 } while (!cmp && --n);
892
893 if (cmp) {
894 return(cmp);
895 }
896
897 if (dup) {
898 /* Report a duplicate value error if the tuples are
899 logically equal. NULL columns are logically inequal,
900 although they are equal in the sorting order. Find
901 out if any of the fields are NULL. */
902 for (const dfield_t* df = a.fields; df != af; df++) {
903 if (dfield_is_null(df)) {
904 goto no_report;
905 }
906 }
907
908 row_merge_dup_report(dup, a.fields);
909 }
910
911 no_report:
912 /* The n_uniq fields were equal, but we compare all fields so
913 that we will get the same (internal) order as in the B-tree. */
914 for (n = n_field - n_uniq + 1; --n; ) {
915 cmp = cmp_dfield_dfield(af++, bf++);
916 if (cmp) {
917 return(cmp);
918 }
919 }
920
921 /* This should never be reached, except in a secondary index
922 when creating a secondary index and a PRIMARY KEY, and there
923 is a duplicate in the PRIMARY KEY that has not been detected
924 yet. Internally, an index must never contain duplicates. */
925 return(cmp);
926 }
927
/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param tuples array of tuples that being sorted
@param aux work area, same size as tuples[]
@param low lower bound of the sorting area, inclusive
@param high upper bound of the sorting area, inclusive */
#define row_merge_tuple_sort_ctx(tuples, aux, low, high)		\
	row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a first tuple to be compared
@param b second tuple to be compared
@return positive, 0, negative, if a is greater, equal, less, than b,
respectively */
#define row_merge_tuple_cmp_ctx(a,b)			\
	row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)

/**********************************************************************//**
Merge sort the tuple buffer in main memory.
The two _ctx macros above capture n_uniq, n_field and dup from this
function's scope, because UT_SORT_FUNCTION_BODY() only passes the
array, work area and bounds to the recursion/comparison hooks. */
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	row_merge_dup_t*	dup,	/*!< in/out: reporter of duplicates
					(NULL if non-unique index) */
	mtuple_t*		tuples,	/*!< in/out: tuples */
	mtuple_t*		aux,	/*!< in/out: work area */
	ulint			low,	/*!< in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/*!< in: upper bound of the
					sorting area, exclusive */
{
	ut_ad(n_field > 0);
	ut_ad(n_uniq <= n_field);

	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}
968
969 /******************************************************//**
970 Sort a buffer. */
971 void
row_merge_buf_sort(row_merge_buf_t * buf,row_merge_dup_t * dup)972 row_merge_buf_sort(
973 /*===============*/
974 row_merge_buf_t* buf, /*!< in/out: sort buffer */
975 row_merge_dup_t* dup) /*!< in/out: reporter of duplicates
976 (NULL if non-unique index) */
977 {
978 ut_ad(!dict_index_is_spatial(buf->index));
979
980 row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
981 dict_index_get_n_fields(buf->index),
982 dup,
983 buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
984 }
985
/******************************************************//**
Write a buffer to a block. */
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
	const merge_file_t*	of UNIV_UNUSED,
					/*!< in: output file */
	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
{
	const dict_index_t*	index	= buf->index;
	ulint			n_fields= dict_index_get_n_fields(index);
	byte*			b	= &block[0];

	DBUG_ENTER("row_merge_buf_write");

	/* Encode the tuples back to back; each record is written as a
	length prefix plus the temporary-file record format (see
	row_merge_buf_encode()). */
	for (ulint i = 0; i < buf->n_tuples; i++) {
		const mtuple_t*	entry	= &buf->tuples[i];

		row_merge_buf_encode(&b, index, entry, n_fields);
		ut_ad(b < &block[srv_sort_buf_size]);

		DBUG_PRINT("ib_merge_sort",
			   ("%p,fd=%d,%lu %lu: %s",
			    reinterpret_cast<const void*>(b), of->fd,
			    ulong(of->offset), ulong(i),
			    rec_printer(entry->fields,
					n_fields).str().c_str()));
	}

	/* Write an "end-of-chunk" marker. A 0 byte cannot start a
	record, because the encoded length prefix is always >= 1. */
	ut_a(b < &block[srv_sort_buf_size]);
	ut_a(b == &block[0] + buf->total_size);
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized. Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* UNIV_DEBUG_VALGRIND */
	DBUG_PRINT("ib_merge_sort",
		   ("write %p,%d,%lu EOF",
		    reinterpret_cast<const void*>(b), of->fd,
		    ulong(of->offset)));
	DBUG_VOID_RETURN;
}
1031
1032 /******************************************************//**
1033 Create a memory heap and allocate space for row_merge_rec_offsets()
1034 and mrec_buf_t[3].
1035 @return memory heap */
1036 static
1037 mem_heap_t*
row_merge_heap_create(const dict_index_t * index,mrec_buf_t ** buf,ulint ** offsets1,ulint ** offsets2)1038 row_merge_heap_create(
1039 /*==================*/
1040 const dict_index_t* index, /*!< in: record descriptor */
1041 mrec_buf_t** buf, /*!< out: 3 buffers */
1042 ulint** offsets1, /*!< out: offsets */
1043 ulint** offsets2) /*!< out: offsets */
1044 {
1045 ulint i = 1 + REC_OFFS_HEADER_SIZE
1046 + dict_index_get_n_fields(index);
1047 mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1
1048 + 3 * sizeof **buf);
1049
1050 *buf = static_cast<mrec_buf_t*>(
1051 mem_heap_alloc(heap, 3 * sizeof **buf));
1052 *offsets1 = static_cast<ulint*>(
1053 mem_heap_alloc(heap, i * sizeof **offsets1));
1054 *offsets2 = static_cast<ulint*>(
1055 mem_heap_alloc(heap, i * sizeof **offsets2));
1056
1057 (*offsets1)[0] = (*offsets2)[0] = i;
1058 (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
1059
1060 return(heap);
1061 }
1062
1063 /********************************************************************//**
1064 Read a merge block from the file system.
1065 @return TRUE if request was successful, FALSE if fail */
1066 ibool
row_merge_read(int fd,ulint offset,row_merge_block_t * buf)1067 row_merge_read(
1068 /*===========*/
1069 int fd, /*!< in: file descriptor */
1070 ulint offset, /*!< in: offset where to read
1071 in number of row_merge_block_t
1072 elements */
1073 row_merge_block_t* buf) /*!< out: data */
1074 {
1075 os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size;
1076 dberr_t err;
1077
1078 DBUG_ENTER("row_merge_read");
1079 DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
1080 DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););
1081
1082 IORequest request;
1083
1084 /* Merge sort pages are never compressed. */
1085 request.disable_compression();
1086
1087 err = os_file_read_no_error_handling_int_fd(
1088 request,
1089 fd, buf, ofs, srv_sort_buf_size, NULL);
1090
1091 #ifdef POSIX_FADV_DONTNEED
1092 /* Each block is read exactly once. Free up the file cache. */
1093 posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
1094 #endif /* POSIX_FADV_DONTNEED */
1095
1096 if (err != DB_SUCCESS) {
1097 ib::error() << "Failed to read merge block at " << ofs;
1098 }
1099
1100 DBUG_RETURN(err == DB_SUCCESS);
1101 }
1102
1103 /********************************************************************//**
1104 Write a merge block to the file system.
1105 @return TRUE if request was successful, FALSE if fail */
1106 ibool
row_merge_write(int fd,ulint offset,const void * buf)1107 row_merge_write(
1108 /*============*/
1109 int fd, /*!< in: file descriptor */
1110 ulint offset, /*!< in: offset where to write,
1111 in number of row_merge_block_t elements */
1112 const void* buf) /*!< in: data */
1113 {
1114 size_t buf_len = srv_sort_buf_size;
1115 os_offset_t ofs = buf_len * (os_offset_t) offset;
1116 dberr_t err;
1117
1118 DBUG_ENTER("row_merge_write");
1119 DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
1120 DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););
1121
1122 IORequest request(IORequest::WRITE);
1123
1124 request.disable_compression();
1125
1126 err = os_file_write_int_fd(
1127 request,
1128 "(merge)", fd, buf, ofs, buf_len);
1129
1130 #ifdef POSIX_FADV_DONTNEED
1131 /* The block will be needed on the next merge pass,
1132 but it can be evicted from the file cache meanwhile. */
1133 posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
1134 #endif /* POSIX_FADV_DONTNEED */
1135
1136 DBUG_RETURN(err == DB_SUCCESS);
1137 }
1138
/********************************************************************//**
Read a merge record.
@return pointer to next record, or NULL on I/O error or end of list */
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	const byte*		b,	/*!< in: pointer to record */
	const dict_index_t*	index,	/*!< in: index of the record */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t**		mrec,	/*!< out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/*!< out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	/* The offsets array must have been sized by
	row_merge_heap_create() for this index. */
	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	DBUG_ENTER("row_merge_read_rec");

	/* The first byte encodes (extra_size + 1); the value 0 is
	reserved as the end-of-list marker, and 0x80 or above means
	a two-byte encoding follows. */
	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		DBUG_PRINT("ib_merge_sort",
			   ("read %p,%p,%d,%lu EOF\n",
			    reinterpret_cast<const void*>(b),
			    reinterpret_cast<const void*>(block),
			    fd, ulong(*foffs)));
		DBUG_RETURN(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
			/* The second length byte is in the next block;
			read it in. */
			if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				DBUG_RETURN(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = &block[0];
		}

		/* Combine the two length bytes (high bits first). */
		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size. Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks. Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = &block[srv_sort_buf_size] - b;
		ut_ad(avail_size < sizeof *buf);
		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = &block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		/* The record origin is past the extra bytes. */
		*mrec = *buf + extra_size;

		rec_init_offsets_temp(*mrec, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < &block[srv_sort_buf_size]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	*mrec = b + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	/* Tentatively advance past the whole record; check below
	whether it actually fit in this block. */
	b += extra_size + data_size;

	if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record spans two blocks. Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = &block[srv_sort_buf_size] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;

	/* We cannot invoke rec_offs_make_valid() here, because there
	are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
	Similarly, rec_offs_validate() would fail, because it invokes
	rec_get_status(). */
	ut_d(offsets[2] = (ulint) *mrec);
	ut_d(offsets[3] = (ulint) index);

	if (!row_merge_read(fd, ++(*foffs), block)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = &block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs),
		    rec_printer(*mrec, 0, offsets).str().c_str()));
	DBUG_RETURN(b);
}
1301
/********************************************************************//**
Write a merge record. Encodes (extra_size + 1) as one or two length
bytes, followed by the raw record bytes. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifndef NDEBUG
	ulint		size,	/*!< in: total size to write */
	int		fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* !NDEBUG */
	const mrec_t*	mrec,	/*!< in: record to write */
	const ulint*	offsets)/*!< in: offsets of mrec */
/* In release builds the three debug-only parameters do not exist;
this macro rewrites 7-argument call sites to the 4-argument form. */
#ifdef NDEBUG
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* NDEBUG */
{
	DBUG_ENTER("row_merge_write_rec_low");

#ifndef NDEBUG
	/* Expected end of the encoded record, for the sanity
	check at the bottom. */
	const byte* const end = b + size;
#endif /* NDEBUG */
	assert(e == rec_offs_extra_size(offsets) + 1);
	DBUG_PRINT("ib_merge_sort",
		   ("%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b), fd, ulong(foffs),
		    rec_printer(mrec, 0, offsets).str().c_str()));

	if (e < 0x80) {
		/* One-byte length encoding. */
		*b++ = (byte) e;
	} else {
		/* Two-byte encoding: high bits first, 0x80 flag set. */
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	/* Copy extra bytes and data bytes in one go; mrec points at
	the record origin, after the extra bytes. */
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	assert(b + rec_offs_size(offsets) == end);
	DBUG_VOID_RETURN;
}
1344
/********************************************************************//**
Write a merge record, splitting it across a block boundary via the
secondary buffer when it does not fit in the current block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	byte*			b,	/*!< in: pointer to end of block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t*		mrec,	/*!< in: record to write */
	const ulint*		offsets)/*!< in: offsets of mrec */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(mrec);
	ut_ad(foffs);
	/* The record must not reside in either of the buffers we
	are about to write into. */
	ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size. Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	/* Total encoded size: one or two length bytes plus the
	record data. */
	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = &block[srv_sort_buf_size] - b;

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block)) {
			return(NULL);
		}

		UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);

		/* Copy the rest. */
		b = &block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		/* Common case: encode the record directly into
		the block. */
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}
1411
1412 /********************************************************************//**
1413 Write an end-of-list marker.
1414 @return pointer to end of block, or NULL on error */
1415 static
1416 byte*
row_merge_write_eof(row_merge_block_t * block,byte * b,int fd,ulint * foffs)1417 row_merge_write_eof(
1418 /*================*/
1419 row_merge_block_t* block, /*!< in/out: file buffer */
1420 byte* b, /*!< in: pointer to end of block */
1421 int fd, /*!< in: file descriptor */
1422 ulint* foffs) /*!< in/out: file offset */
1423 {
1424 ut_ad(block);
1425 ut_ad(b >= &block[0]);
1426 ut_ad(b < &block[srv_sort_buf_size]);
1427 ut_ad(foffs);
1428
1429 DBUG_ENTER("row_merge_write_eof");
1430 DBUG_PRINT("ib_merge_sort",
1431 ("%p,%p,fd=%d,%lu",
1432 reinterpret_cast<const void*>(b),
1433 reinterpret_cast<const void*>(block),
1434 fd, ulong(*foffs)));
1435
1436 *b++ = 0;
1437 UNIV_MEM_ASSERT_RW(&block[0], b - &block[0]);
1438 UNIV_MEM_ASSERT_W(&block[0], srv_sort_buf_size);
1439 #ifdef UNIV_DEBUG_VALGRIND
1440 /* The rest of the block is uninitialized. Initialize it
1441 to avoid bogus warnings. */
1442 memset(b, 0xff, &block[srv_sort_buf_size] - b);
1443 #endif /* UNIV_DEBUG_VALGRIND */
1444
1445 if (!row_merge_write(fd, (*foffs)++, block)) {
1446 DBUG_RETURN(NULL);
1447 }
1448
1449 UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
1450 DBUG_RETURN(&block[0]);
1451 }
1452
1453 /** Create a temporary file if it has not been created already.
1454 @param[in,out] tmpfd temporary file handle
1455 @param[in] path location for creating temporary file
1456 @return file descriptor, or -1 on failure */
1457 static MY_ATTRIBUTE((warn_unused_result))
1458 int
row_merge_tmpfile_if_needed(int * tmpfd,const char * path)1459 row_merge_tmpfile_if_needed(
1460 int* tmpfd,
1461 const char* path)
1462 {
1463 if (*tmpfd < 0) {
1464 *tmpfd = row_merge_file_create_low(path);
1465 if (*tmpfd >= 0) {
1466 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1467 }
1468 }
1469
1470 return(*tmpfd);
1471 }
1472
1473 /** Create a temporary file for merge sort if it was not created already.
1474 @param[in,out] file merge file structure
1475 @param[in] nrec number of records in the file
1476 @param[in] path location for creating temporary file
1477 @return file descriptor, or -1 on failure */
1478 static MY_ATTRIBUTE((warn_unused_result))
1479 int
row_merge_file_create_if_needed(merge_file_t * file,int * tmpfd,ulint nrec,const char * path)1480 row_merge_file_create_if_needed(
1481 merge_file_t* file,
1482 int* tmpfd,
1483 ulint nrec,
1484 const char* path)
1485 {
1486 ut_ad(file->fd < 0 || *tmpfd >=0);
1487 if (file->fd < 0 && row_merge_file_create(file, path) >= 0) {
1488 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1489 if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) {
1490 return(-1);
1491 }
1492
1493 file->n_rec = nrec;
1494 }
1495
1496 ut_ad(file->fd < 0 || *tmpfd >=0);
1497 return(file->fd);
1498 }
1499
1500 /** Copy the merge data tuple from another merge data tuple.
1501 @param[in] mtuple source merge data tuple
1502 @param[in,out] prev_mtuple destination merge data tuple
1503 @param[in] n_unique number of unique fields exist in the mtuple
1504 @param[in,out] heap memory heap where last_mtuple allocated */
1505 static
1506 void
row_mtuple_create(const mtuple_t * mtuple,mtuple_t * prev_mtuple,ulint n_unique,mem_heap_t * heap)1507 row_mtuple_create(
1508 const mtuple_t* mtuple,
1509 mtuple_t* prev_mtuple,
1510 ulint n_unique,
1511 mem_heap_t* heap)
1512 {
1513 memcpy(prev_mtuple->fields, mtuple->fields,
1514 n_unique * sizeof *mtuple->fields);
1515
1516 dfield_t* field = prev_mtuple->fields;
1517
1518 for (ulint i = 0; i < n_unique; i++) {
1519 dfield_dup(field++, heap);
1520 }
1521 }
1522
1523 /** Compare two merge data tuples.
1524 @param[in] prev_mtuple merge data tuple
1525 @param[in] current_mtuple merge data tuple
1526 @param[in,out] dup reporter of duplicates
1527 @retval positive, 0, negative if current_mtuple is greater, equal, less, than
1528 last_mtuple. */
1529 static
1530 int
row_mtuple_cmp(const mtuple_t * prev_mtuple,const mtuple_t * current_mtuple,row_merge_dup_t * dup)1531 row_mtuple_cmp(
1532 const mtuple_t* prev_mtuple,
1533 const mtuple_t* current_mtuple,
1534 row_merge_dup_t* dup)
1535 {
1536 ut_ad(dict_index_is_clust(dup->index));
1537 const ulint n_unique = dict_index_get_n_unique(dup->index);
1538
1539 return(row_merge_tuple_cmp(
1540 n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
1541 }
1542
1543 /** Insert cached spatial index rows.
1544 @param[in] trx_id transaction id
1545 @param[in] sp_tuples cached spatial rows
1546 @param[in] num_spatial number of spatial indexes
1547 @param[in,out] row_heap heap for insert
1548 @param[in,out] sp_heap heap for tuples
1549 @param[in,out] pcur cluster index cursor
1550 @param[in,out] mtr mini transaction
1551 @param[in,out] mtr_committed whether scan_mtr got committed
1552 @return DB_SUCCESS or error number */
1553 static
1554 dberr_t
row_merge_spatial_rows(trx_id_t trx_id,index_tuple_info_t ** sp_tuples,ulint num_spatial,mem_heap_t * row_heap,mem_heap_t * sp_heap,btr_pcur_t * pcur,mtr_t * mtr,bool * mtr_committed)1555 row_merge_spatial_rows(
1556 trx_id_t trx_id,
1557 index_tuple_info_t** sp_tuples,
1558 ulint num_spatial,
1559 mem_heap_t* row_heap,
1560 mem_heap_t* sp_heap,
1561 btr_pcur_t* pcur,
1562 mtr_t* mtr,
1563 bool* mtr_committed)
1564 {
1565 dberr_t err = DB_SUCCESS;
1566
1567 if (sp_tuples == NULL) {
1568 return(DB_SUCCESS);
1569 }
1570
1571 ut_ad(sp_heap != NULL);
1572
1573 for (ulint j = 0; j < num_spatial; j++) {
1574 err = sp_tuples[j]->insert(
1575 trx_id, row_heap,
1576 pcur, mtr, mtr_committed);
1577
1578 if (err != DB_SUCCESS) {
1579 return(err);
1580 }
1581 }
1582
1583 mem_heap_empty(sp_heap);
1584
1585 return(err);
1586 }
1587
1588 /** Check if the geometry field is valid.
1589 @param[in] row the row
1590 @param[in] index spatial index
1591 @return true if it's valid, false if it's invalid. */
1592 static
1593 bool
row_geo_field_is_valid(const dtuple_t * row,dict_index_t * index)1594 row_geo_field_is_valid(
1595 const dtuple_t* row,
1596 dict_index_t* index)
1597 {
1598 const dict_field_t* ind_field
1599 = dict_index_get_nth_field(index, 0);
1600 const dict_col_t* col
1601 = ind_field->col;
1602 ulint col_no
1603 = dict_col_get_no(col);
1604 const dfield_t* dfield
1605 = dtuple_get_nth_field(row, col_no);
1606
1607 if (dfield_is_null(dfield)
1608 || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
1609 return(false);
1610 }
1611
1612 return(true);
1613 }
1614
1615 /** Reads clustered index of the table and create temporary files
1616 containing the index entries for the indexes to be built.
1617 @param[in] trx transaction
1618 @param[in,out] table MySQL table object, for reporting erroneous
1619 records
1620 @param[in] old_table table where rows are read from
1621 @param[in] new_table table where indexes are created; identical to
1622 old_table unless creating a PRIMARY KEY
1623 @param[in] online true if creating indexes online
1624 @param[in] index indexes to be created
1625 @param[in] fts_sort_idx full-text index to be created, or NULL
1626 @param[in] psort_info parallel sort info for fts_sort_idx creation,
1627 or NULL
1628 @param[in] files temporary files
1629 @param[in] key_numbers MySQL key numbers to create
1630 @param[in] n_index number of indexes to create
1631 @param[in] add_cols default values of added columns, or NULL
1632 @param[in] add_v newly added virtual columns along with indexes
1633 @param[in] col_map mapping of old column numbers to new ones, or
1634 NULL if old_table == new_table
1635 @param[in] add_autoinc number of added AUTO_INCREMENT columns, or
1636 ULINT_UNDEFINED if none is added
1637 @param[in,out] sequence autoinc sequence
1638 @param[in,out] block file buffer
1639 @param[in] skip_pk_sort whether the new PRIMARY KEY will follow
1640 existing order
1641 @param[in,out] tmpfd temporary file handle
1642 @param[in,out] stage performance schema accounting object, used by
1643 ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
1644 stage->inc() will be called for each page read.
1645 @param[in] eval_table mysql table used to evaluate virtual column
1646 value, see innobase_get_computed_value().
1647 @return DB_SUCCESS or error */
1648 static MY_ATTRIBUTE((warn_unused_result))
1649 dberr_t
row_merge_read_clustered_index(trx_t * trx,struct TABLE * table,const dict_table_t * old_table,const dict_table_t * new_table,bool online,dict_index_t ** index,dict_index_t * fts_sort_idx,fts_psort_t * psort_info,merge_file_t * files,const ulint * key_numbers,ulint n_index,const dtuple_t * add_cols,const dict_add_v_col_t * add_v,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,row_merge_block_t * block,bool skip_pk_sort,int * tmpfd,ut_stage_alter_t * stage,struct TABLE * eval_table)1650 row_merge_read_clustered_index(
1651 trx_t* trx,
1652 struct TABLE* table,
1653 const dict_table_t* old_table,
1654 const dict_table_t* new_table,
1655 bool online,
1656 dict_index_t** index,
1657 dict_index_t* fts_sort_idx,
1658 fts_psort_t* psort_info,
1659 merge_file_t* files,
1660 const ulint* key_numbers,
1661 ulint n_index,
1662 const dtuple_t* add_cols,
1663 const dict_add_v_col_t* add_v,
1664 const ulint* col_map,
1665 ulint add_autoinc,
1666 ib_sequence_t& sequence,
1667 row_merge_block_t* block,
1668 bool skip_pk_sort,
1669 int* tmpfd,
1670 ut_stage_alter_t* stage,
1671 struct TABLE* eval_table)
1672 {
1673 dict_index_t* clust_index; /* Clustered index */
1674 mem_heap_t* row_heap; /* Heap memory to create
1675 clustered index tuples */
1676 row_merge_buf_t** merge_buf; /* Temporary list for records*/
1677 mem_heap_t* v_heap = NULL; /* Heap memory to process large
1678 data for virtual column */
1679 btr_pcur_t pcur; /* Cursor on the clustered
1680 index */
1681 mtr_t mtr; /* Mini transaction */
1682 dberr_t err = DB_SUCCESS;/* Return code */
1683 ulint n_nonnull = 0; /* number of columns
1684 changed to NOT NULL */
1685 ulint* nonnull = NULL; /* NOT NULL columns */
1686 dict_index_t* fts_index = NULL;/* FTS index */
1687 doc_id_t doc_id = 0;
1688 doc_id_t max_doc_id = 0;
1689 ibool add_doc_id = FALSE;
1690 os_event_t fts_parallel_sort_event = NULL;
1691 ibool fts_pll_sort = FALSE;
1692 int64_t sig_count = 0;
1693 index_tuple_info_t** sp_tuples = NULL;
1694 mem_heap_t* sp_heap = NULL;
1695 ulint num_spatial = 0;
1696 BtrBulk* clust_btr_bulk = NULL;
1697 bool clust_temp_file = false;
1698 mem_heap_t* mtuple_heap = NULL;
1699 mtuple_t prev_mtuple;
1700 mem_heap_t* conv_heap = NULL;
1701 FlushObserver* observer = trx->flush_observer;
1702 DBUG_ENTER("row_merge_read_clustered_index");
1703
1704 ut_ad((old_table == new_table) == !col_map);
1705 ut_ad(!add_cols || col_map);
1706
1707 trx->op_info = "reading clustered index";
1708
1709 #ifdef FTS_INTERNAL_DIAG_PRINT
1710 DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
1711 #endif
1712
1713 /* Create and initialize memory for record buffers */
1714
1715 merge_buf = static_cast<row_merge_buf_t**>(
1716 ut_malloc_nokey(n_index * sizeof *merge_buf));
1717
1718 row_merge_dup_t clust_dup = {index[0], table, col_map, 0};
1719 dfield_t* prev_fields;
1720 const ulint n_uniq = dict_index_get_n_unique(index[0]);
1721
1722 ut_ad(trx->mysql_thd != NULL);
1723
1724 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
1725
1726 ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
1727 /* There is no previous tuple yet. */
1728 prev_mtuple.fields = NULL;
1729
1730 for (ulint i = 0; i < n_index; i++) {
1731 if (index[i]->type & DICT_FTS) {
1732
1733 /* We are building a FT index, make sure
1734 we have the temporary 'fts_sort_idx' */
1735 ut_a(fts_sort_idx);
1736
1737 fts_index = index[i];
1738
1739 merge_buf[i] = row_merge_buf_create(fts_sort_idx);
1740
1741 add_doc_id = DICT_TF2_FLAG_IS_SET(
1742 new_table, DICT_TF2_FTS_ADD_DOC_ID);
1743
1744 /* If Doc ID does not exist in the table itself,
1745 fetch the first FTS Doc ID */
1746 if (add_doc_id) {
1747 fts_get_next_doc_id(
1748 (dict_table_t*) new_table,
1749 &doc_id);
1750 ut_ad(doc_id > 0);
1751 }
1752
1753 fts_pll_sort = TRUE;
1754 row_fts_start_psort(psort_info);
1755 fts_parallel_sort_event =
1756 psort_info[0].psort_common->sort_event;
1757 } else {
1758 if (dict_index_is_spatial(index[i])) {
1759 num_spatial++;
1760 }
1761
1762 merge_buf[i] = row_merge_buf_create(index[i]);
1763 }
1764 }
1765
1766 if (num_spatial > 0) {
1767 ulint count = 0;
1768
1769 sp_heap = mem_heap_create(512);
1770
1771 sp_tuples = static_cast<index_tuple_info_t**>(
1772 ut_malloc_nokey(num_spatial
1773 * sizeof(*sp_tuples)));
1774
1775 for (ulint i = 0; i < n_index; i++) {
1776 if (dict_index_is_spatial(index[i])) {
1777 sp_tuples[count]
1778 = UT_NEW_NOKEY(
1779 index_tuple_info_t(
1780 sp_heap,
1781 index[i]));
1782 count++;
1783 }
1784 }
1785
1786 ut_ad(count == num_spatial);
1787 }
1788
1789 mtr_start(&mtr);
1790
1791 /* Find the clustered index and create a persistent cursor
1792 based on that. */
1793
1794 clust_index = dict_table_get_first_index(old_table);
1795
1796 btr_pcur_open_at_index_side(
1797 true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
1798
1799 if (old_table != new_table) {
1800 /* The table is being rebuilt. Identify the columns
1801 that were flagged NOT NULL in the new table, so that
1802 we can quickly check that the records in the old table
1803 do not violate the added NOT NULL constraints. */
1804
1805 nonnull = static_cast<ulint*>(
1806 ut_malloc_nokey(dict_table_get_n_cols(new_table)
1807 * sizeof *nonnull));
1808
1809 for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
1810 if (dict_table_get_nth_col(old_table, i)->prtype
1811 & DATA_NOT_NULL) {
1812 continue;
1813 }
1814
1815 const ulint j = col_map[i];
1816
1817 if (j == ULINT_UNDEFINED) {
1818 /* The column was dropped. */
1819 continue;
1820 }
1821
1822 if (dict_table_get_nth_col(new_table, j)->prtype
1823 & DATA_NOT_NULL) {
1824 nonnull[n_nonnull++] = j;
1825 }
1826 }
1827
1828 if (!n_nonnull) {
1829 ut_free(nonnull);
1830 nonnull = NULL;
1831 }
1832 }
1833
1834 row_heap = mem_heap_create(sizeof(mrec_buf_t));
1835
1836 if (dict_table_is_comp(old_table)
1837 && !dict_table_is_comp(new_table)) {
1838 conv_heap = mem_heap_create(sizeof(mrec_buf_t));
1839 }
1840
1841 if (skip_pk_sort) {
1842 prev_fields = static_cast<dfield_t*>(
1843 ut_malloc_nokey(n_uniq * sizeof *prev_fields));
1844 mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
1845 } else {
1846 prev_fields = NULL;
1847 }
1848
1849 /* Scan the clustered index. */
1850 for (;;) {
1851 const rec_t* rec;
1852 ulint* offsets;
1853 const dtuple_t* row;
1854 row_ext_t* ext;
1855 page_cur_t* cur = btr_pcur_get_page_cur(&pcur);
1856
1857 mem_heap_empty(row_heap);
1858
1859 page_cur_move_to_next(cur);
1860
1861 stage->n_pk_recs_inc();
1862
1863 if (page_cur_is_after_last(cur)) {
1864
1865 stage->inc();
1866
1867 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1868 err = DB_INTERRUPTED;
1869 trx->error_key_num = 0;
1870 goto func_exit;
1871 }
1872
1873 if (online && old_table != new_table) {
1874 err = row_log_table_get_error(clust_index);
1875 if (err != DB_SUCCESS) {
1876 trx->error_key_num = 0;
1877 goto func_exit;
1878 }
1879 }
1880
1881 #ifdef NDEBUG
1882 # define dbug_run_purge false
1883 #else /* NDEBUG */
1884 bool dbug_run_purge = false;
1885 #endif /* NDEBUG */
1886 DBUG_EXECUTE_IF(
1887 "ib_purge_on_create_index_page_switch",
1888 dbug_run_purge = true;);
1889
1890 /* Insert the cached spatial index rows. */
1891 bool mtr_committed = false;
1892
1893 err = row_merge_spatial_rows(
1894 trx->id, sp_tuples, num_spatial,
1895 row_heap, sp_heap, &pcur,
1896 &mtr, &mtr_committed);
1897
1898 if (err != DB_SUCCESS) {
1899 goto func_exit;
1900 }
1901
1902 if (mtr_committed) {
1903 goto scan_next;
1904 }
1905
1906 if (dbug_run_purge
1907 || rw_lock_get_waiters(
1908 dict_index_get_lock(clust_index))) {
1909 /* There are waiters on the clustered
1910 index tree lock, likely the purge
1911 thread. Store and restore the cursor
1912 position, and yield so that scanning a
1913 large table will not starve other
1914 threads. */
1915
1916 /* Store the cursor position on the last user
1917 record on the page. */
1918 btr_pcur_move_to_prev_on_page(&pcur);
1919 /* Leaf pages must never be empty, unless
1920 this is the only page in the index tree. */
1921 ut_ad(btr_pcur_is_on_user_rec(&pcur)
1922 || btr_pcur_get_block(
1923 &pcur)->page.id.page_no()
1924 == clust_index->page);
1925
1926 btr_pcur_store_position(&pcur, &mtr);
1927 mtr_commit(&mtr);
1928
1929 if (dbug_run_purge) {
1930 /* This is for testing
1931 purposes only (see
1932 DBUG_EXECUTE_IF above). We
1933 signal the purge thread and
1934 hope that the purge batch will
1935 complete before we execute
1936 btr_pcur_restore_position(). */
1937 trx_purge_run();
1938 os_thread_sleep(1000000);
1939 }
1940
1941 /* Give the waiters a chance to proceed. */
1942 os_thread_yield();
1943 scan_next:
1944 mtr_start(&mtr);
1945 /* Restore position on the record, or its
1946 predecessor if the record was purged
1947 meanwhile. */
1948 btr_pcur_restore_position(
1949 BTR_SEARCH_LEAF, &pcur, &mtr);
1950 /* Move to the successor of the
1951 original record. */
1952 if (!btr_pcur_move_to_next_user_rec(
1953 &pcur, &mtr)) {
1954 end_of_index:
1955 row = NULL;
1956 mtr_commit(&mtr);
1957 mem_heap_free(row_heap);
1958 ut_free(nonnull);
1959 goto write_buffers;
1960 }
1961 } else {
1962 ulint next_page_no;
1963 buf_block_t* block;
1964
1965 next_page_no = btr_page_get_next(
1966 page_cur_get_page(cur), &mtr);
1967
1968 if (next_page_no == FIL_NULL) {
1969 goto end_of_index;
1970 }
1971
1972 block = page_cur_get_block(cur);
1973 block = btr_block_get(
1974 page_id_t(block->page.id.space(),
1975 next_page_no),
1976 block->page.size,
1977 BTR_SEARCH_LEAF,
1978 clust_index, &mtr);
1979
1980 btr_leaf_page_release(page_cur_get_block(cur),
1981 BTR_SEARCH_LEAF, &mtr);
1982 page_cur_set_before_first(block, cur);
1983 page_cur_move_to_next(cur);
1984
1985 ut_ad(!page_cur_is_after_last(cur));
1986 }
1987 }
1988
1989 rec = page_cur_get_rec(cur);
1990
1991 offsets = rec_get_offsets(rec, clust_index, NULL,
1992 ULINT_UNDEFINED, &row_heap);
1993
1994 if (online) {
1995 /* Perform a REPEATABLE READ.
1996
1997 When rebuilding the table online,
1998 row_log_table_apply() must not see a newer
1999 state of the table when applying the log.
2000 This is mainly to prevent false duplicate key
2001 errors, because the log will identify records
2002 by the PRIMARY KEY, and also to prevent unsafe
2003 BLOB access.
2004
2005 When creating a secondary index online, this
2006 table scan must not see records that have only
2007 been inserted to the clustered index, but have
2008 not been written to the online_log of
2009 index[]. If we performed READ UNCOMMITTED, it
2010 could happen that the ADD INDEX reaches
2011 ONLINE_INDEX_COMPLETE state between the time
2012 the DML thread has updated the clustered index
2013 but has not yet accessed secondary index. */
2014 ut_ad(MVCC::is_view_active(trx->read_view));
2015
2016 if (!trx->read_view->changes_visible(
2017 row_get_rec_trx_id(
2018 rec, clust_index, offsets),
2019 old_table->name)) {
2020 rec_t* old_vers;
2021
2022 row_vers_build_for_consistent_read(
2023 rec, &mtr, clust_index, &offsets,
2024 trx->read_view, &row_heap,
2025 row_heap, &old_vers, NULL);
2026
2027 rec = old_vers;
2028
2029 if (!rec) {
2030 continue;
2031 }
2032 }
2033
2034 if (rec_get_deleted_flag(
2035 rec,
2036 dict_table_is_comp(old_table))) {
2037 /* This record was deleted in the latest
2038 committed version, or it was deleted and
2039 then reinserted-by-update before purge
2040 kicked in. Skip it. */
2041 continue;
2042 }
2043
2044 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2045 } else if (rec_get_deleted_flag(
2046 rec, dict_table_is_comp(old_table))) {
2047 /* Skip delete-marked records.
2048
2049 Skipping delete-marked records will make the
2050 created indexes unuseable for transactions
2051 whose read views were created before the index
2052 creation completed, but preserving the history
2053 would make it tricky to detect duplicate
2054 keys. */
2055 continue;
2056 }
2057
2058 /* When !online, we are holding a lock on old_table, preventing
2059 any inserts that could have written a record 'stub' before
2060 writing out off-page columns. */
2061 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2062
2063 /* Build a row based on the clustered index. */
2064
2065 row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
2066 rec, offsets, new_table,
2067 add_cols, add_v, col_map, &ext,
2068 row_heap);
2069 ut_ad(row);
2070
2071 for (ulint i = 0; i < n_nonnull; i++) {
2072 const dfield_t* field = &row->fields[nonnull[i]];
2073
2074 ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
2075
2076 if (dfield_is_null(field)) {
2077 err = DB_INVALID_NULL;
2078 trx->error_key_num = 0;
2079 goto func_exit;
2080 }
2081 }
2082
2083 /* Get the next Doc ID */
2084 if (add_doc_id) {
2085 doc_id++;
2086 } else {
2087 doc_id = 0;
2088 }
2089
2090 if (add_autoinc != ULINT_UNDEFINED) {
2091
2092 ut_ad(add_autoinc
2093 < dict_table_get_n_user_cols(new_table));
2094
2095 const dfield_t* dfield;
2096
2097 dfield = dtuple_get_nth_field(row, add_autoinc);
2098 if (dfield_is_null(dfield)) {
2099 goto write_buffers;
2100 }
2101
2102 const dtype_t* dtype = dfield_get_type(dfield);
2103 byte* b = static_cast<byte*>(dfield_get_data(dfield));
2104
2105 if (sequence.eof()) {
2106 err = DB_ERROR;
2107 trx->error_key_num = 0;
2108
2109 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2110 ER_AUTOINC_READ_FAILED, "[NULL]");
2111
2112 goto func_exit;
2113 }
2114
2115 ulonglong value = sequence++;
2116
2117 switch (dtype_get_mtype(dtype)) {
2118 case DATA_INT: {
2119 ibool usign;
2120 ulint len = dfield_get_len(dfield);
2121
2122 usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
2123 mach_write_ulonglong(b, value, len, usign);
2124
2125 break;
2126 }
2127
2128 case DATA_FLOAT:
2129 mach_float_write(
2130 b, static_cast<float>(value));
2131 break;
2132
2133 case DATA_DOUBLE:
2134 mach_double_write(
2135 b, static_cast<double>(value));
2136 break;
2137
2138 default:
2139 ut_ad(0);
2140 }
2141 }
2142
2143 write_buffers:
2144 /* Build all entries for all the indexes to be created
2145 in a single scan of the clustered index. */
2146
2147 ulint s_idx_cnt = 0;
2148 bool skip_sort = skip_pk_sort
2149 && dict_index_is_clust(merge_buf[0]->index);
2150
2151 for (ulint i = 0; i < n_index; i++, skip_sort = false) {
2152 row_merge_buf_t* buf = merge_buf[i];
2153 merge_file_t* file = &files[i];
2154 ulint rows_added = 0;
2155
2156 if (dict_index_is_spatial(buf->index)) {
2157 if (!row) {
2158 continue;
2159 }
2160
2161 ut_ad(sp_tuples[s_idx_cnt]->get_index()
2162 == buf->index);
2163
2164 /* If the geometry field is invalid, report
2165 error. */
2166 if (!row_geo_field_is_valid(row, buf->index)) {
2167 err = DB_CANT_CREATE_GEOMETRY_OBJECT;
2168 break;
2169 }
2170
2171 sp_tuples[s_idx_cnt]->add(row, ext);
2172 s_idx_cnt++;
2173
2174 continue;
2175 }
2176
2177 if (UNIV_LIKELY
2178 (row && (rows_added = row_merge_buf_add(
2179 buf, fts_index, old_table, new_table,
2180 psort_info, row, ext, &doc_id,
2181 conv_heap, &err,
2182 &v_heap, eval_table, trx)))) {
2183
2184 /* If we are creating FTS index,
2185 a single row can generate more
2186 records for tokenized word */
2187 file->n_rec += rows_added;
2188
2189 if (err != DB_SUCCESS) {
2190 ut_ad(err == DB_TOO_BIG_RECORD);
2191 break;
2192 }
2193
2194 if (doc_id > max_doc_id) {
2195 max_doc_id = doc_id;
2196 }
2197
2198 if (buf->index->type & DICT_FTS) {
2199 /* Check if error occurs in child thread */
2200 for (ulint j = 0;
2201 j < fts_sort_pll_degree; j++) {
2202 if (psort_info[j].error
2203 != DB_SUCCESS) {
2204 err = psort_info[j].error;
2205 trx->error_key_num = i;
2206 break;
2207 }
2208 }
2209
2210 if (err != DB_SUCCESS) {
2211 break;
2212 }
2213 }
2214
2215 if (skip_sort) {
2216 ut_ad(buf->n_tuples > 0);
2217 const mtuple_t* curr =
2218 &buf->tuples[buf->n_tuples - 1];
2219
2220 ut_ad(i == 0);
2221 ut_ad(dict_index_is_clust(merge_buf[0]->index));
2222 /* Detect duplicates by comparing the
2223 current record with previous record.
2224 When temp file is not used, records
2225 should be in sorted order. */
2226 if (prev_mtuple.fields != NULL
2227 && (row_mtuple_cmp(
2228 &prev_mtuple, curr,
2229 &clust_dup) == 0)) {
2230
2231 err = DB_DUPLICATE_KEY;
2232 trx->error_key_num
2233 = key_numbers[0];
2234 goto func_exit;
2235 }
2236
2237 prev_mtuple.fields = curr->fields;
2238 }
2239
2240 continue;
2241 }
2242
2243 if (err == DB_COMPUTE_VALUE_FAILED) {
2244 trx->error_key_num = i;
2245 goto func_exit;
2246 }
2247
2248 if (buf->index->type & DICT_FTS) {
2249 if (!row || !doc_id) {
2250 continue;
2251 }
2252 }
2253
2254 /* The buffer must be sufficiently large
2255 to hold at least one record. It may only
2256 be empty when we reach the end of the
2257 clustered index. row_merge_buf_add()
2258 must not have been called in this loop. */
2259 ut_ad(buf->n_tuples || row == NULL);
2260
2261 /* We have enough data tuples to form a block.
2262 Sort them and write to disk if temp file is used
2263 or insert into index if temp file is not used. */
2264 ut_ad(old_table == new_table
2265 ? !dict_index_is_clust(buf->index)
2266 : (i == 0) == dict_index_is_clust(buf->index));
2267
2268 /* We have enough data tuples to form a block.
2269 Sort them (if !skip_sort) and write to disk. */
2270
2271 if (buf->n_tuples) {
2272 if (skip_sort) {
2273 /* Temporary File is not used.
2274 so insert sorted block to the index */
2275 if (row != NULL) {
2276 bool mtr_committed = false;
2277
2278 /* We have to do insert the
2279 cached spatial index rows, since
2280 after the mtr_commit, the cluster
2281 index page could be updated, then
2282 the data in cached rows become
2283 invalid. */
2284 err = row_merge_spatial_rows(
2285 trx->id, sp_tuples,
2286 num_spatial,
2287 row_heap, sp_heap,
2288 &pcur, &mtr,
2289 &mtr_committed);
2290
2291 if (err != DB_SUCCESS) {
2292 goto func_exit;
2293 }
2294
2295 /* We are not at the end of
2296 the scan yet. We must
2297 mtr_commit() in order to be
2298 able to call log_free_check()
2299 in row_merge_insert_index_tuples().
2300 Due to mtr_commit(), the
2301 current row will be invalid, and
2302 we must reread it on the next
2303 loop iteration. */
2304 if (!mtr_committed) {
2305 btr_pcur_move_to_prev_on_page(
2306 &pcur);
2307 btr_pcur_store_position(
2308 &pcur, &mtr);
2309
2310 mtr_commit(&mtr);
2311 }
2312 }
2313
2314 mem_heap_empty(mtuple_heap);
2315 prev_mtuple.fields = prev_fields;
2316
2317 row_mtuple_create(
2318 &buf->tuples[buf->n_tuples - 1],
2319 &prev_mtuple, n_uniq,
2320 mtuple_heap);
2321
2322 if (clust_btr_bulk == NULL) {
2323 clust_btr_bulk = UT_NEW_NOKEY(
2324 BtrBulk(index[i],
2325 trx->id,
2326 observer));
2327
2328 clust_btr_bulk->init();
2329 } else {
2330 clust_btr_bulk->latch();
2331 }
2332
2333 err = row_merge_insert_index_tuples(
2334 trx->id, index[i], old_table,
2335 -1, NULL, buf, clust_btr_bulk);
2336
2337 if (row == NULL) {
2338 err = clust_btr_bulk->finish(
2339 err);
2340 UT_DELETE(clust_btr_bulk);
2341 clust_btr_bulk = NULL;
2342 } else {
					/* Release latches for a possible
					log_free_check() in spatial index
					build. */
2346 clust_btr_bulk->release();
2347 }
2348
2349 if (err != DB_SUCCESS) {
2350 break;
2351 }
2352
2353 if (row != NULL) {
2354 /* Restore the cursor on the
2355 previous clustered index record,
2356 and empty the buffer. The next
2357 iteration of the outer loop will
2358 advance the cursor and read the
2359 next record (the one which we
2360 had to ignore due to the buffer
2361 overflow). */
2362 mtr_start(&mtr);
2363 btr_pcur_restore_position(
2364 BTR_SEARCH_LEAF, &pcur,
2365 &mtr);
2366 buf = row_merge_buf_empty(buf);
2367 /* Restart the outer loop on the
2368 record. We did not insert it
2369 into any index yet. */
2370 ut_ad(i == 0);
2371 break;
2372 }
2373 } else if (dict_index_is_unique(buf->index)) {
2374 row_merge_dup_t dup = {
2375 buf->index, table, col_map, 0};
2376
2377 row_merge_buf_sort(buf, &dup);
2378
2379 if (dup.n_dup) {
2380 err = DB_DUPLICATE_KEY;
2381 trx->error_key_num
2382 = key_numbers[i];
2383 break;
2384 }
2385 } else {
2386 row_merge_buf_sort(buf, NULL);
2387 }
2388 } else if (online && new_table == old_table) {
2389 /* Note the newest transaction that
2390 modified this index when the scan was
2391 completed. We prevent older readers
2392 from accessing this index, to ensure
2393 read consistency. */
2394
2395 trx_id_t max_trx_id;
2396
2397 ut_a(row == NULL);
2398 rw_lock_x_lock(
2399 dict_index_get_lock(buf->index));
2400 ut_a(dict_index_get_online_status(buf->index)
2401 == ONLINE_INDEX_CREATION);
2402
2403 max_trx_id = row_log_get_max_trx(buf->index);
2404
2405 if (max_trx_id > buf->index->trx_id) {
2406 buf->index->trx_id = max_trx_id;
2407 }
2408
2409 rw_lock_x_unlock(
2410 dict_index_get_lock(buf->index));
2411 }
2412
2413 /* Secondary index and clustered index which is
2414 not in sorted order can use the temporary file.
2415 Fulltext index should not use the temporary file. */
2416 if (!skip_sort && !(buf->index->type & DICT_FTS)) {
2417 /* In case we can have all rows in sort buffer,
2418 we can insert directly into the index without
2419 temporary file if clustered index does not uses
2420 temporary file. */
2421 if (row == NULL && file->fd == -1
2422 && !clust_temp_file) {
2423 DBUG_EXECUTE_IF(
2424 "row_merge_write_failure",
2425 err = DB_TEMP_FILE_WRITE_FAIL;
2426 trx->error_key_num = i;
2427 goto all_done;);
2428
2429 DBUG_EXECUTE_IF(
2430 "row_merge_tmpfile_fail",
2431 err = DB_OUT_OF_MEMORY;
2432 trx->error_key_num = i;
2433 goto all_done;);
2434
2435 BtrBulk btr_bulk(index[i], trx->id,
2436 observer);
2437 btr_bulk.init();
2438
2439 err = row_merge_insert_index_tuples(
2440 trx->id, index[i], old_table,
2441 -1, NULL, buf, &btr_bulk);
2442
2443 err = btr_bulk.finish(err);
2444
2445 DBUG_EXECUTE_IF(
2446 "row_merge_insert_big_row",
2447 err = DB_TOO_BIG_RECORD;);
2448
2449 if (err != DB_SUCCESS) {
2450 break;
2451 }
2452 } else {
2453 if (row_merge_file_create_if_needed(
2454 file, tmpfd,
2455 buf->n_tuples, path) < 0) {
2456 err = DB_OUT_OF_MEMORY;
2457 trx->error_key_num = i;
2458 goto func_exit;
2459 }
2460
2461 /* Ensure that duplicates in the
2462 clustered index will be detected before
2463 inserting secondary index records. */
2464 if (dict_index_is_clust(buf->index)) {
2465 clust_temp_file = true;
2466 }
2467
2468 ut_ad(file->n_rec > 0);
2469
2470 row_merge_buf_write(buf, file, block);
2471
2472 if (!row_merge_write(
2473 file->fd, file->offset++,
2474 block)) {
2475 err = DB_TEMP_FILE_WRITE_FAIL;
2476 trx->error_key_num = i;
2477 break;
2478 }
2479
2480 UNIV_MEM_INVALID(
2481 &block[0], srv_sort_buf_size);
2482 }
2483 }
2484 merge_buf[i] = row_merge_buf_empty(buf);
2485
2486 if (UNIV_LIKELY(row != NULL)) {
2487 /* Try writing the record again, now
2488 that the buffer has been written out
2489 and emptied. */
2490
2491 if (UNIV_UNLIKELY
2492 (!(rows_added = row_merge_buf_add(
2493 buf, fts_index, old_table,
2494 new_table, psort_info, row, ext,
2495 &doc_id, conv_heap,
2496 &err, &v_heap, table, trx)))) {
2497 /* An empty buffer should have enough
2498 room for at least one record. */
2499 ut_error;
2500 }
2501
2502 if (err != DB_SUCCESS) {
2503 break;
2504 }
2505
2506 file->n_rec += rows_added;
2507 }
2508 }
2509
2510 if (row == NULL) {
2511 goto all_done;
2512 }
2513
2514 if (err != DB_SUCCESS) {
2515 goto func_exit;
2516 }
2517
2518 if (v_heap) {
2519 mem_heap_empty(v_heap);
2520 }
2521 }
2522
2523 func_exit:
2524 /* row_merge_spatial_rows may have committed
2525 the mtr before an error occurs. */
2526 if (mtr.is_active()) {
2527 mtr_commit(&mtr);
2528 }
2529 mem_heap_free(row_heap);
2530 ut_free(nonnull);
2531
2532 all_done:
2533 if (clust_btr_bulk != NULL) {
2534 ut_ad(err != DB_SUCCESS);
2535 clust_btr_bulk->latch();
2536 err = clust_btr_bulk->finish(
2537 err);
2538 UT_DELETE(clust_btr_bulk);
2539 }
2540
2541 if (prev_fields != NULL) {
2542 ut_free(prev_fields);
2543 mem_heap_free(mtuple_heap);
2544 }
2545
2546 if (v_heap) {
2547 mem_heap_free(v_heap);
2548 }
2549
2550 if (conv_heap != NULL) {
2551 mem_heap_free(conv_heap);
2552 }
2553
2554 #ifdef FTS_INTERNAL_DIAG_PRINT
2555 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
2556 #endif
2557 if (fts_pll_sort) {
2558 bool all_exit = false;
2559 ulint trial_count = 0;
2560 const ulint max_trial_count = 10000;
2561
2562 wait_again:
2563 /* Check if error occurs in child thread */
2564 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2565 if (psort_info[j].error != DB_SUCCESS) {
2566 err = psort_info[j].error;
2567 trx->error_key_num = j;
2568 break;
2569 }
2570 }
2571
2572 /* Tell all children that parent has done scanning */
2573 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2574 if (err == DB_SUCCESS) {
2575 psort_info[i].state = FTS_PARENT_COMPLETE;
2576 } else {
2577 psort_info[i].state = FTS_PARENT_EXITING;
2578 }
2579 }
2580
2581 /* Now wait all children to report back to be completed */
2582 os_event_wait_time_low(fts_parallel_sort_event,
2583 1000000, sig_count);
2584
2585 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2586 if (psort_info[i].child_status != FTS_CHILD_COMPLETE
2587 && psort_info[i].child_status != FTS_CHILD_EXITING) {
2588 sig_count = os_event_reset(
2589 fts_parallel_sort_event);
2590 goto wait_again;
2591 }
2592 }
2593
2594 /* Now all children should complete, wait a bit until
2595 they all finish setting the event, before we free everything.
2596 This has a 10 second timeout */
2597 do {
2598 all_exit = true;
2599
2600 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2601 if (psort_info[j].child_status
2602 != FTS_CHILD_EXITING) {
2603 all_exit = false;
2604 os_thread_sleep(1000);
2605 break;
2606 }
2607 }
2608 trial_count++;
2609 } while (!all_exit && trial_count < max_trial_count);
2610
2611 if (!all_exit) {
2612 ib::fatal() << "Not all child sort threads exited"
2613 " when creating FTS index '"
2614 << fts_sort_idx->name << "'";
2615 }
2616 }
2617
2618 #ifdef FTS_INTERNAL_DIAG_PRINT
2619 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
2620 #endif
2621 for (ulint i = 0; i < n_index; i++) {
2622 row_merge_buf_free(merge_buf[i]);
2623 }
2624
2625 row_fts_free_pll_merge_buf(psort_info);
2626
2627 ut_free(merge_buf);
2628
2629 btr_pcur_close(&pcur);
2630
2631 if (sp_tuples != NULL) {
2632 for (ulint i = 0; i < num_spatial; i++) {
2633 UT_DELETE(sp_tuples[i]);
2634 }
2635 ut_free(sp_tuples);
2636
2637 if (sp_heap) {
2638 mem_heap_free(sp_heap);
2639 }
2640 }
2641
2642 /* Update the next Doc ID we used. Table should be locked, so
2643 no concurrent DML */
2644 if (max_doc_id && err == DB_SUCCESS) {
2645 /* Sync fts cache for other fts indexes to keep all
2646 fts indexes consistent in sync_doc_id. */
2647 err = fts_sync_table(const_cast<dict_table_t*>(new_table),
2648 false, true, false);
2649
2650 if (err == DB_SUCCESS) {
2651 fts_update_next_doc_id(
2652 0, new_table,
2653 old_table->name.m_name, max_doc_id);
2654 }
2655 }
2656
2657 trx->op_info = "";
2658
2659 DBUG_RETURN(err);
2660 }
2661
/** Write a record via buffer 2 and read the next record to buffer N.
Writing increments of->n_rec; reading advances b##N/mrec##N/offsets##N.
If the output would exceed the expected record count, or either file
operation fails mid-record, control jumps to the enclosing "corrupt:"
label. When input N is exhausted (read returns NULL with no record),
AT_END is executed instead.
@param N	number of the buffer (0 or 1)
@param INDEX	record descriptor
@param AT_END	statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)		\
	do {							\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
					 &buf[2], b2,		\
					 of->fd, &of->offset,	\
					 mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;				\
		}						\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  &buf[N], b##N, INDEX,	\
					  file->fd, foffs##N,	\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {			\
			if (mrec##N) {				\
				goto corrupt;			\
			}					\
			AT_END;					\
		}						\
	} while (0)

/* When the performance schema stage interface is available, also bump
the ALTER TABLE progress counter ("stage") once per record processed;
otherwise the wrapper is a plain alias for the _LOW macro. */
#ifdef HAVE_PSI_STAGE_INTERFACE
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)		\
	do {							\
		if (stage != NULL) {				\
			stage->inc();				\
		}						\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);	\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)		\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */
2699
/** Merge two blocks of records on disk and write a bigger block.
Reads one sorted run starting at *foffs0 and one starting at *foffs1,
merges them by key order, and appends the combined run to the output
file "of". On a duplicate key in a unique index the merge is aborted.
@param[in]	dup	descriptor of index being created
@param[in]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	foffs0	offset of first source list in the file
@param[in,out]	foffs1	offset of second source list in the file
@param[in,out]	of	output file
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_blocks(
	const row_merge_dup_t*	dup,
	const merge_file_t*	file,
	row_merge_block_t*	block,
	ulint*			foffs0,
	ulint*			foffs1,
	merge_file_t*		of,
	ut_stage_alter_t*	stage)
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	const byte*	b1;	/*!< pointer to block[srv_sort_buf_size] */
	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/*!< merge rec, points to
				block[srv_sort_buf_size] or buf[1] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* offsets of mrec1 */

	DBUG_ENTER("row_merge_blocks");
	DBUG_PRINT("ib_merge_sort",
		   ("fd=%d,%lu+%lu to fd=%d,%lu",
		    file->fd, ulong(*foffs0), ulong(*foffs1),
		    of->fd, ulong(of->offset)));

	heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);

	/* Write a record and read the next record. Split the output
	file in two halves, which can be merged on the following pass. */

	/* Prime both inputs: load the first block of each run. */
	if (!row_merge_read(file->fd, *foffs0, &block[0])
	    || !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size])) {
corrupt:
		mem_heap_free(heap);
		DBUG_RETURN(DB_CORRUPTION);
	}

	b0 = &block[0];
	b1 = &block[srv_sort_buf_size];
	b2 = &block[2 * srv_sort_buf_size];

	b0 = row_merge_read_rec(
		&block[0], &buf[0], b0, dup->index,
		file->fd, foffs0, &mrec0, offsets0);
	b1 = row_merge_read_rec(
		&block[srv_sort_buf_size],
		&buf[srv_sort_buf_size], b1, dup->index,
		file->fd, foffs1, &mrec1, offsets1);
	/* A NULL read pointer together with a non-NULL record means the
	read failed mid-record: the file is corrupted. */
	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	/* Standard two-way merge: repeatedly emit the smaller of the
	two current records and refill from the corresponding input.
	An equal key means a duplicate in the index being created. */
	while (mrec0 && mrec1) {
		int cmp = cmp_rec_rec_simple(
			mrec0, mrec1, offsets0, offsets1,
			dup->index, dup->table);
		if (cmp < 0) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
		} else if (cmp) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
		} else {
			mem_heap_free(heap);
			DBUG_RETURN(DB_DUPLICATE_KEY);
		}
	}

merged:
	/* One input is exhausted; drain the remainder of the other. */
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
		}
	}
done1:

	mem_heap_free(heap);
	/* Terminate the output run with an end-of-file marker. */
	b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
				 b2, of->fd, &of->offset);
	DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
2805
/** Copy a block of index entries.
Used when a run has no partner to be merged with: the run starting at
*foffs0 is copied record by record to the output file unchanged.
@param[in]	index	index being created
@param[in]	file	input file
@param[in,out]	block	3 buffers
@param[in,out]	foffs0	input file offset
@param[in,out]	of	output file
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return TRUE on success, FALSE on failure */
static MY_ATTRIBUTE((warn_unused_result))
ibool
row_merge_blocks_copy(
	const dict_index_t*	index,
	const merge_file_t*	file,
	row_merge_block_t*	block,
	ulint*			foffs0,
	merge_file_t*		of,
	ut_stage_alter_t*	stage)
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* dummy offsets */

	DBUG_ENTER("row_merge_blocks_copy");
	DBUG_PRINT("ib_merge_sort",
		   ("fd=%d," ULINTPF " to fd=%d," ULINTPF,
		    file->fd, *foffs0,
		    of->fd, of->offset));

	heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);

	/* Copy the records of the remaining run to the output file,
	one record at a time. */

	if (!row_merge_read(file->fd, *foffs0, &block[0])) {
corrupt:
		mem_heap_free(heap);
		DBUG_RETURN(FALSE);
	}

	b0 = &block[0];

	b2 = &block[2 * srv_sort_buf_size];

	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index,
				file->fd, foffs0, &mrec0, offsets0);
	/* A NULL read pointer with a non-NULL record means the read
	failed mid-record: the file is corrupted. */
	if (UNIV_UNLIKELY(!b0 && mrec0)) {

		goto corrupt;
	}

	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
		}
	}
done0:

	/* The file offset points to the beginning of the last page
	that has been read.  Update it to point to the next block. */
	(*foffs0)++;

	mem_heap_free(heap);
	/* Terminate the output run with an end-of-file marker. */
	DBUG_RETURN(row_merge_write_eof(&block[2 * srv_sort_buf_size],
					b2, of->fd, &of->offset)
		    != NULL);
}
2881
/** Merge disk files.
Performs one merge pass: pairs up runs from the first half of the input
file with runs from the second half, merging each pair into a single
longer run in the output file; unpaired runs are copied through
unchanged. On success the output file replaces *file and the freed
descriptor is handed back through *tmpfd for the next pass.
@param[in]	trx	transaction
@param[in]	dup	descriptor of index being created
@param[in,out]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	tmpfd	temporary file handle
@param[in,out]	num_run	Number of runs that remain to be merged
@param[in,out]	run_offset	Array that contains the first offset number
for each merge run
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return DB_SUCCESS or error code */
static
dberr_t
row_merge(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	int*			tmpfd,
	ulint*			num_run,
	ulint*			run_offset,
	ut_stage_alter_t*	stage)
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	dberr_t		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= run_offset[*num_run / 2];
				/*!< half the input file */
	ulint		n_run	= 0;
				/*!< num of runs generated from this merge */

	UNIV_MEM_ASSERT_W(&block[0], 3 * srv_sort_buf_size);

	ut_ad(ihalf < file->offset);

	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file.  Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = ihalf;

	/* The old run offsets are consumed via ihalf above; mark the
	array as undefined so stale reads would be caught by Valgrind. */
	UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);

	/* Pair a run from the first half with a run from the second
	half and merge each pair into one longer run. */
	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		error = row_merge_blocks(dup, file, block,
					 &foffs0, &foffs1, &of, stage);

		if (error != DB_SUCCESS) {
			return(error);
		}

	}

	/* Copy the last blocks, if there are any. */

	while (foffs0 < ihalf) {
		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs0, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	while (foffs1 < file->offset) {
		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs1, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	/* Every input record must have been written out exactly once. */
	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	ut_ad(n_run <= *num_run);

	*num_run = n_run;

	/* Each run can contain one or more offsets. As merge goes on,
	the number of runs (to merge) will reduce until we have one
	single run. So the number of runs will always be smaller than
	the number of offsets in file */
	ut_ad((*num_run) <= file->offset);

	/* The number of offsets in output file is always equal or
	smaller than input file */
	ut_ad(of.offset <= file->offset);

	/* Swap file descriptors for the next pass: this pass's output
	becomes the next pass's input. */
	*tmpfd = file->fd;
	*file = of;

	UNIV_MEM_INVALID(&block[0], 3 * srv_sort_buf_size);

	return(DB_SUCCESS);
}
3016
3017 /** Merge disk files.
3018 @param[in] trx transaction
3019 @param[in] dup descriptor of index being created
3020 @param[in,out] file file containing index entries
3021 @param[in,out] block 3 buffers
3022 @param[in,out] tmpfd temporary file handle
3023 @param[in,out] stage performance schema accounting object, used by
3024 ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
3025 and then stage->inc() will be called for each record processed.
3026 @return DB_SUCCESS or error code */
3027 dberr_t
row_merge_sort(trx_t * trx,const row_merge_dup_t * dup,merge_file_t * file,row_merge_block_t * block,int * tmpfd,ut_stage_alter_t * stage)3028 row_merge_sort(
3029 trx_t* trx,
3030 const row_merge_dup_t* dup,
3031 merge_file_t* file,
3032 row_merge_block_t* block,
3033 int* tmpfd,
3034 ut_stage_alter_t* stage /* = NULL */)
3035 {
3036 const ulint half = file->offset / 2;
3037 ulint num_runs;
3038 ulint* run_offset;
3039 dberr_t error = DB_SUCCESS;
3040 DBUG_ENTER("row_merge_sort");
3041
3042 /* Record the number of merge runs we need to perform */
3043 num_runs = file->offset;
3044
3045 if (stage != NULL) {
3046 stage->begin_phase_sort(log2(num_runs));
3047 }
3048
3049 /* If num_runs are less than 1, nothing to merge */
3050 if (num_runs <= 1) {
3051 DBUG_RETURN(error);
3052 }
3053
3054 /* "run_offset" records each run's first offset number */
3055 run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));
3056
3057 /* This tells row_merge() where to start for the first round
3058 of merge. */
3059 run_offset[half] = half;
3060
3061 /* The file should always contain at least one byte (the end
3062 of file marker). Thus, it must be at least one block. */
3063 ut_ad(file->offset > 0);
3064
3065 /* Merge the runs until we have one big run */
3066 do {
3067 error = row_merge(trx, dup, file, block, tmpfd,
3068 &num_runs, run_offset, stage);
3069
3070 if (error != DB_SUCCESS) {
3071 break;
3072 }
3073
3074 UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
3075 } while (num_runs > 1);
3076
3077 ut_free(run_offset);
3078
3079 DBUG_RETURN(error);
3080 }
3081
3082 /** Copy externally stored columns to the data tuple.
3083 @param[in] mrec record containing BLOB pointers,
3084 or NULL to use tuple instead
3085 @param[in] offsets offsets of mrec
3086 @param[in] zip_size compressed page size in bytes, or 0
3087 @param[in,out] tuple data tuple
3088 @param[in,out] heap memory heap */
3089 static
3090 void
row_merge_copy_blobs(const mrec_t * mrec,const ulint * offsets,const page_size_t & page_size,dtuple_t * tuple,mem_heap_t * heap)3091 row_merge_copy_blobs(
3092 const mrec_t* mrec,
3093 const ulint* offsets,
3094 const page_size_t& page_size,
3095 dtuple_t* tuple,
3096 mem_heap_t* heap)
3097 {
3098 ut_ad(mrec == NULL || rec_offs_any_extern(offsets));
3099
3100 for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
3101 ulint len;
3102 const void* data;
3103 dfield_t* field = dtuple_get_nth_field(tuple, i);
3104 ulint field_len;
3105 const byte* field_data;
3106
3107 if (!dfield_is_ext(field)) {
3108 continue;
3109 }
3110
3111 ut_ad(!dfield_is_null(field));
3112
3113 /* During the creation of a PRIMARY KEY, the table is
3114 X-locked, and we skip copying records that have been
3115 marked for deletion. Therefore, externally stored
3116 columns cannot possibly be freed between the time the
3117 BLOB pointers are read (row_merge_read_clustered_index())
3118 and dereferenced (below). */
3119 if (mrec == NULL) {
3120 field_data
3121 = static_cast<byte*>(dfield_get_data(field));
3122 field_len = dfield_get_len(field);
3123
3124 ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
3125
3126 ut_a(memcmp(field_data + field_len
3127 - BTR_EXTERN_FIELD_REF_SIZE,
3128 field_ref_zero,
3129 BTR_EXTERN_FIELD_REF_SIZE));
3130
3131 data = btr_copy_externally_stored_field(
3132 &len, field_data, page_size, field_len, heap);
3133 } else {
3134 data = btr_rec_copy_externally_stored_field(
3135 mrec, offsets, page_size, i, &len, heap);
3136 }
3137
3138 /* Because we have locked the table, any records
3139 written by incomplete transactions must have been
3140 rolled back already. There must not be any incomplete
3141 BLOB columns. */
3142 ut_a(data);
3143
3144 dfield_set_data(field, data, len);
3145 }
3146 }
3147
/** Convert a merge record to a typed data tuple. Note that externally
stored fields are not copied to heap; only the dfield_t array is copied
(a shallow copy), so the field data pointers still reference the merge
record's storage.
@param[in]	index	index on the table
@param[in,out]	dtuple	data tuple whose fields are overwritten
@param[in]	mtuple	merge record to copy the fields from */
static
void
row_merge_mtuple_to_dtuple(
	dict_index_t*	index,
	dtuple_t*	dtuple,
	const mtuple_t*	mtuple)
{
	ut_ad(!dict_index_is_ibuf(index));

	memcpy(dtuple->fields, mtuple->fields,
	       dtuple->n_fields * sizeof *mtuple->fields);
}
3166
/** Insert sorted data tuples to the index.
The tuples are taken either from an in-memory sort buffer (row_buf)
or from a sorted merge file (fd/block); exactly one of the two sources
is used.
@param[in]	trx_id		transaction identifier
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in]	row_buf		the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	trx_id_t		trx_id,
	dict_index_t*		index,
	const dict_table_t*	old_table,
	int			fd,
	row_merge_block_t*	block,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	ut_stage_alter_t*	stage /* = NULL */)
{
	const byte*		b;
	mem_heap_t*		heap;
	mem_heap_t*		tuple_heap;
	dberr_t			error = DB_SUCCESS;
	ulint			foffs = 0;
	ulint*			offsets;
	mrec_buf_t*		buf;
	ulint			n_rows = 0;
	dtuple_t*		dtuple;
	DBUG_ENTER("row_merge_insert_index_tuples");

	ut_ad(!srv_read_only_mode);
	ut_ad(!(index->type & DICT_FTS));
	ut_ad(!dict_index_is_spatial(index));
	ut_ad(trx_id);

	if (stage != NULL) {
		stage->begin_phase_insert();
	}

	/* Scratch heap for per-record work (tuple conversion, BLOB
	copies); emptied after every successful insert below. */
	tuple_heap = mem_heap_create(1000);

	{
		/* Preallocate a rec_get_offsets()-style array that is
		large enough for any record of this index, plus (in the
		same heap) room for one mrec_buf_t. */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
		offsets = static_cast<ulint*>(
			mem_heap_alloc(heap, i * sizeof *offsets));
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	if (row_buf != NULL) {
		/* Tuples come straight from the in-memory sort buffer;
		no merge file may be in use at the same time. */
		ut_ad(fd == -1);
		ut_ad(block == NULL);
		DBUG_EXECUTE_IF("row_merge_read_failure",
				error = DB_CORRUPTION;
				goto err_exit;);
		buf = NULL;
		b = NULL;
		dtuple = dtuple_create(
			heap, dict_index_get_n_fields(index));
		dtuple_set_n_fields_cmp(
			dtuple, dict_index_get_n_unique_in_tree(index));
	} else {
		/* Tuples come from the sorted merge file: read the
		first block now; records are decoded one by one in the
		loop below. */
		b = block;
		dtuple = NULL;

		if (!row_merge_read(fd, foffs, block)) {
			error = DB_CORRUPTION;
			goto err_exit;
		} else {
			buf = static_cast<mrec_buf_t*>(
				mem_heap_alloc(heap, sizeof *buf));
		}
	}


	for (;;) {
		const mrec_t*	mrec;
		ulint		n_ext;
		mtr_t		mtr;	/* NOTE(review): appears unused in
					this loop -- confirm */

		if (stage != NULL) {
			stage->inc();
		}

		if (row_buf != NULL) {
			if (n_rows >= row_buf->n_tuples) {
				break;
			}

			/* Convert merge tuple record from
			row buffer to data tuple record */
			row_merge_mtuple_to_dtuple(
				index, dtuple, &row_buf->tuples[n_rows]);

			n_ext = dtuple_get_n_ext(dtuple);
			n_rows++;
			/* BLOB pointers must be copied from dtuple */
			mrec = NULL;
		} else {
			b = row_merge_read_rec(block, buf, b, index,
					       fd, &foffs, &mrec, offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);
		}

		/* When rebuilding the clustered index online, poll the
		row log of the old clustered index for errors reported
		by concurrent DML. */
		dict_index_t*	old_index
			= dict_table_get_first_index(old_table);

		if (dict_index_is_clust(index)
		    && dict_index_is_online_ddl(old_index)) {
			error = row_log_table_get_error(old_index);
			if (error != DB_SUCCESS) {
				break;
			}
		}

		if (!n_ext) {
			/* There are no externally stored columns. */
		} else {
			ut_ad(dict_index_is_clust(index));
			/* Off-page columns can be fetched safely
			when concurrent modifications to the table
			are disabled. (Purge can process delete-marked
			records, but row_merge_read_clustered_index()
			would have skipped them.)

			When concurrent modifications are enabled,
			row_merge_read_clustered_index() will
			only see rows from transactions that were
			committed before the ALTER TABLE started
			(REPEATABLE READ).

			Any modifications after the
			row_merge_read_clustered_index() scan
			will go through row_log_table_apply().
			Any modifications to off-page columns
			will be tracked by
			row_log_table_blob_alloc() and
			row_log_table_blob_free(). */
			row_merge_copy_blobs(
				mrec, offsets,
				dict_table_page_size(old_table),
				dtuple, tuple_heap);
		}

		ut_ad(dtuple_validate(dtuple));

		error = btr_bulk->insert(dtuple);

		if (error != DB_SUCCESS) {
			goto err_exit;
		}

		/* Release per-record allocations before fetching the
		next tuple. */
		mem_heap_empty(tuple_heap);
	}

err_exit:
	mem_heap_free(tuple_heap);
	mem_heap_free(heap);

	DBUG_RETURN(error);
}
3345
3346 /*********************************************************************//**
3347 Sets an exclusive lock on a table, for the duration of creating indexes.
3348 @return error code or DB_SUCCESS */
3349 dberr_t
row_merge_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode)3350 row_merge_lock_table(
3351 /*=================*/
3352 trx_t* trx, /*!< in/out: transaction */
3353 dict_table_t* table, /*!< in: table to lock */
3354 enum lock_mode mode) /*!< in: LOCK_X or LOCK_S */
3355 {
3356 ut_ad(!srv_read_only_mode);
3357 ut_ad(mode == LOCK_X || mode == LOCK_S);
3358
3359 trx->op_info = "setting table lock for creating or dropping index";
3360 trx->ddl = true;
3361 /* Trx for DDL should not be forced to rollback for now */
3362 trx->in_innodb |= TRX_FORCE_ROLLBACK_DISABLE;
3363
3364 return(lock_table_for_trx(table, trx, mode));
3365 }
3366
3367 /*********************************************************************//**
3368 Drop an index that was created before an error occurred.
3369 The data dictionary must have been locked exclusively by the caller,
3370 because the transaction will not be committed. */
3371 static
3372 void
row_merge_drop_index_dict(trx_t * trx,index_id_t index_id)3373 row_merge_drop_index_dict(
3374 /*======================*/
3375 trx_t* trx, /*!< in/out: dictionary transaction */
3376 index_id_t index_id)/*!< in: index identifier */
3377 {
3378 static const char sql[] =
3379 "PROCEDURE DROP_INDEX_PROC () IS\n"
3380 "BEGIN\n"
3381 "DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
3382 "DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
3383 "END;\n";
3384 dberr_t error;
3385 pars_info_t* info;
3386
3387 ut_ad(!srv_read_only_mode);
3388 ut_ad(mutex_own(&dict_sys->mutex));
3389 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3390 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3391 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3392
3393 info = pars_info_create();
3394 pars_info_add_ull_literal(info, "indexid", index_id);
3395 trx->op_info = "dropping index from dictionary";
3396 error = que_eval_sql(info, sql, FALSE, trx);
3397
3398 if (error != DB_SUCCESS) {
3399 /* Even though we ensure that DDL transactions are WAIT
3400 and DEADLOCK free, we could encounter other errors e.g.,
3401 DB_TOO_MANY_CONCURRENT_TRXS. */
3402 trx->error_state = DB_SUCCESS;
3403
3404 ib::error() << "row_merge_drop_index_dict failed with error "
3405 << error;
3406 }
3407
3408 trx->op_info = "";
3409 }
3410
3411 /*********************************************************************//**
3412 Drop indexes that were created before an error occurred.
3413 The data dictionary must have been locked exclusively by the caller,
3414 because the transaction will not be committed. */
3415 void
row_merge_drop_indexes_dict(trx_t * trx,table_id_t table_id)3416 row_merge_drop_indexes_dict(
3417 /*========================*/
3418 trx_t* trx, /*!< in/out: dictionary transaction */
3419 table_id_t table_id)/*!< in: table identifier */
3420 {
3421 static const char sql[] =
3422 "PROCEDURE DROP_INDEXES_PROC () IS\n"
3423 "ixid CHAR;\n"
3424 "found INT;\n"
3425
3426 "DECLARE CURSOR index_cur IS\n"
3427 " SELECT ID FROM SYS_INDEXES\n"
3428 " WHERE TABLE_ID=:tableid AND\n"
3429 " SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3430 "FOR UPDATE;\n"
3431
3432 "BEGIN\n"
3433 "found := 1;\n"
3434 "OPEN index_cur;\n"
3435 "WHILE found = 1 LOOP\n"
3436 " FETCH index_cur INTO ixid;\n"
3437 " IF (SQL % NOTFOUND) THEN\n"
3438 " found := 0;\n"
3439 " ELSE\n"
3440 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3441 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3442 " END IF;\n"
3443 "END LOOP;\n"
3444 "CLOSE index_cur;\n"
3445
3446 "END;\n";
3447 dberr_t error;
3448 pars_info_t* info;
3449
3450 ut_ad(!srv_read_only_mode);
3451 ut_ad(mutex_own(&dict_sys->mutex));
3452 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3453 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3454 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3455
3456 /* It is possible that table->n_ref_count > 1 when
3457 locked=TRUE. In this case, all code that should have an open
3458 handle to the table be waiting for the next statement to execute,
3459 or waiting for a meta-data lock.
3460
3461 A concurrent purge will be prevented by dict_operation_lock. */
3462
3463 info = pars_info_create();
3464 pars_info_add_ull_literal(info, "tableid", table_id);
3465 trx->op_info = "dropping indexes";
3466 error = que_eval_sql(info, sql, FALSE, trx);
3467
3468 switch (error) {
3469 case DB_SUCCESS:
3470 break;
3471 default:
3472 /* Even though we ensure that DDL transactions are WAIT
3473 and DEADLOCK free, we could encounter other errors e.g.,
3474 DB_TOO_MANY_CONCURRENT_TRXS. */
3475 ib::error() << "row_merge_drop_indexes_dict failed with error "
3476 << error;
3477 /* fall through */
3478 case DB_TOO_MANY_CONCURRENT_TRXS:
3479 trx->error_state = DB_SUCCESS;
3480 }
3481
3482 trx->op_info = "";
3483 }
3484
3485 /*********************************************************************//**
3486 Drop indexes that were created before an error occurred.
3487 The data dictionary must have been locked exclusively by the caller,
3488 because the transaction will not be committed. */
3489 void
row_merge_drop_indexes(trx_t * trx,dict_table_t * table,ibool locked)3490 row_merge_drop_indexes(
3491 /*===================*/
3492 trx_t* trx, /*!< in/out: dictionary transaction */
3493 dict_table_t* table, /*!< in/out: table containing the indexes */
3494 ibool locked) /*!< in: TRUE=table locked,
3495 FALSE=may need to do a lazy drop */
3496 {
3497 dict_index_t* index;
3498 dict_index_t* next_index;
3499
3500 ut_ad(!srv_read_only_mode);
3501 ut_ad(mutex_own(&dict_sys->mutex));
3502 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3503 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3504 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3505
3506 index = dict_table_get_first_index(table);
3507 ut_ad(dict_index_is_clust(index));
3508 ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
3509
3510 /* the caller should have an open handle to the table */
3511 ut_ad(table->get_ref_count() >= 1);
3512
3513 /* It is possible that table->n_ref_count > 1 when
3514 locked=TRUE. In this case, all code that should have an open
3515 handle to the table be waiting for the next statement to execute,
3516 or waiting for a meta-data lock.
3517
3518 A concurrent purge will be prevented by dict_operation_lock. */
3519
3520 if (!locked && table->get_ref_count() > 1) {
3521 /* We will have to drop the indexes later, when the
3522 table is guaranteed to be no longer in use. Mark the
3523 indexes as incomplete and corrupted, so that other
3524 threads will stop using them. Let dict_table_close()
3525 or crash recovery or the next invocation of
3526 prepare_inplace_alter_table() take care of dropping
3527 the indexes. */
3528
3529 while ((index = dict_table_get_next_index(index)) != NULL) {
3530 ut_ad(!dict_index_is_clust(index));
3531
3532 switch (dict_index_get_online_status(index)) {
3533 case ONLINE_INDEX_ABORTED_DROPPED:
3534 continue;
3535 case ONLINE_INDEX_COMPLETE:
3536 if (index->is_committed()) {
3537 /* Do nothing to already
3538 published indexes. */
3539 } else if (index->type & DICT_FTS) {
3540 /* Drop a completed FULLTEXT
3541 index, due to a timeout during
3542 MDL upgrade for
3543 commit_inplace_alter_table().
3544 Because only concurrent reads
3545 are allowed (and they are not
3546 seeing this index yet) we
3547 are safe to drop the index. */
3548 dict_index_t* prev = UT_LIST_GET_PREV(
3549 indexes, index);
3550 /* At least there should be
3551 the clustered index before
3552 this one. */
3553 ut_ad(prev);
3554 ut_a(table->fts);
3555 fts_drop_index(table, index, trx);
3556 /* Since
3557 INNOBASE_SHARE::idx_trans_tbl
3558 is shared between all open
3559 ha_innobase handles to this
3560 table, no thread should be
3561 accessing this dict_index_t
3562 object. Also, we should be
3563 holding LOCK=SHARED MDL on the
3564 table even after the MDL
3565 upgrade timeout. */
3566
3567 /* We can remove a DICT_FTS
3568 index from the cache, because
3569 we do not allow ADD FULLTEXT INDEX
3570 with LOCK=NONE. If we allowed that,
3571 we should exclude FTS entries from
3572 prebuilt->ins_node->entry_list
3573 in ins_node_create_entry_list(). */
3574 dict_index_remove_from_cache(
3575 table, index);
3576 index = prev;
3577 } else {
3578 rw_lock_x_lock(
3579 dict_index_get_lock(index));
3580 dict_index_set_online_status(
3581 index, ONLINE_INDEX_ABORTED);
3582 index->type |= DICT_CORRUPT;
3583 table->drop_aborted = TRUE;
3584 goto drop_aborted;
3585 }
3586 continue;
3587 case ONLINE_INDEX_CREATION:
3588 rw_lock_x_lock(dict_index_get_lock(index));
3589 ut_ad(!index->is_committed());
3590 row_log_abort_sec(index);
3591 drop_aborted:
3592 rw_lock_x_unlock(dict_index_get_lock(index));
3593
3594 DEBUG_SYNC_C("merge_drop_index_after_abort");
3595 /* covered by dict_sys->mutex */
3596 MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
3597 /* fall through */
3598 case ONLINE_INDEX_ABORTED:
3599 /* Drop the index tree from the
3600 data dictionary and free it from
3601 the tablespace, but keep the object
3602 in the data dictionary cache. */
3603 row_merge_drop_index_dict(trx, index->id);
3604 rw_lock_x_lock(dict_index_get_lock(index));
3605 dict_index_set_online_status(
3606 index, ONLINE_INDEX_ABORTED_DROPPED);
3607 rw_lock_x_unlock(dict_index_get_lock(index));
3608 table->drop_aborted = TRUE;
3609 continue;
3610 }
3611 ut_error;
3612 }
3613
3614 return;
3615 }
3616
3617 row_merge_drop_indexes_dict(trx, table->id);
3618
3619 /* Invalidate all row_prebuilt_t::ins_graph that are referring
3620 to this table. That is, force row_get_prebuilt_insert_row() to
3621 rebuild prebuilt->ins_node->entry_list). */
3622 ut_ad(table->def_trx_id <= trx->id);
3623 table->def_trx_id = trx->id;
3624
3625 next_index = dict_table_get_next_index(index);
3626
3627 while ((index = next_index) != NULL) {
3628 /* read the next pointer before freeing the index */
3629 next_index = dict_table_get_next_index(index);
3630
3631 ut_ad(!dict_index_is_clust(index));
3632
3633 if (!index->is_committed()) {
3634 /* If it is FTS index, drop from table->fts
3635 and also drop its auxiliary tables */
3636 if (index->type & DICT_FTS) {
3637 ut_a(table->fts);
3638 fts_drop_index(table, index, trx);
3639 }
3640
3641 switch (dict_index_get_online_status(index)) {
3642 case ONLINE_INDEX_CREATION:
3643 /* This state should only be possible
3644 when prepare_inplace_alter_table() fails
3645 after invoking row_merge_create_index().
3646 In inplace_alter_table(),
3647 row_merge_build_indexes()
3648 should never leave the index in this state.
3649 It would invoke row_log_abort_sec() on
3650 failure. */
3651 case ONLINE_INDEX_COMPLETE:
3652 /* In these cases, we are able to drop
3653 the index straight. The DROP INDEX was
3654 never deferred. */
3655 break;
3656 case ONLINE_INDEX_ABORTED:
3657 case ONLINE_INDEX_ABORTED_DROPPED:
3658 /* covered by dict_sys->mutex */
3659 MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
3660 }
3661
3662 dict_index_remove_from_cache(table, index);
3663 }
3664 }
3665
3666 table->drop_aborted = FALSE;
3667 ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
3668 }
3669
3670 /*********************************************************************//**
3671 Drop all partially created indexes during crash recovery. */
3672 void
row_merge_drop_temp_indexes(void)3673 row_merge_drop_temp_indexes(void)
3674 /*=============================*/
3675 {
3676 static const char sql[] =
3677 "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
3678 "ixid CHAR;\n"
3679 "found INT;\n"
3680
3681 "DECLARE CURSOR index_cur IS\n"
3682 " SELECT ID FROM SYS_INDEXES\n"
3683 " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3684 "FOR UPDATE;\n"
3685
3686 "BEGIN\n"
3687 "found := 1;\n"
3688 "OPEN index_cur;\n"
3689 "WHILE found = 1 LOOP\n"
3690 " FETCH index_cur INTO ixid;\n"
3691 " IF (SQL % NOTFOUND) THEN\n"
3692 " found := 0;\n"
3693 " ELSE\n"
3694 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3695 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3696 " END IF;\n"
3697 "END LOOP;\n"
3698 "CLOSE index_cur;\n"
3699 "END;\n";
3700 trx_t* trx;
3701 dberr_t error;
3702
3703 /* Load the table definitions that contain partially defined
3704 indexes, so that the data dictionary information can be checked
3705 when accessing the tablename.ibd files. */
3706 trx = trx_allocate_for_background();
3707 trx->op_info = "dropping partially created indexes";
3708 row_mysql_lock_data_dictionary(trx);
3709 /* Ensure that this transaction will be rolled back and locks
3710 will be released, if the server gets killed before the commit
3711 gets written to the redo log. */
3712 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3713
3714 trx->op_info = "dropping indexes";
3715 error = que_eval_sql(NULL, sql, FALSE, trx);
3716
3717 if (error != DB_SUCCESS) {
3718 /* Even though we ensure that DDL transactions are WAIT
3719 and DEADLOCK free, we could encounter other errors e.g.,
3720 DB_TOO_MANY_CONCURRENT_TRXS. */
3721 trx->error_state = DB_SUCCESS;
3722
3723 ib::error() << "row_merge_drop_temp_indexes failed with error"
3724 << error;
3725 }
3726
3727 trx_commit_for_mysql(trx);
3728 row_mysql_unlock_data_dictionary(trx);
3729 trx_free_for_background(trx);
3730 }
3731
3732
3733 /** Create temporary merge files in the given paramater path, and if
3734 UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
3735 @param[in] path location for creating temporary merge files.
3736 @return File descriptor */
3737 int
row_merge_file_create_low(const char * path)3738 row_merge_file_create_low(
3739 const char* path)
3740 {
3741 int fd;
3742 if (path == NULL) {
3743 path = innobase_mysql_tmpdir();
3744 }
3745 #ifdef UNIV_PFS_IO
3746 /* This temp file open does not go through normal
3747 file APIs, add instrumentation to register with
3748 performance schema */
3749 struct PSI_file_locker* locker = NULL;
3750 Datafile df;
3751 df.make_filepath(path, "Innodb Merge Temp File", NO_EXT);
3752
3753 PSI_file_locker_state state;
3754 locker = PSI_FILE_CALL(get_thread_file_name_locker)(
3755 &state, innodb_temp_file_key.m_value, PSI_FILE_OPEN, df.filepath(),
3756 &locker);
3757 if (locker != NULL) {
3758 PSI_FILE_CALL(start_file_open_wait)(locker,
3759 __FILE__,
3760 __LINE__);
3761 }
3762 #endif
3763 fd = innobase_mysql_tmpfile(path);
3764 #ifdef UNIV_PFS_IO
3765 if (locker != NULL) {
3766 PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)(
3767 locker, fd);
3768 }
3769 #endif
3770
3771 if (fd < 0) {
3772 ib::error() << "Cannot create temporary merge file";
3773 return(-1);
3774 }
3775 return(fd);
3776 }
3777
3778
3779 /** Create a merge file in the given location.
3780 @param[out] merge_file merge file structure
3781 @param[in] path location for creating temporary file
3782 @return file descriptor, or -1 on failure */
3783 int
row_merge_file_create(merge_file_t * merge_file,const char * path)3784 row_merge_file_create(
3785 merge_file_t* merge_file,
3786 const char* path)
3787 {
3788 merge_file->fd = row_merge_file_create_low(path);
3789 merge_file->offset = 0;
3790 merge_file->n_rec = 0;
3791
3792 if (merge_file->fd >= 0) {
3793 if (srv_disable_sort_file_cache) {
3794 os_file_set_nocache(merge_file->fd,
3795 "row0merge.cc", "sort");
3796 }
3797 }
3798 return(merge_file->fd);
3799 }
3800
3801 /*********************************************************************//**
3802 Destroy a merge file. And de-register the file from Performance Schema
3803 if UNIV_PFS_IO is defined. */
3804 void
row_merge_file_destroy_low(int fd)3805 row_merge_file_destroy_low(
3806 /*=======================*/
3807 int fd) /*!< in: merge file descriptor */
3808 {
3809 #ifdef UNIV_PFS_IO
3810 struct PSI_file_locker* locker = NULL;
3811 PSI_file_locker_state state;
3812 locker = PSI_FILE_CALL(get_thread_file_descriptor_locker)(
3813 &state, fd, PSI_FILE_CLOSE);
3814 if (locker != NULL) {
3815 PSI_FILE_CALL(start_file_wait)(
3816 locker, 0, __FILE__, __LINE__);
3817 }
3818 #endif
3819 if (fd >= 0) {
3820 close(fd);
3821 }
3822 #ifdef UNIV_PFS_IO
3823 if (locker != NULL) {
3824 PSI_FILE_CALL(end_file_wait)(locker, 0);
3825 }
3826 #endif
3827 }
3828 /*********************************************************************//**
3829 Destroy a merge file. */
3830 void
row_merge_file_destroy(merge_file_t * merge_file)3831 row_merge_file_destroy(
3832 /*===================*/
3833 merge_file_t* merge_file) /*!< in/out: merge file structure */
3834 {
3835 ut_ad(!srv_read_only_mode);
3836
3837 if (merge_file->fd != -1) {
3838 row_merge_file_destroy_low(merge_file->fd);
3839 merge_file->fd = -1;
3840 }
3841 }
3842
3843 /*********************************************************************//**
3844 Rename an index in the dictionary that was created. The data
3845 dictionary must have been locked exclusively by the caller, because
3846 the transaction will not be committed.
3847 @return DB_SUCCESS if all OK */
3848 dberr_t
row_merge_rename_index_to_add(trx_t * trx,table_id_t table_id,index_id_t index_id)3849 row_merge_rename_index_to_add(
3850 /*==========================*/
3851 trx_t* trx, /*!< in/out: transaction */
3852 table_id_t table_id, /*!< in: table identifier */
3853 index_id_t index_id) /*!< in: index identifier */
3854 {
3855 dberr_t err = DB_SUCCESS;
3856 pars_info_t* info = pars_info_create();
3857
3858 /* We use the private SQL parser of Innobase to generate the
3859 query graphs needed in renaming indexes. */
3860
3861 static const char rename_index[] =
3862 "PROCEDURE RENAME_INDEX_PROC () IS\n"
3863 "BEGIN\n"
3864 "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
3865 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3866 "END;\n";
3867
3868 ut_ad(trx);
3869 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3870 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3871
3872 trx->op_info = "renaming index to add";
3873
3874 pars_info_add_ull_literal(info, "tableid", table_id);
3875 pars_info_add_ull_literal(info, "indexid", index_id);
3876
3877 err = que_eval_sql(info, rename_index, FALSE, trx);
3878
3879 if (err != DB_SUCCESS) {
3880 /* Even though we ensure that DDL transactions are WAIT
3881 and DEADLOCK free, we could encounter other errors e.g.,
3882 DB_TOO_MANY_CONCURRENT_TRXS. */
3883 trx->error_state = DB_SUCCESS;
3884
3885 ib::error() << "row_merge_rename_index_to_add failed with"
3886 " error " << err;
3887 }
3888
3889 trx->op_info = "";
3890
3891 return(err);
3892 }
3893
3894 /*********************************************************************//**
3895 Rename an index in the dictionary that is to be dropped. The data
3896 dictionary must have been locked exclusively by the caller, because
3897 the transaction will not be committed.
3898 @return DB_SUCCESS if all OK */
3899 dberr_t
row_merge_rename_index_to_drop(trx_t * trx,table_id_t table_id,index_id_t index_id)3900 row_merge_rename_index_to_drop(
3901 /*===========================*/
3902 trx_t* trx, /*!< in/out: transaction */
3903 table_id_t table_id, /*!< in: table identifier */
3904 index_id_t index_id) /*!< in: index identifier */
3905 {
3906 dberr_t err;
3907 pars_info_t* info = pars_info_create();
3908
3909 ut_ad(!srv_read_only_mode);
3910
3911 /* We use the private SQL parser of Innobase to generate the
3912 query graphs needed in renaming indexes. */
3913
3914 static const char rename_index[] =
3915 "PROCEDURE RENAME_INDEX_PROC () IS\n"
3916 "BEGIN\n"
3917 "UPDATE SYS_INDEXES SET NAME=CONCAT('"
3918 TEMP_INDEX_PREFIX_STR "',NAME)\n"
3919 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3920 "END;\n";
3921
3922 ut_ad(trx);
3923 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3924 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3925
3926 trx->op_info = "renaming index to drop";
3927
3928 pars_info_add_ull_literal(info, "tableid", table_id);
3929 pars_info_add_ull_literal(info, "indexid", index_id);
3930
3931 err = que_eval_sql(info, rename_index, FALSE, trx);
3932
3933 if (err != DB_SUCCESS) {
3934 /* Even though we ensure that DDL transactions are WAIT
3935 and DEADLOCK free, we could encounter other errors e.g.,
3936 DB_TOO_MANY_CONCURRENT_TRXS. */
3937 trx->error_state = DB_SUCCESS;
3938
3939 ib::error() << "row_merge_rename_index_to_drop failed with"
3940 " error " << err;
3941 }
3942
3943 trx->op_info = "";
3944
3945 return(err);
3946 }
3947
3948 /*********************************************************************//**
3949 Provide a new pathname for a table that is being renamed if it belongs to
3950 a file-per-table tablespace. The caller is responsible for freeing the
3951 memory allocated for the return value.
3952 @return new pathname of tablespace file, or NULL if space = 0 */
3953 char*
row_make_new_pathname(dict_table_t * table,const char * new_name)3954 row_make_new_pathname(
3955 /*==================*/
3956 dict_table_t* table, /*!< in: table to be renamed */
3957 const char* new_name) /*!< in: new name */
3958 {
3959 char* new_path;
3960 char* old_path;
3961
3962 ut_ad(!is_system_tablespace(table->space));
3963
3964 old_path = fil_space_get_first_path(table->space);
3965 ut_a(old_path);
3966
3967 new_path = os_file_make_new_pathname(old_path, new_name);
3968
3969 ut_free(old_path);
3970
3971 return(new_path);
3972 }
3973
3974 /*********************************************************************//**
3975 Rename the tables in the data dictionary. The data dictionary must
3976 have been locked exclusively by the caller, because the transaction
3977 will not be committed.
3978 @return error code or DB_SUCCESS */
3979 dberr_t
row_merge_rename_tables_dict(dict_table_t * old_table,dict_table_t * new_table,const char * tmp_name,trx_t * trx)3980 row_merge_rename_tables_dict(
3981 /*=========================*/
3982 dict_table_t* old_table, /*!< in/out: old table, renamed to
3983 tmp_name */
3984 dict_table_t* new_table, /*!< in/out: new table, renamed to
3985 old_table->name */
3986 const char* tmp_name, /*!< in: new name for old_table */
3987 trx_t* trx) /*!< in/out: dictionary transaction */
3988 {
3989 dberr_t err = DB_ERROR;
3990 pars_info_t* info;
3991
3992 ut_ad(!srv_read_only_mode);
3993 ut_ad(old_table != new_table);
3994 ut_ad(mutex_own(&dict_sys->mutex));
3995 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3996 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
3997 || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3998
3999 trx->op_info = "renaming tables";
4000
4001 /* We use the private SQL parser of Innobase to generate the query
4002 graphs needed in updating the dictionary data in system tables. */
4003
4004 info = pars_info_create();
4005
4006 pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
4007 pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
4008 pars_info_add_str_literal(info, "tmp_name", tmp_name);
4009
4010 err = que_eval_sql(info,
4011 "PROCEDURE RENAME_TABLES () IS\n"
4012 "BEGIN\n"
4013 "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
4014 " WHERE NAME = :old_name;\n"
4015 "UPDATE SYS_TABLES SET NAME = :old_name\n"
4016 " WHERE NAME = :new_name;\n"
4017 "END;\n", FALSE, trx);
4018
4019 /* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
4020 renamed is a single-table tablespace, which must be implicitly
4021 renamed along with the table. */
4022 if (err == DB_SUCCESS
4023 && dict_table_is_file_per_table(old_table)
4024 && !old_table->ibd_file_missing) {
4025 /* Make pathname to update SYS_DATAFILES. */
4026 char* tmp_path = row_make_new_pathname(old_table, tmp_name);
4027
4028 info = pars_info_create();
4029
4030 pars_info_add_str_literal(info, "tmp_name", tmp_name);
4031 pars_info_add_str_literal(info, "tmp_path", tmp_path);
4032 pars_info_add_int4_literal(info, "old_space",
4033 (lint) old_table->space);
4034
4035 err = que_eval_sql(info,
4036 "PROCEDURE RENAME_OLD_SPACE () IS\n"
4037 "BEGIN\n"
4038 "UPDATE SYS_TABLESPACES"
4039 " SET NAME = :tmp_name\n"
4040 " WHERE SPACE = :old_space;\n"
4041 "UPDATE SYS_DATAFILES"
4042 " SET PATH = :tmp_path\n"
4043 " WHERE SPACE = :old_space;\n"
4044 "END;\n", FALSE, trx);
4045
4046 ut_free(tmp_path);
4047 }
4048
4049 /* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
4050 renamed is a single-table tablespace, which must be implicitly
4051 renamed along with the table. */
4052 if (err == DB_SUCCESS
4053 && dict_table_is_file_per_table(new_table)) {
4054 /* Make pathname to update SYS_DATAFILES. */
4055 char* old_path = row_make_new_pathname(
4056 new_table, old_table->name.m_name);
4057
4058 info = pars_info_create();
4059
4060 pars_info_add_str_literal(info, "old_name",
4061 old_table->name.m_name);
4062 pars_info_add_str_literal(info, "old_path", old_path);
4063 pars_info_add_int4_literal(info, "new_space",
4064 (lint) new_table->space);
4065
4066 err = que_eval_sql(info,
4067 "PROCEDURE RENAME_NEW_SPACE () IS\n"
4068 "BEGIN\n"
4069 "UPDATE SYS_TABLESPACES"
4070 " SET NAME = :old_name\n"
4071 " WHERE SPACE = :new_space;\n"
4072 "UPDATE SYS_DATAFILES"
4073 " SET PATH = :old_path\n"
4074 " WHERE SPACE = :new_space;\n"
4075 "END;\n", FALSE, trx);
4076
4077 ut_free(old_path);
4078 }
4079
4080 if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) {
4081 err = row_import_update_discarded_flag(
4082 trx, new_table->id, true, true);
4083 }
4084
4085 trx->op_info = "";
4086
4087 return(err);
4088 }
4089
4090 /** Create and execute a query graph for creating an index.
4091 @param[in,out] trx trx
4092 @param[in,out] table table
4093 @param[in,out] index index
4094 @param[in] add_v new virtual columns added along with add index call
4095 @return DB_SUCCESS or error code */
4096 static MY_ATTRIBUTE((warn_unused_result))
4097 dberr_t
row_merge_create_index_graph(trx_t * trx,dict_table_t * table,dict_index_t * index,const dict_add_v_col_t * add_v)4098 row_merge_create_index_graph(
4099 trx_t* trx,
4100 dict_table_t* table,
4101 dict_index_t* index,
4102 const dict_add_v_col_t* add_v)
4103 {
4104 ind_node_t* node; /*!< Index creation node */
4105 mem_heap_t* heap; /*!< Memory heap */
4106 que_thr_t* thr; /*!< Query thread */
4107 dberr_t err;
4108
4109 DBUG_ENTER("row_merge_create_index_graph");
4110
4111 ut_ad(trx);
4112 ut_ad(table);
4113 ut_ad(index);
4114
4115 heap = mem_heap_create(512);
4116
4117 index->table = table;
4118 node = ind_create_graph_create(index, heap, add_v);
4119 thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
4120
4121 ut_a(thr == que_fork_start_command(
4122 static_cast<que_fork_t*>(que_node_get_parent(thr))));
4123
4124 que_run_threads(thr);
4125
4126 err = trx->error_state;
4127
4128 que_graph_free((que_t*) que_node_get_parent(thr));
4129
4130 DBUG_RETURN(err);
4131 }
4132
4133 /** Create the index and load in to the dictionary.
4134 @param[in,out] trx trx (sets error_state)
4135 @param[in,out] table the index is on this table
4136 @param[in] index_def the index definition
4137 @param[in] add_v new virtual columns added along with add
4138 index call
4139 @return index, or NULL on error */
4140 dict_index_t*
row_merge_create_index(trx_t * trx,dict_table_t * table,const index_def_t * index_def,const dict_add_v_col_t * add_v)4141 row_merge_create_index(
4142 trx_t* trx,
4143 dict_table_t* table,
4144 const index_def_t* index_def,
4145 const dict_add_v_col_t* add_v)
4146 {
4147 dict_index_t* index;
4148 dberr_t err;
4149 ulint n_fields = index_def->n_fields;
4150 ulint i;
4151 bool has_new_v_col = false;
4152
4153 DBUG_ENTER("row_merge_create_index");
4154
4155 ut_ad(!srv_read_only_mode);
4156
4157 /* Create the index prototype, using the passed in def, this is not
4158 a persistent operation. We pass 0 as the space id, and determine at
4159 a lower level the space id where to store the table. */
4160
4161 index = dict_mem_index_create(table->name.m_name, index_def->name,
4162 0, index_def->ind_type, n_fields);
4163
4164 ut_a(index);
4165
4166 index->set_committed(index_def->rebuild);
4167
4168 for (i = 0; i < n_fields; i++) {
4169 const char* name;
4170 index_field_t* ifield = &index_def->fields[i];
4171
4172 if (ifield->is_v_col) {
4173 if (ifield->col_no >= table->n_v_def) {
4174 ut_ad(ifield->col_no < table->n_v_def
4175 + add_v->n_v_col);
4176 ut_ad(ifield->col_no >= table->n_v_def);
4177 name = add_v->v_col_name[
4178 ifield->col_no - table->n_v_def];
4179
4180 has_new_v_col = true;
4181 } else {
4182 name = dict_table_get_v_col_name(
4183 table, ifield->col_no);
4184 }
4185 } else {
4186 name = dict_table_get_col_name(table, ifield->col_no);
4187
4188 }
4189
4190 dict_mem_index_add_field(index, name, ifield->prefix_len);
4191 }
4192
4193 /* Add the index to SYS_INDEXES, using the index prototype. */
4194 err = row_merge_create_index_graph(trx, table, index, add_v);
4195
4196 if (err == DB_SUCCESS) {
4197
4198 index = dict_table_get_index_on_name(table, index_def->name,
4199 index_def->rebuild);
4200
4201 ut_a(index);
4202
4203 index->parser = index_def->parser;
4204 index->is_ngram = index_def->is_ngram;
4205 index->has_new_v_col = has_new_v_col;
4206
4207 /* Note the id of the transaction that created this
4208 index, we use it to restrict readers from accessing
4209 this index, to ensure read consistency. */
4210 ut_ad(index->trx_id == trx->id);
4211 } else {
4212 index = NULL;
4213 }
4214
4215 DBUG_RETURN(index);
4216 }
4217
4218 /*********************************************************************//**
4219 Check if a transaction can use an index. */
4220 ibool
row_merge_is_index_usable(const trx_t * trx,const dict_index_t * index)4221 row_merge_is_index_usable(
4222 /*======================*/
4223 const trx_t* trx, /*!< in: transaction */
4224 const dict_index_t* index) /*!< in: index to check */
4225 {
4226 if (!dict_index_is_clust(index)
4227 && dict_index_is_online_ddl(index)) {
4228 /* Indexes that are being created are not useable. */
4229 return(FALSE);
4230 }
4231
4232 return(!dict_index_is_corrupted(index)
4233 && (dict_table_is_temporary(index->table)
4234 || index->trx_id == 0
4235 || !MVCC::is_view_active(trx->read_view)
4236 || trx->read_view->changes_visible(
4237 index->trx_id,
4238 index->table->name)));
4239 }
4240
4241 /*********************************************************************//**
4242 Drop a table. The caller must have ensured that the background stats
4243 thread is not processing the table. This can be done by calling
4244 dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
4245 before calling this function.
4246 @return DB_SUCCESS or error code */
4247 dberr_t
row_merge_drop_table(trx_t * trx,dict_table_t * table)4248 row_merge_drop_table(
4249 /*=================*/
4250 trx_t* trx, /*!< in: transaction */
4251 dict_table_t* table) /*!< in: table to drop */
4252 {
4253 ut_ad(!srv_read_only_mode);
4254
4255 /* There must be no open transactions on the table. */
4256 ut_a(table->get_ref_count() == 0);
4257
4258 return(row_drop_table_for_mysql(table->name.m_name,
4259 trx, false, false));
4260 }
4261
4262 /** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
4263 that redo-logging of individual index pages was disabled, and
4264 the flushing of such pages to the data files was completed.
4265 @param[in] index an index tree on which redo logging was disabled */
4266 static
4267 void
row_merge_write_redo(const dict_index_t * index)4268 row_merge_write_redo(
4269 const dict_index_t* index)
4270 {
4271 mtr_t mtr;
4272 byte* log_ptr;
4273
4274 ut_ad(!dict_table_is_temporary(index->table));
4275 mtr.start();
4276 log_ptr = mlog_open(&mtr, 11 + 8);
4277 log_ptr = mlog_write_initial_log_record_low(
4278 MLOG_INDEX_LOAD,
4279 index->space, index->page, log_ptr, &mtr);
4280 mach_write_to_8(log_ptr, index->id);
4281 mlog_close(&mtr, log_ptr + 8);
4282 mtr.commit();
4283 }
4284
/** Build indexes on a table by reading a clustered index, creating a temporary
file containing index entries, merge sorting these index entries and inserting
sorted index entries to indexes.
@param[in]	trx		transaction
@param[in]	old_table	table where rows are read from
@param[in]	new_table	table where indexes are created; identical to
old_table unless creating a PRIMARY KEY
@param[in]	online		true if creating indexes online
@param[in]	indexes		indexes to be created
@param[in]	key_numbers	MySQL key numbers
@param[in]	n_indexes	size of indexes[]
@param[in,out]	table		MySQL table, for reporting erroneous key value
if applicable
@param[in]	add_cols	default values of added columns, or NULL
@param[in]	col_map		mapping of old column numbers to new ones, or
NULL if old_table == new_table
@param[in]	add_autoinc	number of added AUTO_INCREMENT columns, or
ULINT_UNDEFINED if none is added
@param[in,out]	sequence	autoinc sequence
@param[in]	skip_pk_sort	whether the new PRIMARY KEY will follow
existing order
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
this function and it will be passed to other functions for further accounting.
@param[in]	add_v		new virtual columns added along with indexes
@param[in]	eval_table	mysql table used to evaluate virtual column
value, see innobase_get_computed_value().
@return DB_SUCCESS or error code */
dberr_t
row_merge_build_indexes(
	trx_t*			trx,
	dict_table_t*		old_table,
	dict_table_t*		new_table,
	bool			online,
	dict_index_t**		indexes,
	const ulint*		key_numbers,
	ulint			n_indexes,
	struct TABLE*		table,
	const dtuple_t*		add_cols,
	const ulint*		col_map,
	ulint			add_autoinc,
	ib_sequence_t&		sequence,
	bool			skip_pk_sort,
	ut_stage_alter_t*	stage,
	const dict_add_v_col_t*	add_v,
	struct TABLE*		eval_table)
{
	merge_file_t*		merge_files;
	row_merge_block_t*	block;
	ut_new_pfx_t		block_pfx;
	ulint			i;
	ulint			j;
	dberr_t			error;
	int			tmpfd = -1;
	dict_index_t*		fts_sort_idx = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	int64_t			sig_count = 0;
	bool			fts_psort_initiated = false;
	DBUG_ENTER("row_merge_build_indexes");

	ut_ad(!srv_read_only_mode);
	/* col_map is supplied exactly when the table is being rebuilt. */
	ut_ad((old_table == new_table) == !col_map);
	ut_ad(!add_cols || col_map);

	/* When not rebuilding but skip_pk_sort is irrelevant, the PK scan
	phase accounts for all indexes; on a sorted rebuild the clustered
	index is handled separately, hence n_indexes - 1. */
	stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
				   ? n_indexes - 1
				   : n_indexes);

	/* Allocate memory for merge file data structure and initialize
	fields */

	ut_allocator<row_merge_block_t>	alloc(mem_key_row_merge_sort);

	/* This will allocate "3 * srv_sort_buf_size" elements of type
	row_merge_block_t. The latter is defined as byte. */
	block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx);

	if (block == NULL) {
		DBUG_RETURN(DB_OUT_OF_MEMORY);
	}

	trx_start_if_not_started_xa(trx, true);

	/* Check if we need a flush observer to flush dirty pages.
	Since we disable redo logging in bulk load, so we should flush
	dirty pages before online log apply, because online log apply enables
	redo logging(we can do further optimization here).
	1. online add index: flush dirty pages right before row_log_apply().
	2. table rebuild: flush dirty pages before row_log_table_apply().

	we use bulk load to create all types of indexes except spatial index,
	for which redo logging is enabled. If we create only spatial indexes,
	we don't need to flush dirty pages at all. */
	bool	need_flush_observer = (old_table != new_table);

	for (i = 0; i < n_indexes; i++) {
		if (!dict_index_is_spatial(indexes[i])) {
			need_flush_observer = true;
		}
	}

	FlushObserver*	flush_observer = NULL;
	if (need_flush_observer) {
		flush_observer = UT_NEW_NOKEY(
			FlushObserver(new_table->space, trx, stage));

		trx_set_flush_observer(trx, flush_observer);
	}

	merge_files = static_cast<merge_file_t*>(
		ut_malloc_nokey(n_indexes * sizeof *merge_files));

	/* Initialize all the merge file descriptors, so that we
	don't call row_merge_file_destroy() on uninitialized
	merge file descriptor */

	for (i = 0; i < n_indexes; i++) {
		merge_files[i].fd = -1;
	}

	for (i = 0; i < n_indexes; i++) {
		if (indexes[i]->type & DICT_FTS) {
			ibool	opt_doc_id_size = FALSE;

			/* To build FTS index, we would need to extract
			doc's word, Doc ID, and word's position, so
			we need to build a "fts sort index" indexing
			on above three 'fields' */
			fts_sort_idx = row_merge_create_fts_sort_index(
				indexes[i], old_table, &opt_doc_id_size);

			row_merge_dup_t*	dup
				= static_cast<row_merge_dup_t*>(
					ut_malloc_nokey(sizeof *dup));
			dup->index = fts_sort_idx;
			dup->table = table;
			dup->col_map = col_map;
			dup->n_dup = 0;

			/* Spawns the parallel tokenization/sort threads. */
			row_fts_psort_info_init(
				trx, dup, new_table, opt_doc_id_size,
				&psort_info, &merge_info);

			/* We need to ensure that we free the resources
			allocated */
			fts_psort_initiated = true;
		}
	}

	/* Reset the MySQL row buffer that is used when reporting
	duplicate keys. */
	innobase_rec_reset(table);

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */
	error = row_merge_read_clustered_index(
		trx, table, old_table, new_table, online, indexes,
		fts_sort_idx, psort_info, merge_files, key_numbers,
		n_indexes, add_cols, add_v, col_map, add_autoinc,
		sequence, block, skip_pk_sort, &tmpfd, stage, eval_table);

	stage->end_phase_read_pk();

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	DEBUG_SYNC_C("row_merge_after_scan");

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		dict_index_t*	sort_idx = indexes[i];

		/* Spatial indexes are built during the clustered index
		scan; nothing more to do for them here. */
		if (dict_index_is_spatial(sort_idx)) {
			continue;
		}

		if (indexes[i]->type & DICT_FTS) {
			os_event_t	fts_parallel_merge_event;

			sort_idx = fts_sort_idx;

			fts_parallel_merge_event
				= merge_info[0].psort_common->merge_event;

			if (FTS_PLL_MERGE) {
				ulint	trial_count = 0;
				bool	all_exit = false;

				os_event_reset(fts_parallel_merge_event);
				row_fts_start_parallel_merge(merge_info);
wait_again:
				/* Wait (with 1s timeout) for a merge child
				to signal completion, then re-check every
				child's status; any still-running child
				sends us back to wait_again. */
				os_event_wait_time_low(
					fts_parallel_merge_event, 1000000,
					sig_count);

				for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
					if (merge_info[j].child_status
					    != FTS_CHILD_COMPLETE
					    && merge_info[j].child_status
					    != FTS_CHILD_EXITING) {
						sig_count = os_event_reset(
							fts_parallel_merge_event);

						goto wait_again;
					}
				}

				/* Now all children should complete, wait
				a bit until they all finish using event */
				while (!all_exit && trial_count < 10000) {
					all_exit = true;

					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {
						if (merge_info[j].child_status
						    != FTS_CHILD_EXITING) {
							all_exit = false;
							os_thread_sleep(1000);
							break;
						}
					}
					trial_count++;
				}

				if (!all_exit) {
					ib::error() << "Not all child merge"
						" threads exited when creating"
						" FTS index '"
						<< indexes[i]->name << "'";
				} else {
					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {

						os_thread_join(merge_info[j]
							       .thread_hdl);
					}
				}
			} else {
				/* This cannot report duplicates; an
				assertion would fail in that case. */
				error = row_fts_merge_insert(
					sort_idx, new_table,
					psort_info, 0);
			}

#ifdef FTS_INTERNAL_DIAG_PRINT
			DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
#endif
		} else if (merge_files[i].fd >= 0) {
			/* A non-FTS index with a merge file: sort the
			entries and bulk-load them into the B-tree. */
			row_merge_dup_t	dup = {
				sort_idx, table, col_map, 0};

			error = row_merge_sort(
				trx, &dup, &merge_files[i],
				block, &tmpfd, stage);

			if (error == DB_SUCCESS) {
				BtrBulk	btr_bulk(sort_idx, trx->id,
						 flush_observer);
				btr_bulk.init();

				error = row_merge_insert_index_tuples(
					trx->id, sort_idx, old_table,
					merge_files[i].fd, block, NULL,
					&btr_bulk, stage);

				/* finish() must run even on error so the
				bulk loader releases its resources. */
				error = btr_bulk.finish(error);
			}
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (indexes[i]->type & DICT_FTS) {
			row_fts_psort_info_destroy(psort_info, merge_info);
			fts_psort_initiated = false;
		} else if (error != DB_SUCCESS || !online) {
			/* Do not apply any online log. */
		} else if (old_table != new_table) {
			ut_ad(!sort_idx->online_log);
			ut_ad(sort_idx->online_status
			      == ONLINE_INDEX_COMPLETE);
		} else {
			ut_ad(need_flush_observer);

			/* Flush the bulk-loaded (redo-skipped) pages and
			write MLOG_INDEX_LOAD before applying the online
			log, which runs with redo logging enabled. */
			flush_observer->flush();
			row_merge_write_redo(indexes[i]);

			DEBUG_SYNC_C("row_log_apply_before");
			error = row_log_apply(trx, sort_idx, table, stage);
			DEBUG_SYNC_C("row_log_apply_after");
		}

		if (error != DB_SUCCESS) {
			trx->error_key_num = key_numbers[i];
			goto func_exit;
		}

		if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
			ib::info() << "Finished building full-text index "
				<< indexes[i]->name;
		}
	}

func_exit:
	DBUG_EXECUTE_IF(
		"ib_build_indexes_too_many_concurrent_trxs",
		error = DB_TOO_MANY_CONCURRENT_TRXS;
		trx->error_state = error;);

	if (fts_psort_initiated) {
		/* Clean up FTS psort related resource */
		row_fts_psort_info_destroy(psort_info, merge_info);
		fts_psort_initiated = false;
	}

	row_merge_file_destroy_low(tmpfd);

	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	if (fts_sort_idx) {
		dict_mem_index_free(fts_sort_idx);
	}

	ut_free(merge_files);

	alloc.deallocate_large(block, &block_pfx);

	DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);

	if (online && old_table == new_table && error != DB_SUCCESS) {
		/* On error, flag all online secondary index creation
		as aborted. */
		for (i = 0; i < n_indexes; i++) {
			ut_ad(!(indexes[i]->type & DICT_FTS));
			ut_ad(!indexes[i]->is_committed());
			ut_ad(!dict_index_is_clust(indexes[i]));

			/* Completed indexes should be dropped as
			well, and indexes whose creation was aborted
			should be dropped from the persistent
			storage. However, at this point we can only
			set some flags in the not-yet-published
			indexes. These indexes will be dropped later
			in row_merge_drop_indexes(), called by
			rollback_inplace_alter_table(). */

			switch (dict_index_get_online_status(indexes[i])) {
			case ONLINE_INDEX_COMPLETE:
				break;
			case ONLINE_INDEX_CREATION:
				rw_lock_x_lock(
					dict_index_get_lock(indexes[i]));
				row_log_abort_sec(indexes[i]);
				indexes[i]->type |= DICT_CORRUPT;
				rw_lock_x_unlock(
					dict_index_get_lock(indexes[i]));
				new_table->drop_aborted = TRUE;
				/* fall through */
			case ONLINE_INDEX_ABORTED_DROPPED:
			case ONLINE_INDEX_ABORTED:
				MONITOR_ATOMIC_INC(
					MONITOR_BACKGROUND_DROP_INDEX);
			}
		}
	}

	DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););

	if (flush_observer != NULL) {
		ut_ad(need_flush_observer);

		DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
			error = DB_FAIL;
		);

		if (error != DB_SUCCESS) {
			flush_observer->interrupted();
		}

		/* Flush or discard the remaining dirty pages tracked by
		the observer before destroying it. */
		flush_observer->flush();

		UT_DELETE(flush_observer);

		if (trx_is_interrupted(trx)) {
			error = DB_INTERRUPTED;
		}

		if (error == DB_SUCCESS && old_table != new_table) {
			/* On a successful rebuild, note in the redo log
			that every index of the new table was bulk-loaded
			with page flushing completed. */
			for (const dict_index_t* index
				     = dict_table_get_first_index(new_table);
			     index != NULL;
			     index = dict_table_get_next_index(index)) {
				row_merge_write_redo(index);
			}
		}
	}

	DBUG_RETURN(error);
}
4692