1 /*****************************************************************************
2
3 Copyright (c) 2005, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0merge.cc
29 New index creation routines using a merge sort
30
31 Created 12/4/2005 Jan Lindstrom
32 Completed by Sunny Bains and Marko Makela
33 *******************************************************/
34
35 #include <math.h>
36
37 #include "ha_prototypes.h"
38
39 #include "row0merge.h"
40 #include "row0ext.h"
41 #include "row0log.h"
42 #include "row0ins.h"
43 #include "row0sel.h"
44 #include "dict0crea.h"
45 #include "trx0purge.h"
46 #include "lock0lock.h"
47 #include "pars0pars.h"
48 #include "ut0sort.h"
49 #include "row0ftsort.h"
50 #include "row0import.h"
51 #include "handler0alter.h"
52 #include "btr0bulk.h"
53 #include "fsp0sysspace.h"
54 #include "ut0new.h"
55 #include "ut0stage.h"
56 #include "os0file.h"
57 #include "my_aes.h"
58
59 /* Ignore posix_fadvise() on those platforms where it does not exist */
60 #if defined _WIN32
61 # define posix_fadvise(fd, offset, len, advice) /* nothing */
62 #endif /* _WIN32 */
63 
64 /* Whether to disable file system cache; nonzero disables it for merge
sort temporary files (presumably wired to a server configuration
variable -- confirm in the server layer) */
65 char srv_disable_sort_file_cache;
66
67 /** Class that caches index row tuples made from a single cluster
68 index page scan, and then insert into corresponding index tree */
69 class index_tuple_info_t {
70 public:
71 /** constructor
72 @param[in] heap memory heap
73 @param[in] index index to be created */
index_tuple_info_t(mem_heap_t * heap,dict_index_t * index)74 index_tuple_info_t(
75 mem_heap_t* heap,
76 dict_index_t* index) UNIV_NOTHROW
77 {
78 m_heap = heap;
79 m_index = index;
80 m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
81 }
82 
83 /** destructor */
~index_tuple_info_t()84 ~index_tuple_info_t()
85 {
86 UT_DELETE(m_dtuple_vec);
87 }
88 
89 /** Get the index object
90 @return the index object */
get_index()91 dict_index_t* get_index() UNIV_NOTHROW
92 {
93 return(m_index);
94 }
95 
96 /** Caches an index row into index tuple vector
97 @param[in] row table row
98 @param[in] ext externally stored column
99 prefixes, or NULL */
add(const dtuple_t * row,const row_ext_t * ext)100 void add(
101 const dtuple_t* row,
102 const row_ext_t* ext) UNIV_NOTHROW
103 {
104 dtuple_t* dtuple;
105 
106 dtuple = row_build_index_entry(row, ext, m_index, m_heap);
107 
108 ut_ad(dtuple);
109 
110 m_dtuple_vec->push_back(dtuple);
111 }
112 
113 /** Insert spatial index rows cached in vector into spatial index
114 @param[in] trx_id transaction id
115 @param[in,out] row_heap memory heap
116 @param[in] pcur cluster index scanning cursor
117 @param[in,out] scan_mtr mini-transaction for pcur
118 @param[out] mtr_committed whether scan_mtr got committed
119 @return DB_SUCCESS if successful, else error number */
insert(trx_id_t trx_id,mem_heap_t * row_heap,btr_pcur_t * pcur,mtr_t * scan_mtr,bool * mtr_committed)120 dberr_t insert(
121 trx_id_t trx_id,
122 mem_heap_t* row_heap,
123 btr_pcur_t* pcur,
124 mtr_t* scan_mtr,
125 bool* mtr_committed)
126 {
127 big_rec_t* big_rec;
128 rec_t* rec;
129 btr_cur_t ins_cur;
130 mtr_t mtr;
131 rtr_info_t rtr_info;
132 ulint* ins_offsets = NULL;
133 dberr_t error = DB_SUCCESS;
134 dtuple_t* dtuple;
135 ulint count = 0;
136 const ulint flag = BTR_NO_UNDO_LOG_FLAG
137 | BTR_NO_LOCKING_FLAG
138 | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;
139 
140 ut_ad(dict_index_is_spatial(m_index));
141 
142 DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
143 log_sys->check_flush_or_checkpoint = true;
144 );
145 
146 for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
147 it != m_dtuple_vec->end();
148 ++it) {
149 dtuple = *it;
150 ut_ad(dtuple);
151 
/* If the log needs a flush/checkpoint, the scanning
mini-transaction must first release its latches (store the
cursor position and commit scan_mtr) before log_free_check()
may be called; the caller is told via *mtr_committed so it can
restore the cursor afterwards. */
152 if (log_sys->check_flush_or_checkpoint) {
153 if (!(*mtr_committed)) {
154 btr_pcur_move_to_prev_on_page(pcur);
155 btr_pcur_store_position(pcur, scan_mtr);
156 mtr_commit(scan_mtr);
157 *mtr_committed = true;
158 }
159 
160 log_free_check();
161 }
162 
163 mtr.start();
164 mtr.set_named_space(m_index->space);
165 
166 ins_cur.index = m_index;
167 rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
168 false);
169 rtr_info_update_btr(&ins_cur, &rtr_info);
170 
171 btr_cur_search_to_nth_level(m_index, 0, dtuple,
172 PAGE_CUR_RTREE_INSERT,
173 BTR_MODIFY_LEAF, &ins_cur,
174 0, __FILE__, __LINE__,
175 &mtr);
176 
177 /* It need to update MBR in parent entry,
178 so change search mode to BTR_MODIFY_TREE */
179 if (rtr_info.mbr_adj) {
180 mtr_commit(&mtr);
181 rtr_clean_rtr_info(&rtr_info, true);
182 rtr_init_rtr_info(&rtr_info, false, &ins_cur,
183 m_index, false);
184 rtr_info_update_btr(&ins_cur, &rtr_info);
185 mtr_start(&mtr);
186 mtr.set_named_space(m_index->space);
187 btr_cur_search_to_nth_level(
188 m_index, 0, dtuple,
189 PAGE_CUR_RTREE_INSERT,
190 BTR_MODIFY_TREE, &ins_cur, 0,
191 __FILE__, __LINE__, &mtr);
192 }
193 
194 error = btr_cur_optimistic_insert(
195 flag, &ins_cur, &ins_offsets, &row_heap,
196 dtuple, &rec, &big_rec, 0, NULL, &mtr);
197 
/* Optimistic insert failed (page full): restart the
mini-transaction, reposition with BTR_MODIFY_TREE and
retry pessimistically (may split pages). */
198 if (error == DB_FAIL) {
199 ut_ad(!big_rec);
200 mtr.commit();
201 mtr.start();
202 mtr.set_named_space(m_index->space);
203 
204 rtr_clean_rtr_info(&rtr_info, true);
205 rtr_init_rtr_info(&rtr_info, false,
206 &ins_cur, m_index, false);
207 
208 rtr_info_update_btr(&ins_cur, &rtr_info);
209 btr_cur_search_to_nth_level(
210 m_index, 0, dtuple,
211 PAGE_CUR_RTREE_INSERT,
212 BTR_MODIFY_TREE,
213 &ins_cur, 0,
214 __FILE__, __LINE__, &mtr);
215 
216 
217 error = btr_cur_pessimistic_insert(
218 flag, &ins_cur, &ins_offsets,
219 &row_heap, dtuple, &rec,
220 &big_rec, 0, NULL, &mtr);
221 }
222 
223 DBUG_EXECUTE_IF(
224 "row_merge_ins_spatial_fail",
225 error = DB_FAIL;
226 );
227 
228 if (error == DB_SUCCESS) {
229 if (rtr_info.mbr_adj) {
230 error = rtr_ins_enlarge_mbr(
231 &ins_cur, NULL, &mtr);
232 }
233 
234 if (error == DB_SUCCESS) {
235 page_update_max_trx_id(
236 btr_cur_get_block(&ins_cur),
237 btr_cur_get_page_zip(&ins_cur),
238 trx_id, &mtr);
239 }
240 }
241 
242 mtr_commit(&mtr);
243 
244 rtr_clean_rtr_info(&rtr_info, true);
245 count++;
246 }
247 
/* All cached tuples have been inserted (note: on error, the loop
still processes the remaining tuples and the last error is
returned); reset the cache for the next cluster index page. */
248 m_dtuple_vec->clear();
249 
250 return(error);
251 }
252 
253 private:
254 /** Cache index rows made from a cluster index scan. Usually
255 for rows on single cluster index page */
256 typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
257 idx_tuple_vec;
258 
259 /** vector used to cache index rows made from cluster index scan */
260 idx_tuple_vec* m_dtuple_vec;
261 
262 /** the index being built */
263 dict_index_t* m_index;
264 
265 /** memory heap for creating index tuples */
266 mem_heap_t* m_heap;
267 };
268
269 /* Maximum pending doc memory limit in bytes for a fts tokenization thread */
270 #define FTS_PENDING_DOC_MEMORY_LIMIT 1000000
271 
/* Forward declaration; the definition appears later in this file.
The trailing stage parameter defaults to NULL for callers that do not
report progress. */
272 /** Insert sorted data tuples to the index.
273 @param[in] trx_id transaction identifier
274 @param[in] index index to be inserted
275 @param[in] old_table old table
276 @param[in] fd file descriptor
277 @param[in,out] block file buffer
278 @param[in,out] crypt_block encrypted file buffer
279 @param[in] space_id tablespace id
280 @param[in] row_buf row_buf the sorted data tuples,
281 or NULL if fd, block will be used instead
282 @param[in,out] btr_bulk btr bulk instance
283 @param[in,out] stage performance schema accounting object, used by
284 ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
285 and then stage->inc() will be called for each record that is processed.
286 @return DB_SUCCESS or error number */
287 static MY_ATTRIBUTE((warn_unused_result))
288 dberr_t
289 row_merge_insert_index_tuples(
290 trx_id_t trx_id,
291 dict_index_t* index,
292 const dict_table_t* old_table,
293 int fd,
294 row_merge_block_t* block,
295 row_merge_block_t* crypt_block,
296 ulint space_id,
297 const row_merge_buf_t* row_buf,
298 BtrBulk* btr_bulk,
299 ut_stage_alter_t* stage = NULL);
300
301 /******************************************************//**
302 Encode an index record. */
303 static
304 void
row_merge_buf_encode(byte ** b,const dict_index_t * index,const mtuple_t * entry,ulint n_fields)305 row_merge_buf_encode(
306 /*=================*/
307 byte** b, /*!< in/out: pointer to
308 current end of output buffer */
309 const dict_index_t* index, /*!< in: index */
310 const mtuple_t* entry, /*!< in: index fields
311 of the record to encode */
312 ulint n_fields) /*!< in: number of fields
313 in the entry */
314 {
315 ulint size;
316 ulint extra_size;
317
318 size = rec_get_converted_size_temp(
319 index, entry->fields, n_fields, NULL, &extra_size);
320 ut_ad(size >= extra_size);
321
322 /* Encode extra_size + 1 */
323 if (extra_size + 1 < 0x80) {
324 *(*b)++ = (byte) (extra_size + 1);
325 } else {
326 ut_ad((extra_size + 1) < 0x8000);
327 *(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
328 *(*b)++ = (byte) (extra_size + 1);
329 }
330
331 rec_convert_dtuple_to_temp(*b + extra_size, index,
332 entry->fields, n_fields, NULL);
333
334 *b += size;
335 }
336
337 /******************************************************//**
338 Allocate a sort buffer.
339 @return own: sort buffer */
340 static MY_ATTRIBUTE((malloc))
341 row_merge_buf_t*
row_merge_buf_create_low(mem_heap_t * heap,dict_index_t * index,ulint max_tuples,ulint buf_size)342 row_merge_buf_create_low(
343 /*=====================*/
344 mem_heap_t* heap, /*!< in: heap where allocated */
345 dict_index_t* index, /*!< in: secondary index */
346 ulint max_tuples, /*!< in: maximum number of
347 data tuples */
348 ulint buf_size) /*!< in: size of the buffer,
349 in bytes */
350 {
351 row_merge_buf_t* buf;
352
353 ut_ad(max_tuples > 0);
354
355 ut_ad(max_tuples <= srv_sort_buf_size);
356
357 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
358 buf->heap = heap;
359 buf->index = index;
360 buf->max_tuples = max_tuples;
361 buf->tuples = static_cast<mtuple_t*>(
362 ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
363 buf->tmp_tuples = buf->tuples + max_tuples;
364
365 return(buf);
366 }
367
368 /******************************************************//**
369 Allocate a sort buffer.
370 @return own: sort buffer */
371 row_merge_buf_t*
row_merge_buf_create(dict_index_t * index)372 row_merge_buf_create(
373 /*=================*/
374 dict_index_t* index) /*!< in: secondary index */
375 {
376 row_merge_buf_t* buf;
377 ulint max_tuples;
378 ulint buf_size;
379 mem_heap_t* heap;
380
381 max_tuples = static_cast<ulint>(srv_sort_buf_size)
382 / ut_max(static_cast<ulint>(1),
383 dict_index_get_min_size(index));
384
385 buf_size = (sizeof *buf);
386
387 heap = mem_heap_create(buf_size);
388
389 buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
390
391 return(buf);
392 }
393
394 /******************************************************//**
395 Empty a sort buffer.
396 @return sort buffer */
397 row_merge_buf_t*
row_merge_buf_empty(row_merge_buf_t * buf)398 row_merge_buf_empty(
399 /*================*/
400 row_merge_buf_t* buf) /*!< in,own: sort buffer */
401 {
402 ulint buf_size = sizeof *buf;
403 ulint max_tuples = buf->max_tuples;
404 mem_heap_t* heap = buf->heap;
405 dict_index_t* index = buf->index;
406 mtuple_t* tuples = buf->tuples;
407
408 mem_heap_empty(heap);
409
410 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
411 buf->heap = heap;
412 buf->index = index;
413 buf->max_tuples = max_tuples;
414 buf->tuples = tuples;
415 buf->tmp_tuples = buf->tuples + max_tuples;
416
417 return(buf);
418 }
419
420 /******************************************************//**
421 Deallocate a sort buffer. */
422 void
row_merge_buf_free(row_merge_buf_t * buf)423 row_merge_buf_free(
424 /*===============*/
425 row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */
426 {
427 ut_free(buf->tuples);
428 mem_heap_free(buf->heap);
429 }
430
431 /** Convert the field data from compact to redundant format.
432 @param[in] row_field field to copy from
433 @param[out] field field to copy to
434 @param[in] len length of the field data
435 @param[in] page_size page size descriptor of the tablespace,
used when copying out an externally stored column
437 @param[in,out] heap memory heap where to allocate data when
438 converting to ROW_FORMAT=REDUNDANT, or NULL
439 when not to invoke
440 row_merge_buf_redundant_convert(). */
441 static
442 void
row_merge_buf_redundant_convert(const dfield_t * row_field,dfield_t * field,ulint len,const page_size_t & page_size,mem_heap_t * heap)443 row_merge_buf_redundant_convert(
444 const dfield_t* row_field,
445 dfield_t* field,
446 ulint len,
447 const page_size_t& page_size,
448 mem_heap_t* heap)
449 {
/* Only variable-width multi-byte character columns are converted:
minimum character width must be one byte, maximum wider. */
450 ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
451 ut_ad(DATA_MBMAXLEN(field->type.mbminmaxlen) > 1);
452 
453 byte* buf = (byte*) mem_heap_alloc(heap, len);
454 ulint field_len = row_field->len;
455 ut_ad(field_len <= len);
456 
457 if (row_field->ext) {
458 const byte* field_data = static_cast<byte*>(
459 dfield_get_data(row_field));
460 ulint ext_len;
461 
/* The field must end in a BLOB pointer, and that pointer must
not be all-zero (which would mean an incompletely written
externally stored column). */
462 ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
463 ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
464 field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
465 
466 byte* data = btr_copy_externally_stored_field(
467 &ext_len, field_data, page_size, field_len, heap);
468 
469 ut_ad(ext_len < len);
470 
471 memcpy(buf, data, ext_len);
472 field_len = ext_len;
473 } else {
474 memcpy(buf, row_field->data, field_len);
475 }
476 
/* Pad with 0x20 (space) up to the fixed redundant-format length. */
477 memset(buf + field_len, 0x20, len - field_len);
478 
479 dfield_set_data(field, buf, len);
480 }
481
482 /** Insert a data tuple into a sort buffer.
483 @param[in,out] buf sort buffer
484 @param[in] fts_index fts index to be created
485 @param[in] old_table original table
486 @param[in] new_table new table
487 @param[in,out] psort_info parallel sort info
488 @param[in] row table row
489 @param[in] ext cache of externally stored
490 column prefixes, or NULL
491 @param[in,out] doc_id Doc ID if we are creating
492 FTS index
493 @param[in,out] conv_heap memory heap where to allocate data when
494 converting to ROW_FORMAT=REDUNDANT, or NULL
495 when not to invoke
496 row_merge_buf_redundant_convert()
497 @param[in,out] err set if error occurs
498 @param[in,out] v_heap heap memory to process data for virtual column
499 @param[in,out] my_table mysql table object
500 @param[in] trx transaction object
501 @param[in] prebuilt compress_heap must be taken from here
502 @return number of rows added, 0 if out of space */
503 static
504 ulint
row_merge_buf_add(row_merge_buf_t * buf,dict_index_t * fts_index,const dict_table_t * old_table,const dict_table_t * new_table,fts_psort_t * psort_info,const dtuple_t * row,const row_ext_t * ext,doc_id_t * doc_id,mem_heap_t * conv_heap,dberr_t * err,mem_heap_t ** v_heap,TABLE * my_table,trx_t * trx,row_prebuilt_t * prebuilt)505 row_merge_buf_add(
506 row_merge_buf_t* buf,
507 dict_index_t* fts_index,
508 const dict_table_t* old_table,
509 const dict_table_t* new_table,
510 fts_psort_t* psort_info,
511 const dtuple_t* row,
512 const row_ext_t* ext,
513 doc_id_t* doc_id,
514 mem_heap_t* conv_heap,
515 dberr_t* err,
516 mem_heap_t** v_heap,
517 TABLE* my_table,
518 trx_t* trx,
519 row_prebuilt_t* prebuilt)
520 {
521 ulint i;
522 const dict_index_t* index;
523 mtuple_t* entry;
524 dfield_t* field;
525 const dict_field_t* ifield;
526 ulint n_fields;
527 ulint data_size;
528 ulint extra_size;
529 ulint bucket = 0;
530 doc_id_t write_doc_id;
531 ulint n_row_added = 0;
532 DBUG_ENTER("row_merge_buf_add");
533 
/* Out of tuple slots: caller must flush the buffer and retry. */
534 if (buf->n_tuples >= buf->max_tuples) {
535 DBUG_RETURN(0);
536 }
537 
538 DBUG_EXECUTE_IF(
539 "ib_row_merge_buf_add_two",
540 if (buf->n_tuples >= 2) DBUG_RETURN(0););
541 
542 UNIV_PREFETCH_R(row->fields);
543 
544 /* If we are building FTS index, buf->index points to
545 the 'fts_sort_idx', and real FTS index is stored in
546 fts_index */
547 index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;
548 
549 /* create spatial index should not come here */
550 ut_ad(!dict_index_is_spatial(index));
551 
552 n_fields = dict_index_get_n_fields(index);
553 
553 
554 entry = &buf->tuples[buf->n_tuples];
555 field = entry->fields = static_cast<dfield_t*>(
556 mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
557 
558 data_size = 0;
559 extra_size = UT_BITS_IN_BYTES(index->n_nullable);
560 
561 ifield = dict_index_get_nth_field(index, 0);
562 
/* Copy each index field from the row, accumulating the encoded
data and extra byte counts as we go. */
563 for (i = 0; i < n_fields; i++, field++, ifield++) {
564 ulint len;
565 const dict_col_t* col;
566 const dict_v_col_t* v_col = NULL;
567 ulint col_no;
568 ulint fixed_len;
569 const dfield_t* row_field;
570 
571 col = ifield->col;
572 if (dict_col_is_virtual(col)) {
573 v_col = reinterpret_cast<const dict_v_col_t*>(col);
574 }
575 
576 col_no = dict_col_get_no(col);
577 
578 /* Process the Doc ID column */
579 if (*doc_id > 0
580 && col_no == index->table->fts->doc_col
581 && !dict_col_is_virtual(col)) {
582 fts_write_doc_id((byte*) &write_doc_id, *doc_id);
583 
584 /* Note: field->data now points to a value on the
585 stack: &write_doc_id after dfield_set_data(). Because
586 there is only one doc_id per row, it shouldn't matter.
587 We allocate a new buffer before we leave the function
588 later below. */
589 
590 dfield_set_data(
591 field, &write_doc_id, sizeof(write_doc_id));
592 
593 field->type.mtype = ifield->col->mtype;
594 field->type.prtype = ifield->col->prtype;
595 field->type.mbminmaxlen = DATA_MBMINMAXLEN(0, 0);
596 field->type.len = ifield->col->len;
597 } else {
598 /* Use callback to get the virtual column value */
599 if (dict_col_is_virtual(col)) {
600 dict_index_t* clust_index
601 = dict_table_get_first_index(new_table);
602 
603 row_field = innobase_get_computed_value(
604 row, v_col, clust_index,
605 v_heap, NULL, ifield, trx->mysql_thd,
606 my_table, old_table, NULL, NULL,
607 prebuilt);
608 
609 if (row_field == NULL) {
610 *err = DB_COMPUTE_VALUE_FAILED;
611 DBUG_RETURN(0);
612 }
613 dfield_copy(field, row_field);
614 } else {
615 row_field = dtuple_get_nth_field(row, col_no);
616 dfield_copy(field, row_field);
617 }
618 
619 
620 /* Tokenize and process data for FTS */
621 if (index->type & DICT_FTS) {
622 fts_doc_item_t* doc_item;
623 byte* value;
624 void* ptr;
625 const ulint max_trial_count = 10000;
626 ulint trial_count = 0;
627 
628 /* fetch Doc ID if it already exists
629 in the row, and not supplied by the
630 caller. Even if the value column is
631 NULL, we still need to get the Doc
632 ID so to maintain the correct max
633 Doc ID */
634 if (*doc_id == 0) {
635 const dfield_t* doc_field;
636 doc_field = dtuple_get_nth_field(
637 row,
638 index->table->fts->doc_col);
639 *doc_id = (doc_id_t) mach_read_from_8(
640 static_cast<byte*>(
641 dfield_get_data(doc_field)));
642 
643 if (*doc_id == 0) {
644 ib::warn() << "FTS Doc ID is"
645 " zero. Record"
646 " skipped";
647 DBUG_RETURN(0);
648 }
649 }
650 
651 if (dfield_is_null(field)) {
652 n_row_added = 1;
653 continue;
654 }
655 
/* Hand the column text to a parallel tokenizer thread:
allocate one chunk holding the doc item header plus a
private copy of the value. */
656 ptr = ut_malloc_nokey(sizeof(*doc_item)
657 + field->len);
658 
659 doc_item = static_cast<fts_doc_item_t*>(ptr);
660 value = static_cast<byte*>(ptr)
661 + sizeof(*doc_item);
662 memcpy(value, field->data, field->len);
663 field->data = value;
664 
665 doc_item->field = field;
666 doc_item->doc_id = *doc_id;
667 
668 bucket = *doc_id % fts_sort_pll_degree;
669 
670 /* Add doc item to fts_doc_list */
671 mutex_enter(&psort_info[bucket].mutex);
672 
673 if (psort_info[bucket].error == DB_SUCCESS) {
674 UT_LIST_ADD_LAST(
675 psort_info[bucket].fts_doc_list,
676 doc_item);
677 psort_info[bucket].memory_used +=
678 sizeof(*doc_item) + field->len;
679 } else {
680 ut_free(doc_item);
681 }
682 
683 mutex_exit(&psort_info[bucket].mutex);
684 
685 /* Sleep when memory used exceeds limit*/
686 while (psort_info[bucket].memory_used
687 > FTS_PENDING_DOC_MEMORY_LIMIT
688 && trial_count++ < max_trial_count) {
689 os_thread_sleep(1000);
690 }
691 
692 n_row_added = 1;
693 continue;
694 }
695 
696 if (field->len != UNIV_SQL_NULL
697 && col->mtype == DATA_MYSQL
698 && col->len != field->len) {
699 if (conv_heap != NULL) {
700 row_merge_buf_redundant_convert(
701 row_field, field, col->len,
702 dict_table_page_size(old_table),
703 conv_heap);
704 } else {
705 /* Field length mismatch should not
706 happen when rebuilding redundant row
707 format table. */
708 ut_ad(dict_table_is_comp(index->table));
709 }
710 }
711 }
712 
713 len = dfield_get_len(field);
714 
715 if (dfield_is_null(field)) {
716 ut_ad(!(col->prtype & DATA_NOT_NULL));
717 continue;
718 } else if (!ext) {
719 } else if (dict_index_is_clust(index)) {
720 /* Flag externally stored fields. */
721 const byte* buf = row_ext_lookup(ext, col_no,
722 &len);
723 if (UNIV_LIKELY_NULL(buf)) {
724 ut_a(buf != field_ref_zero);
725 if (i < dict_index_get_n_unique(index)) {
726 dfield_set_data(field, buf, len);
727 } else {
728 dfield_set_ext(field);
729 len = dfield_get_len(field);
730 }
731 }
732 } else if (!dict_col_is_virtual(col)) {
733 /* Only non-virtual column are stored externally */
734 const byte* buf = row_ext_lookup(ext, col_no,
735 &len);
736 if (UNIV_LIKELY_NULL(buf)) {
737 ut_a(buf != field_ref_zero);
738 dfield_set_data(field, buf, len);
739 }
740 }
741 
742 /* If a column prefix index, take only the prefix */
743 
744 if (ifield->prefix_len) {
745 len = dtype_get_at_most_n_mbchars(
746 col->prtype,
747 col->mbminmaxlen,
748 ifield->prefix_len,
749 len,
750 static_cast<char*>(dfield_get_data(field)));
751 dfield_set_len(field, len);
752 }
753 
754 ut_ad(len <= col->len
755 || DATA_LARGE_MTYPE(col->mtype)
756 || (col->mtype == DATA_POINT
757 && len == DATA_MBR_LEN)
758 || ((col->mtype == DATA_VARCHAR
759 || col->mtype == DATA_BINARY
760 || col->mtype == DATA_VARMYSQL)
761 && (col->len == 0
762 || len <= col->len +
763 prtype_get_compression_extra(col->prtype))));
764 
765 fixed_len = ifield->fixed_len;
766 if (fixed_len && !dict_table_is_comp(index->table)
767 && DATA_MBMINLEN(col->mbminmaxlen)
768 != DATA_MBMAXLEN(col->mbminmaxlen)) {
769 /* CHAR in ROW_FORMAT=REDUNDANT is always
770 fixed-length, but in the temporary file it is
771 variable-length for variable-length character
772 sets. */
773 fixed_len = 0;
774 }
775 
776 if (fixed_len) {
777 #ifdef UNIV_DEBUG
778 ulint mbminlen = DATA_MBMINLEN(col->mbminmaxlen);
779 ulint mbmaxlen = DATA_MBMAXLEN(col->mbminmaxlen);
780 
781 /* len should be between size calcualted base on
782 mbmaxlen and mbminlen */
783 ut_ad(len <= fixed_len);
784 ut_ad(!mbmaxlen || len >= mbminlen
785 * (fixed_len / mbmaxlen));
786 
787 ut_ad(!dfield_is_ext(field));
788 #endif /* UNIV_DEBUG */
789 } else if (dfield_is_ext(field)) {
790 extra_size += 2;
791 } else if (len < 128
792 || (!DATA_BIG_COL(col))) {
793 extra_size++;
794 } else {
795 /* For variable-length columns, we look up the
796 maximum length from the column itself. If this
797 is a prefix index column shorter than 256 bytes,
798 this will waste one byte. */
799 extra_size += 2;
800 }
801 data_size += len;
802 }
803 
804 /* If this is FTS index, we already populated the sort buffer, return
805 here */
806 if (index->type & DICT_FTS) {
807 DBUG_RETURN(n_row_added);
808 }
809 
810 #ifdef UNIV_DEBUG
811 {
812 ulint size;
813 ulint extra;
814 
815 size = rec_get_converted_size_temp(
816 index, entry->fields, n_fields, NULL, &extra);
817 
818 ut_ad(data_size + extra_size == size);
819 ut_ad(extra_size == extra);
820 }
821 #endif /* UNIV_DEBUG */
822 
823 /* Add to the total size of the record in row_merge_block_t
824 the encoded length of extra_size and the extra bytes (extra_size).
825 See row_merge_buf_write() for the variable-length encoding
826 of extra_size. */
827 data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
828 
829 /* Record size can exceed page size while converting to
830 redundant row format. But there is assert
831 ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size().
832 It may hit the assert before attempting to insert the row. */
833 if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
834 *err = DB_TOO_BIG_RECORD;
835 }
836 
837 ut_ad(data_size < srv_sort_buf_size);
838 
839 /* Reserve one byte for the end marker of row_merge_block_t. */
840 if (buf->total_size + data_size >= srv_sort_buf_size - 1) {
841 DBUG_RETURN(0);
842 }
843 
844 buf->total_size += data_size;
845 buf->n_tuples++;
846 n_row_added++;
847 
848 field = entry->fields;
849 
850 /* Copy the data fields. */
851 
/* dfield_dup() replaces any stack/borrowed pointers (e.g. the
write_doc_id set above) with heap-owned copies, so the tuple
remains valid after this function returns. */
852 do {
853 dfield_dup(field++, buf->heap);
854 } while (--n_fields);
855 
856 if (conv_heap != NULL) {
857 mem_heap_empty(conv_heap);
858 }
859 
860 DBUG_RETURN(n_row_added);
861 }
862
863 /*************************************************************//**
864 Report a duplicate key. */
865 void
row_merge_dup_report(row_merge_dup_t * dup,const dfield_t * entry)866 row_merge_dup_report(
867 /*=================*/
868 row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */
869 const dfield_t* entry) /*!< in: duplicate index entry */
870 {
871 if (!dup->n_dup++) {
872 /* Only report the first duplicate record,
873 but count all duplicate records. */
874 innobase_fields_to_mysql(dup->table, dup->index, entry);
875 }
876 }
877
878 /*************************************************************//**
879 Compare two tuples.
880 @return positive, 0, negative if a is greater, equal, less, than b,
881 respectively */
882 static MY_ATTRIBUTE((warn_unused_result))
883 int
row_merge_tuple_cmp(ulint n_uniq,ulint n_field,const mtuple_t & a,const mtuple_t & b,row_merge_dup_t * dup)884 row_merge_tuple_cmp(
885 /*================*/
886 ulint n_uniq, /*!< in: number of unique fields */
887 ulint n_field,/*!< in: number of fields */
888 const mtuple_t& a, /*!< in: first tuple to be compared */
889 const mtuple_t& b, /*!< in: second tuple to be compared */
890 row_merge_dup_t* dup) /*!< in/out: for reporting duplicates,
891 NULL if non-unique index */
892 {
893 int cmp;
894 const dfield_t* af = a.fields;
895 const dfield_t* bf = b.fields;
896 ulint n = n_uniq;
897 
898 ut_ad(n_uniq > 0);
899 ut_ad(n_uniq <= n_field);
900 
901 /* Compare the fields of the tuples until a difference is
902 found or we run out of fields to compare. If !cmp at the
903 end, the tuples are equal. */
904 do {
905 cmp = cmp_dfield_dfield(af++, bf++);
906 } while (!cmp && --n);
907 
908 if (cmp) {
909 return(cmp);
910 }
911 
912 if (dup) {
913 /* Report a duplicate value error if the tuples are
914 logically equal. NULL columns are logically inequal,
915 although they are equal in the sorting order. Find
916 out if any of the fields are NULL. */
917 for (const dfield_t* df = a.fields; df != af; df++) {
918 if (dfield_is_null(df)) {
919 goto no_report;
920 }
921 }
922 
923 row_merge_dup_report(dup, a.fields);
924 }
925 
/* Tie-break on the remaining non-unique fields; af/bf already
point just past the n_uniq fields compared above. */
926 no_report:
927 /* The n_uniq fields were equal, but we compare all fields so
928 that we will get the same (internal) order as in the B-tree. */
929 for (n = n_field - n_uniq + 1; --n; ) {
930 cmp = cmp_dfield_dfield(af++, bf++);
931 if (cmp) {
932 return(cmp);
933 }
934 }
935 
936 /* This should never be reached, except in a secondary index
937 when creating a secondary index and a PRIMARY KEY, and there
938 is a duplicate in the PRIMARY KEY that has not been detected
939 yet. Internally, an index must never contain duplicates. */
940 return(cmp);
941 }
942
943 /** Wrapper for row_merge_tuple_sort() to inject some more context to
944 UT_SORT_FUNCTION_BODY().
945 @param tuples array of tuples being sorted
946 @param aux work area, same size as tuples[]
947 @param low lower bound of the sorting area, inclusive
948 @param high upper bound of the sorting area, exclusive
(see row_merge_tuple_sort(); callers pass buf->n_tuples here) */
949 #define row_merge_tuple_sort_ctx(tuples, aux, low, high) \
950 row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
951 /** Wrapper for row_merge_tuple_cmp() to inject some more context to
952 UT_SORT_FUNCTION_BODY().
953 @param a first tuple to be compared
954 @param b second tuple to be compared
955 @return positive, 0, negative, if a is greater, equal, less, than b,
956 respectively */
957 #define row_merge_tuple_cmp_ctx(a,b) \
958 row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)
959
960 /**********************************************************************//**
961 Merge sort the tuple buffer in main memory. */
962 static
963 void
row_merge_tuple_sort(ulint n_uniq,ulint n_field,row_merge_dup_t * dup,mtuple_t * tuples,mtuple_t * aux,ulint low,ulint high)964 row_merge_tuple_sort(
965 /*=================*/
966 ulint n_uniq, /*!< in: number of unique fields */
967 ulint n_field,/*!< in: number of fields */
968 row_merge_dup_t* dup, /*!< in/out: reporter of duplicates
969 (NULL if non-unique index) */
970 mtuple_t* tuples, /*!< in/out: tuples */
971 mtuple_t* aux, /*!< in/out: work area */
972 ulint low, /*!< in: lower bound of the
973 sorting area, inclusive */
974 ulint high) /*!< in: upper bound of the
975 sorting area, exclusive */
976 {
977 ut_ad(n_field > 0);
978 ut_ad(n_uniq <= n_field);
979 
/* The entire recursive merge sort is generated by this macro; the
_ctx macros above bind n_uniq, n_field and dup into the recursion
and the comparisons. */
980 UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
981 tuples, aux, low, high, row_merge_tuple_cmp_ctx);
982 }
983
984 /******************************************************//**
985 Sort a buffer. */
986 void
row_merge_buf_sort(row_merge_buf_t * buf,row_merge_dup_t * dup)987 row_merge_buf_sort(
988 /*===============*/
989 row_merge_buf_t* buf, /*!< in/out: sort buffer */
990 row_merge_dup_t* dup) /*!< in/out: reporter of duplicates
991 (NULL if non-unique index) */
992 {
993 ut_ad(!dict_index_is_spatial(buf->index));
994
995 row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
996 dict_index_get_n_fields(buf->index),
997 dup,
998 buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
999 }
1000
1001 /******************************************************//**
1002 Write a buffer to a block. */
1003 void
row_merge_buf_write(const row_merge_buf_t * buf,const merge_file_t * of UNIV_UNUSED,row_merge_block_t * block)1004 row_merge_buf_write(
1005 /*================*/
1006 const row_merge_buf_t* buf, /*!< in: sorted buffer */
1007 const merge_file_t* of UNIV_UNUSED,
1008 /*!< in: output file */
1009 row_merge_block_t* block) /*!< out: buffer for writing to file */
1010 {
1011 const dict_index_t* index = buf->index;
1012 ulint n_fields= dict_index_get_n_fields(index);
1013 byte* b = &block[0];
1014 
1015 DBUG_ENTER("row_merge_buf_write");
1016 
/* Serialize every tuple in sorted order; row_merge_buf_add()
already guaranteed that the total encoded size plus the end
marker fits within srv_sort_buf_size. */
1017 for (ulint i = 0; i < buf->n_tuples; i++) {
1018 const mtuple_t* entry = &buf->tuples[i];
1019 
1020 row_merge_buf_encode(&b, index, entry, n_fields);
1021 ut_ad(b < &block[srv_sort_buf_size]);
1022 
1023 DBUG_PRINT("ib_merge_sort",
1024 ("%p,fd=%d,%lu %lu: %s",
1025 reinterpret_cast<const void*>(b), of->fd,
1026 ulong(of->offset), ulong(i),
1027 rec_printer(entry->fields,
1028 n_fields).str().c_str()));
1029 }
1030 
1031 /* Write an "end-of-chunk" marker. */
1032 ut_a(b < &block[srv_sort_buf_size]);
1033 ut_a(b == &block[0] + buf->total_size);
1034 *b++ = 0;
1035 #ifdef UNIV_DEBUG_VALGRIND
1036 /* The rest of the block is uninitialized. Initialize it
1037 to avoid bogus warnings. */
1038 memset(b, 0xff, &block[srv_sort_buf_size] - b);
1039 #endif /* UNIV_DEBUG_VALGRIND */
1040 DBUG_PRINT("ib_merge_sort",
1041 ("write %p,%d,%lu EOF",
1042 reinterpret_cast<const void*>(b), of->fd,
1043 ulong(of->offset)));
1044 DBUG_VOID_RETURN;
1045 }
1046
1047 /******************************************************//**
1048 Create a memory heap and allocate space for row_merge_rec_offsets()
1049 and mrec_buf_t[3].
1050 @return memory heap */
1051 static
1052 mem_heap_t*
row_merge_heap_create(const dict_index_t * index,mrec_buf_t ** buf,ulint ** offsets1,ulint ** offsets2)1053 row_merge_heap_create(
1054 /*==================*/
1055 const dict_index_t* index, /*!< in: record descriptor */
1056 mrec_buf_t** buf, /*!< out: 3 buffers */
1057 ulint** offsets1, /*!< out: offsets */
1058 ulint** offsets2) /*!< out: offsets */
1059 {
1060 ulint i = 1 + REC_OFFS_HEADER_SIZE
1061 + dict_index_get_n_fields(index);
1062 mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1
1063 + 3 * sizeof **buf);
1064
1065 *buf = static_cast<mrec_buf_t*>(
1066 mem_heap_alloc(heap, 3 * sizeof **buf));
1067 *offsets1 = static_cast<ulint*>(
1068 mem_heap_alloc(heap, i * sizeof **offsets1));
1069 *offsets2 = static_cast<ulint*>(
1070 mem_heap_alloc(heap, i * sizeof **offsets2));
1071
1072 (*offsets1)[0] = (*offsets2)[0] = i;
1073 (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
1074
1075 return(heap);
1076 }
1077
/********************************************************************//**
Read a merge block from the file system.
@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_read(
/*===========*/
	int			fd,	/*!< in: file descriptor */
	ulint			offset,	/*!< in: offset where to read
					in number of row_merge_block_t
					elements */
	row_merge_block_t*	buf,	/*!< out: data */
	row_merge_block_t*	crypt_buf,	/*!< in: crypt buf or NULL */
	ulint			space_id)	/*!< in: tablespace id */
{
	/* Convert the block-sized offset to a byte offset. */
	os_offset_t	ofs = ((os_offset_t) offset) * srv_sort_buf_size;
	dberr_t		err;

	DBUG_ENTER("row_merge_read");
	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
	DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););

	IORequest	request;

	/* Merge sort pages are never compressed. */
	request.disable_compression();

	err = os_file_read_no_error_handling_int_fd(
		request,
		fd, buf, ofs, srv_sort_buf_size, NULL);

	/* For encrypted tables, decrypt data after reading and copy data */
	if (err == DB_SUCCESS && log_tmp_is_encrypted()) {

		/* Decrypt into crypt_buf, then copy the plaintext
		back into the caller's buffer on success. */
		if (log_tmp_block_decrypt(static_cast<const byte*>(buf),
					  srv_sort_buf_size, static_cast<byte*>(crypt_buf),
					  offset, space_id)) {
			srv_stats.n_merge_blocks_decrypted.inc();
			memcpy(buf, crypt_buf, srv_sort_buf_size);
		} else {
			err = DB_IO_DECRYPT_FAIL;
		}
	}

#ifdef POSIX_FADV_DONTNEED
	/* Each block is read exactly once.  Free up the file cache. */
	posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	if (err != DB_SUCCESS) {
		ib::error() << "Failed to read merge block at " << ofs;
	}

	DBUG_RETURN(err == DB_SUCCESS);
}
1132
/********************************************************************//**
Write a merge block to the file system.
@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_write(
/*============*/
	int		fd,	/*!< in: file descriptor */
	ulint		offset,	/*!< in: offset where to write,
				in number of row_merge_block_t
				elements */
	void*		buf,	/*!< in/out: data */
	void*		crypt_buf,	/*!< in: crypt buf or NULL */
	ulint		space_id)	/*!< in: tablespace id */
{
	size_t		buf_len = srv_sort_buf_size;
	/* Convert the block-sized offset to a byte offset. */
	os_offset_t	ofs = buf_len * (os_offset_t) offset;
	dberr_t		err;
	/* Points at buf, or at crypt_buf when encryption is enabled. */
	void*		out_buf = buf;

	DBUG_ENTER("row_merge_write");
	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
	DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););

	IORequest	request(IORequest::WRITE);

	/* Merge sort pages are never compressed. */
	request.disable_compression();

	/* For encrypted tables, encrypt data before writing */
	if (log_tmp_is_encrypted()) {
		if (!log_tmp_block_encrypt(static_cast<const byte*>(buf),
					   srv_sort_buf_size,
					   static_cast<byte*>(crypt_buf),
					   offset, space_id)) {
			ib::error() << "Failed encrypt block at " << offset;
			DBUG_RETURN(FALSE);
		}
		srv_stats.n_merge_blocks_encrypted.inc();
		out_buf = crypt_buf;
	}

	err = os_file_write_int_fd(
		request,
		"(merge)", fd, out_buf, ofs, buf_len);

#ifdef POSIX_FADV_DONTNEED
	/* The block will be needed on the next merge pass,
	but it can be evicted from the file cache meanwhile. */
	posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	DBUG_RETURN(err == DB_SUCCESS);
}
1185
/********************************************************************//**
Read a merge record.  Records are stored as a 1- or 2-byte encoded
extra_size (0 = end of list; values >= 0x80 use a second byte),
followed by the record header and data.  A record may span two file
blocks, in which case it is reassembled into *buf.
@return pointer to next record, or NULL on I/O error or end of list */
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	const byte*		b,	/*!< in: pointer to record */
	const dict_index_t*	index,	/*!< in: index of the record */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t**		mrec,	/*!< out: pointer to merge
					record, or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/*!< out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	/* offsets[] must have been sized by row_merge_heap_create(). */
	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	DBUG_ENTER("row_merge_read_rec");

	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		DBUG_PRINT("ib_merge_sort",
			   ("read %p,%p,%d,%lu EOF\n",
			    reinterpret_cast<const void*>(b),
			    reinterpret_cast<const void*>(block),
			    fd, ulong(*foffs)));
		DBUG_RETURN(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
			/* The second length byte is in the next block. */
			if (!row_merge_read(fd, ++(*foffs), block,
					    crypt_block, space_id)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				DBUG_RETURN(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = &block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size. Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks. Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = &block[srv_sort_buf_size] - b;
		ut_ad(avail_size < sizeof *buf);
		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block, crypt_block,
				    space_id)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = &block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		/* The record proper starts after its extra bytes. */
		*mrec = *buf + extra_size;

		rec_init_offsets_temp(*mrec, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < &block[srv_sort_buf_size]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	*mrec = b + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record spans two blocks. Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = &block[srv_sort_buf_size] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;

	/* We cannot invoke rec_offs_make_valid() here, because there
	are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
	Similarly, rec_offs_validate() would fail, because it invokes
	rec_get_status(). */
	ut_d(offsets[2] = (ulint) *mrec);
	ut_d(offsets[3] = (ulint) index);

	if (!row_merge_read(fd, ++(*foffs), block, crypt_block, space_id)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = &block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs),
		    rec_printer(*mrec, 0, offsets).str().c_str()));
	DBUG_RETURN(b);
}
1352
/********************************************************************//**
Write a merge record: the 1- or 2-byte encoded extra_size followed by
the raw record bytes (extra part + data part). */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifndef NDEBUG
	ulint		size,	/*!< in: total size to write */
	int		fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* !NDEBUG */
	const mrec_t*	mrec,	/*!< in: record to write */
	const ulint*	offsets)/*!< in: offsets of mrec */
/* In release builds the debug-only parameters are compiled out;
this macro makes call sites uniform across both build types. */
#ifdef NDEBUG
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* NDEBUG */
{
	DBUG_ENTER("row_merge_write_rec_low");

#ifndef NDEBUG
	const byte* const end = b + size;
#endif /* NDEBUG */
	/* e is extra_size + 1; 0 is reserved as the end-of-list marker. */
	assert(e == rec_offs_extra_size(offsets) + 1);
	DBUG_PRINT("ib_merge_sort",
		   ("%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b), fd, ulong(foffs),
		    rec_printer(mrec, 0, offsets).str().c_str()));

	if (e < 0x80) {
		*b++ = (byte) e;
	} else {
		/* Two-byte encoding: high bit set in the first byte. */
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	/* mrec points past the extra bytes; copy extra + data together. */
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	assert(b + rec_offs_size(offsets) == end);
	DBUG_VOID_RETURN;
}
1395
/********************************************************************//**
Write a merge record.  If the record does not fit in the remainder of
the current block, it is staged in buf[0], the full block is flushed
to disk, and the tail is copied to the start of the next block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	byte*			b,	/*!< in: pointer to end of
					block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t*		mrec,	/*!< in: record to write */
	const ulint*		offsets)	/*!< in: offsets of mrec */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(mrec);
	ut_ad(foffs);
	/* The source record must not live inside the destination
	buffers, since those may be overwritten below. */
	ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size. Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	/* Total on-disk size: length byte(s) plus record bytes. */
	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = &block[srv_sort_buf_size] - b;

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block, crypt_block,
				     space_id)) {
			return(NULL);
		}

		UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);

		if (crypt_block) {
			UNIV_MEM_INVALID(&crypt_block[0], srv_sort_buf_size);
		}

		/* Copy the rest. */
		b = &block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		/* Common case: the record fits; write it in place. */
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}
1470
/********************************************************************//**
Write an end-of-list marker (a single 0 byte) and flush the block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_eof(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	byte*			b,	/*!< in: pointer to end of
					block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs)	/*!< in/out: file offset */
{
	ut_ad(block);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(foffs);

	DBUG_ENTER("row_merge_write_eof");
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs)));

	/* 0 is the "end of list" marker (see row_merge_read_rec()). */
	*b++ = 0;
	UNIV_MEM_ASSERT_RW(&block[0], b - &block[0]);
	UNIV_MEM_ASSERT_W(&block[0], srv_sort_buf_size);
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized. Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* UNIV_DEBUG_VALGRIND */

	if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space_id)) {
		DBUG_RETURN(NULL);
	}

	UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
	DBUG_RETURN(&block[0]);
}
1514
1515 /** Create a temporary file if it has not been created already.
1516 @param[in,out] tmpfd temporary file handle
1517 @param[in] path location for creating temporary file
1518 @return file descriptor, or -1 on failure */
1519 static MY_ATTRIBUTE((warn_unused_result))
1520 int
row_merge_tmpfile_if_needed(int * tmpfd,const char * path)1521 row_merge_tmpfile_if_needed(
1522 int* tmpfd,
1523 const char* path)
1524 {
1525 if (*tmpfd < 0) {
1526 *tmpfd = row_merge_file_create_low(path);
1527 if (*tmpfd >= 0) {
1528 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1529 }
1530 }
1531
1532 return(*tmpfd);
1533 }
1534
1535 /** Create a temporary file for merge sort if it was not created already.
1536 @param[in,out] file merge file structure
1537 @param[in] nrec number of records in the file
1538 @param[in] path location for creating temporary file
1539 @return file descriptor, or -1 on failure */
1540 static MY_ATTRIBUTE((warn_unused_result))
1541 int
row_merge_file_create_if_needed(merge_file_t * file,int * tmpfd,ulint nrec,const char * path)1542 row_merge_file_create_if_needed(
1543 merge_file_t* file,
1544 int* tmpfd,
1545 ulint nrec,
1546 const char* path)
1547 {
1548 ut_ad(file->fd < 0 || *tmpfd >=0);
1549 if (file->fd < 0 && row_merge_file_create(file, path) >= 0) {
1550 MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1551 if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) {
1552 return(-1);
1553 }
1554
1555 file->n_rec = nrec;
1556 }
1557
1558 ut_ad(file->fd < 0 || *tmpfd >=0);
1559 return(file->fd);
1560 }
1561
1562 /** Copy the merge data tuple from another merge data tuple.
1563 @param[in] mtuple source merge data tuple
1564 @param[in,out] prev_mtuple destination merge data tuple
1565 @param[in] n_unique number of unique fields exist in the mtuple
1566 @param[in,out] heap memory heap where last_mtuple allocated */
1567 static
1568 void
row_mtuple_create(const mtuple_t * mtuple,mtuple_t * prev_mtuple,ulint n_unique,mem_heap_t * heap)1569 row_mtuple_create(
1570 const mtuple_t* mtuple,
1571 mtuple_t* prev_mtuple,
1572 ulint n_unique,
1573 mem_heap_t* heap)
1574 {
1575 memcpy(prev_mtuple->fields, mtuple->fields,
1576 n_unique * sizeof *mtuple->fields);
1577
1578 dfield_t* field = prev_mtuple->fields;
1579
1580 for (ulint i = 0; i < n_unique; i++) {
1581 dfield_dup(field++, heap);
1582 }
1583 }
1584
1585 /** Compare two merge data tuples.
1586 @param[in] prev_mtuple merge data tuple
1587 @param[in] current_mtuple merge data tuple
1588 @param[in,out] dup reporter of duplicates
1589 @retval positive, 0, negative if current_mtuple is greater, equal, less, than
1590 last_mtuple. */
1591 static
1592 int
row_mtuple_cmp(const mtuple_t * prev_mtuple,const mtuple_t * current_mtuple,row_merge_dup_t * dup)1593 row_mtuple_cmp(
1594 const mtuple_t* prev_mtuple,
1595 const mtuple_t* current_mtuple,
1596 row_merge_dup_t* dup)
1597 {
1598 ut_ad(dict_index_is_clust(dup->index));
1599 const ulint n_unique = dict_index_get_n_unique(dup->index);
1600
1601 return(row_merge_tuple_cmp(
1602 n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
1603 }
1604
1605 /** Insert cached spatial index rows.
1606 @param[in] trx_id transaction id
1607 @param[in] sp_tuples cached spatial rows
1608 @param[in] num_spatial number of spatial indexes
1609 @param[in,out] row_heap heap for insert
1610 @param[in,out] sp_heap heap for tuples
1611 @param[in,out] pcur cluster index cursor
1612 @param[in,out] mtr mini transaction
1613 @param[in,out] mtr_committed whether scan_mtr got committed
1614 @return DB_SUCCESS or error number */
1615 static
1616 dberr_t
row_merge_spatial_rows(trx_id_t trx_id,index_tuple_info_t ** sp_tuples,ulint num_spatial,mem_heap_t * row_heap,mem_heap_t * sp_heap,btr_pcur_t * pcur,mtr_t * mtr,bool * mtr_committed)1617 row_merge_spatial_rows(
1618 trx_id_t trx_id,
1619 index_tuple_info_t** sp_tuples,
1620 ulint num_spatial,
1621 mem_heap_t* row_heap,
1622 mem_heap_t* sp_heap,
1623 btr_pcur_t* pcur,
1624 mtr_t* mtr,
1625 bool* mtr_committed)
1626 {
1627 dberr_t err = DB_SUCCESS;
1628
1629 if (sp_tuples == NULL) {
1630 return(DB_SUCCESS);
1631 }
1632
1633 ut_ad(sp_heap != NULL);
1634
1635 for (ulint j = 0; j < num_spatial; j++) {
1636 err = sp_tuples[j]->insert(
1637 trx_id, row_heap,
1638 pcur, mtr, mtr_committed);
1639
1640 if (err != DB_SUCCESS) {
1641 return(err);
1642 }
1643 }
1644
1645 mem_heap_empty(sp_heap);
1646
1647 return(err);
1648 }
1649
1650 /** Check if the geometry field is valid.
1651 @param[in] row the row
1652 @param[in] index spatial index
1653 @return true if it's valid, false if it's invalid. */
1654 static
1655 bool
row_geo_field_is_valid(const dtuple_t * row,dict_index_t * index)1656 row_geo_field_is_valid(
1657 const dtuple_t* row,
1658 dict_index_t* index)
1659 {
1660 const dict_field_t* ind_field
1661 = dict_index_get_nth_field(index, 0);
1662 const dict_col_t* col
1663 = ind_field->col;
1664 ulint col_no
1665 = dict_col_get_no(col);
1666 const dfield_t* dfield
1667 = dtuple_get_nth_field(row, col_no);
1668
1669 if (dfield_is_null(dfield)
1670 || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
1671 return(false);
1672 }
1673
1674 return(true);
1675 }
1676
1677 /** Reads clustered index of the table and create temporary files
1678 containing the index entries for the indexes to be built.
1679 @param[in] trx transaction
1680 @param[in,out] table MySQL table object, for reporting erroneous
1681 records
1682 @param[in] old_table table where rows are read from
1683 @param[in] new_table table where indexes are created; identical to
1684 old_table unless creating a PRIMARY KEY
1685 @param[in] online true if creating indexes online
1686 @param[in] index indexes to be created
1687 @param[in] fts_sort_idx full-text index to be created, or NULL
1688 @param[in] psort_info parallel sort info for fts_sort_idx creation,
1689 or NULL
1690 @param[in] files temporary files
1691 @param[in] key_numbers MySQL key numbers to create
1692 @param[in] n_index number of indexes to create
1693 @param[in] add_cols default values of added columns, or NULL
1694 @param[in] add_v newly added virtual columns along with indexes
1695 @param[in] col_map mapping of old column numbers to new ones, or
1696 NULL if old_table == new_table
1697 @param[in] add_autoinc number of added AUTO_INCREMENT columns, or
1698 ULINT_UNDEFINED if none is added
1699 @param[in,out] sequence autoinc sequence
1700 @param[in,out] block file buffer
1701 @param[in,out] crypt_block encrypted file buffer
1702 @param[in] skip_pk_sort whether the new PRIMARY KEY will follow
1703 existing order
1704 @param[in,out] tmpfd temporary file handle
1705 @param[in,out] stage performance schema accounting object, used by
1706 ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
1707 stage->inc() will be called for each page read.
1708 @param[in] eval_table mysql table used to evaluate virtual column
1709 value, see innobase_get_computed_value().
1710 @param[in] prebuilt compress_heap must be taken from here
1711 @return DB_SUCCESS or error */
1712 static MY_ATTRIBUTE((warn_unused_result))
1713 dberr_t
row_merge_read_clustered_index(trx_t * trx,struct TABLE * table,const dict_table_t * old_table,const dict_table_t * new_table,bool online,dict_index_t ** index,dict_index_t * fts_sort_idx,fts_psort_t * psort_info,merge_file_t * files,const ulint * key_numbers,ulint n_index,const dtuple_t * add_cols,const dict_add_v_col_t * add_v,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,row_merge_block_t * block,row_merge_block_t * crypt_block,bool skip_pk_sort,int * tmpfd,ut_stage_alter_t * stage,struct TABLE * eval_table,row_prebuilt_t * prebuilt)1714 row_merge_read_clustered_index(
1715 trx_t* trx,
1716 struct TABLE* table,
1717 const dict_table_t* old_table,
1718 const dict_table_t* new_table,
1719 bool online,
1720 dict_index_t** index,
1721 dict_index_t* fts_sort_idx,
1722 fts_psort_t* psort_info,
1723 merge_file_t* files,
1724 const ulint* key_numbers,
1725 ulint n_index,
1726 const dtuple_t* add_cols,
1727 const dict_add_v_col_t* add_v,
1728 const ulint* col_map,
1729 ulint add_autoinc,
1730 ib_sequence_t& sequence,
1731 row_merge_block_t* block,
1732 row_merge_block_t* crypt_block,
1733 bool skip_pk_sort,
1734 int* tmpfd,
1735 ut_stage_alter_t* stage,
1736 struct TABLE* eval_table,
1737 row_prebuilt_t* prebuilt)
1738 {
1739 dict_index_t* clust_index; /* Clustered index */
1740 mem_heap_t* row_heap; /* Heap memory to create
1741 clustered index tuples */
1742 row_merge_buf_t** merge_buf; /* Temporary list for records*/
1743 mem_heap_t* v_heap = NULL; /* Heap memory to process large
1744 data for virtual column */
1745 btr_pcur_t pcur; /* Cursor on the clustered
1746 index */
1747 mtr_t mtr; /* Mini transaction */
1748 dberr_t err = DB_SUCCESS;/* Return code */
1749 ulint n_nonnull = 0; /* number of columns
1750 changed to NOT NULL */
1751 ulint* nonnull = NULL; /* NOT NULL columns */
1752 dict_index_t* fts_index = NULL;/* FTS index */
1753 doc_id_t doc_id = 0;
1754 doc_id_t max_doc_id = 0;
1755 ibool add_doc_id = FALSE;
1756 os_event_t fts_parallel_sort_event = NULL;
1757 ibool fts_pll_sort = FALSE;
1758 int64_t sig_count = 0;
1759 index_tuple_info_t** sp_tuples = NULL;
1760 mem_heap_t* sp_heap = NULL;
1761 ulint num_spatial = 0;
1762 BtrBulk* clust_btr_bulk = NULL;
1763 bool clust_temp_file = false;
1764 mem_heap_t* mtuple_heap = NULL;
1765 mtuple_t prev_mtuple;
1766 mem_heap_t* conv_heap = NULL;
1767 FlushObserver* observer = trx->flush_observer;
1768 DBUG_ENTER("row_merge_read_clustered_index");
1769
1770 ut_ad((old_table == new_table) == !col_map);
1771 ut_ad(!add_cols || col_map);
1772
1773 trx->op_info = "reading clustered index";
1774
1775 #ifdef FTS_INTERNAL_DIAG_PRINT
1776 DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
1777 #endif
1778
1779 /* Create and initialize memory for record buffers */
1780
1781 merge_buf = static_cast<row_merge_buf_t**>(
1782 ut_malloc_nokey(n_index * sizeof *merge_buf));
1783
1784 row_merge_dup_t clust_dup = {index[0], table, col_map, 0};
1785 dfield_t* prev_fields;
1786 const ulint n_uniq = dict_index_get_n_unique(index[0]);
1787
1788 ut_ad(trx->mysql_thd != NULL);
1789
1790 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
1791
1792 ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
1793 /* There is no previous tuple yet. */
1794 prev_mtuple.fields = NULL;
1795
1796 for (ulint i = 0; i < n_index; i++) {
1797 if (index[i]->type & DICT_FTS) {
1798
1799 /* We are building a FT index, make sure
1800 we have the temporary 'fts_sort_idx' */
1801 ut_a(fts_sort_idx);
1802
1803 fts_index = index[i];
1804
1805 merge_buf[i] = row_merge_buf_create(fts_sort_idx);
1806
1807 add_doc_id = DICT_TF2_FLAG_IS_SET(
1808 new_table, DICT_TF2_FTS_ADD_DOC_ID);
1809
1810 /* If Doc ID does not exist in the table itself,
1811 fetch the first FTS Doc ID */
1812 if (add_doc_id) {
1813 fts_get_next_doc_id(
1814 (dict_table_t*) new_table,
1815 &doc_id);
1816 ut_ad(doc_id > 0);
1817 }
1818
1819 fts_pll_sort = TRUE;
1820 row_fts_start_psort(psort_info);
1821 fts_parallel_sort_event =
1822 psort_info[0].psort_common->sort_event;
1823 } else {
1824 if (dict_index_is_spatial(index[i])) {
1825 num_spatial++;
1826 }
1827
1828 merge_buf[i] = row_merge_buf_create(index[i]);
1829 }
1830 }
1831
1832 if (num_spatial > 0) {
1833 ulint count = 0;
1834
1835 sp_heap = mem_heap_create(512);
1836
1837 sp_tuples = static_cast<index_tuple_info_t**>(
1838 ut_malloc_nokey(num_spatial
1839 * sizeof(*sp_tuples)));
1840
1841 for (ulint i = 0; i < n_index; i++) {
1842 if (dict_index_is_spatial(index[i])) {
1843 sp_tuples[count]
1844 = UT_NEW_NOKEY(
1845 index_tuple_info_t(
1846 sp_heap,
1847 index[i]));
1848 count++;
1849 }
1850 }
1851
1852 ut_ad(count == num_spatial);
1853 }
1854
1855 mtr_start(&mtr);
1856
1857 /* Find the clustered index and create a persistent cursor
1858 based on that. */
1859
1860 clust_index = dict_table_get_first_index(old_table);
1861
1862 btr_pcur_open_at_index_side(
1863 true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
1864
1865 if (old_table != new_table) {
1866 /* The table is being rebuilt. Identify the columns
1867 that were flagged NOT NULL in the new table, so that
1868 we can quickly check that the records in the old table
1869 do not violate the added NOT NULL constraints. */
1870
1871 nonnull = static_cast<ulint*>(
1872 ut_malloc_nokey(dict_table_get_n_cols(new_table)
1873 * sizeof *nonnull));
1874
1875 for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
1876 if (dict_table_get_nth_col(old_table, i)->prtype
1877 & DATA_NOT_NULL) {
1878 continue;
1879 }
1880
1881 const ulint j = col_map[i];
1882
1883 if (j == ULINT_UNDEFINED) {
1884 /* The column was dropped. */
1885 continue;
1886 }
1887
1888 if (dict_table_get_nth_col(new_table, j)->prtype
1889 & DATA_NOT_NULL) {
1890 nonnull[n_nonnull++] = j;
1891 }
1892 }
1893
1894 if (!n_nonnull) {
1895 ut_free(nonnull);
1896 nonnull = NULL;
1897 }
1898 }
1899
1900 row_heap = mem_heap_create(sizeof(mrec_buf_t));
1901
1902 if (dict_table_is_comp(old_table)
1903 && !dict_table_is_comp(new_table)) {
1904 conv_heap = mem_heap_create(sizeof(mrec_buf_t));
1905 }
1906
1907 if (skip_pk_sort) {
1908 prev_fields = static_cast<dfield_t*>(
1909 ut_malloc_nokey(n_uniq * sizeof *prev_fields));
1910 mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
1911 } else {
1912 prev_fields = NULL;
1913 }
1914
1915 /* Scan the clustered index. */
1916 for (;;) {
1917 const rec_t* rec;
1918 ulint* offsets;
1919 const dtuple_t* row;
1920 row_ext_t* ext;
1921 page_cur_t* cur = btr_pcur_get_page_cur(&pcur);
1922
1923 mem_heap_empty(row_heap);
1924
1925 /* Do not continue if table pages are still encrypted */
1926 if (!old_table->is_readable() ||
1927 !new_table->is_readable()) {
1928 err = DB_DECRYPTION_FAILED;
1929 trx->error_key_num = 0;
1930 goto func_exit;
1931 }
1932
1933 page_cur_move_to_next(cur);
1934
1935 stage->n_pk_recs_inc();
1936
1937 if (page_cur_is_after_last(cur)) {
1938
1939 stage->inc();
1940
1941 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1942 err = DB_INTERRUPTED;
1943 trx->error_key_num = 0;
1944 goto func_exit;
1945 }
1946
1947 if (online && old_table != new_table) {
1948 err = row_log_table_get_error(clust_index);
1949 if (err != DB_SUCCESS) {
1950 trx->error_key_num = 0;
1951 goto func_exit;
1952 }
1953 }
1954
1955 #ifdef NDEBUG
1956 # define dbug_run_purge false
1957 #else /* NDEBUG */
1958 bool dbug_run_purge = false;
1959 #endif /* NDEBUG */
1960 DBUG_EXECUTE_IF(
1961 "ib_purge_on_create_index_page_switch",
1962 dbug_run_purge = true;);
1963
1964 /* Insert the cached spatial index rows. */
1965 bool mtr_committed = false;
1966
1967 err = row_merge_spatial_rows(
1968 trx->id, sp_tuples, num_spatial,
1969 row_heap, sp_heap, &pcur,
1970 &mtr, &mtr_committed);
1971
1972 if (err != DB_SUCCESS) {
1973 goto func_exit;
1974 }
1975
1976 if (mtr_committed) {
1977 goto scan_next;
1978 }
1979
1980 if (dbug_run_purge
1981 || rw_lock_get_waiters(
1982 dict_index_get_lock(clust_index))) {
1983 /* There are waiters on the clustered
1984 index tree lock, likely the purge
1985 thread. Store and restore the cursor
1986 position, and yield so that scanning a
1987 large table will not starve other
1988 threads. */
1989
1990 /* Store the cursor position on the last user
1991 record on the page. */
1992 btr_pcur_move_to_prev_on_page(&pcur);
1993 /* Leaf pages must never be empty, unless
1994 this is the only page in the index tree. */
1995 ut_ad(btr_pcur_is_on_user_rec(&pcur)
1996 || btr_pcur_get_block(
1997 &pcur)->page.id.page_no()
1998 == clust_index->page);
1999
2000 btr_pcur_store_position(&pcur, &mtr);
2001 mtr_commit(&mtr);
2002
2003 if (dbug_run_purge) {
2004 /* This is for testing
2005 purposes only (see
2006 DBUG_EXECUTE_IF above). We
2007 signal the purge thread and
2008 hope that the purge batch will
2009 complete before we execute
2010 btr_pcur_restore_position(). */
2011 trx_purge_run();
2012 os_thread_sleep(1000000);
2013 }
2014
2015 /* Give the waiters a chance to proceed. */
2016 os_thread_yield();
2017 scan_next:
2018 mtr_start(&mtr);
2019 /* Restore position on the record, or its
2020 predecessor if the record was purged
2021 meanwhile. */
2022 btr_pcur_restore_position(
2023 BTR_SEARCH_LEAF, &pcur, &mtr);
2024 /* Move to the successor of the
2025 original record. */
2026 if (!btr_pcur_move_to_next_user_rec(
2027 &pcur, &mtr)) {
2028 end_of_index:
2029 row = NULL;
2030 mtr_commit(&mtr);
2031 mem_heap_free(row_heap);
2032 ut_free(nonnull);
2033 goto write_buffers;
2034 }
2035 } else {
2036 ulint next_page_no;
2037 buf_block_t* block;
2038
2039 next_page_no = btr_page_get_next(
2040 page_cur_get_page(cur), &mtr);
2041
2042 if (next_page_no == FIL_NULL) {
2043 goto end_of_index;
2044 }
2045
2046 block = page_cur_get_block(cur);
2047 block = btr_block_get(
2048 page_id_t(block->page.id.space(),
2049 next_page_no),
2050 block->page.size,
2051 BTR_SEARCH_LEAF,
2052 clust_index, &mtr);
2053
2054 btr_leaf_page_release(page_cur_get_block(cur),
2055 BTR_SEARCH_LEAF, &mtr);
2056 page_cur_set_before_first(block, cur);
2057 page_cur_move_to_next(cur);
2058
2059 ut_ad(!page_cur_is_after_last(cur));
2060 }
2061 }
2062
2063 rec = page_cur_get_rec(cur);
2064
2065 SRV_CORRUPT_TABLE_CHECK(rec,
2066 {
2067 err = DB_CORRUPTION;
2068 goto func_exit;
2069 });
2070
2071 offsets = rec_get_offsets(rec, clust_index, NULL,
2072 ULINT_UNDEFINED, &row_heap);
2073
2074 if (online) {
2075 /* Perform a REPEATABLE READ.
2076
2077 When rebuilding the table online,
2078 row_log_table_apply() must not see a newer
2079 state of the table when applying the log.
2080 This is mainly to prevent false duplicate key
2081 errors, because the log will identify records
2082 by the PRIMARY KEY, and also to prevent unsafe
2083 BLOB access.
2084
2085 When creating a secondary index online, this
2086 table scan must not see records that have only
2087 been inserted to the clustered index, but have
2088 not been written to the online_log of
2089 index[]. If we performed READ UNCOMMITTED, it
2090 could happen that the ADD INDEX reaches
2091 ONLINE_INDEX_COMPLETE state between the time
2092 the DML thread has updated the clustered index
2093 but has not yet accessed secondary index. */
2094 ut_ad(MVCC::is_view_active(trx->read_view));
2095
2096 if (!trx->read_view->changes_visible(
2097 row_get_rec_trx_id(
2098 rec, clust_index, offsets),
2099 old_table->name)) {
2100 rec_t* old_vers;
2101
2102 row_vers_build_for_consistent_read(
2103 rec, &mtr, clust_index, &offsets,
2104 trx->read_view, &row_heap,
2105 row_heap, &old_vers, NULL);
2106
2107 rec = old_vers;
2108
2109 if (!rec) {
2110 continue;
2111 }
2112 }
2113
2114 if (rec_get_deleted_flag(
2115 rec,
2116 dict_table_is_comp(old_table))) {
2117 /* This record was deleted in the latest
2118 committed version, or it was deleted and
2119 then reinserted-by-update before purge
2120 kicked in. Skip it. */
2121 continue;
2122 }
2123
2124 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2125 } else if (rec_get_deleted_flag(
2126 rec, dict_table_is_comp(old_table))) {
2127 /* Skip delete-marked records.
2128
2129 Skipping delete-marked records will make the
2130 created indexes unuseable for transactions
2131 whose read views were created before the index
2132 creation completed, but preserving the history
2133 would make it tricky to detect duplicate
2134 keys. */
2135 continue;
2136 }
2137
2138 /* When !online, we are holding a lock on old_table, preventing
2139 any inserts that could have written a record 'stub' before
2140 writing out off-page columns. */
2141 ut_ad(!rec_offs_any_null_extern(rec, offsets));
2142
2143 /* Build a row based on the clustered index. */
2144
2145 row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
2146 rec, offsets, new_table,
2147 add_cols, add_v, col_map, &ext,
2148 row_heap);
2149 ut_ad(row);
2150
2151 for (ulint i = 0; i < n_nonnull; i++) {
2152 const dfield_t* field = &row->fields[nonnull[i]];
2153
2154 ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
2155
2156 if (dfield_is_null(field)) {
2157 err = DB_INVALID_NULL;
2158 trx->error_key_num = 0;
2159 goto func_exit;
2160 }
2161 }
2162
2163 /* Get the next Doc ID */
2164 if (add_doc_id) {
2165 doc_id++;
2166 } else {
2167 doc_id = 0;
2168 }
2169
2170 if (add_autoinc != ULINT_UNDEFINED) {
2171
2172 ut_ad(add_autoinc
2173 < dict_table_get_n_user_cols(new_table));
2174
2175 const dfield_t* dfield;
2176
2177 dfield = dtuple_get_nth_field(row, add_autoinc);
2178 if (dfield_is_null(dfield)) {
2179 goto write_buffers;
2180 }
2181
2182 const dtype_t* dtype = dfield_get_type(dfield);
2183 byte* b = static_cast<byte*>(dfield_get_data(dfield));
2184
2185 if (sequence.eof()) {
2186 err = DB_ERROR;
2187 trx->error_key_num = 0;
2188
2189 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2190 ER_AUTOINC_READ_FAILED, "[NULL]");
2191
2192 goto func_exit;
2193 }
2194
2195 ulonglong value = sequence++;
2196
2197 switch (dtype_get_mtype(dtype)) {
2198 case DATA_INT: {
2199 ibool usign;
2200 ulint len = dfield_get_len(dfield);
2201
2202 usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
2203 mach_write_ulonglong(b, value, len, usign);
2204
2205 break;
2206 }
2207
2208 case DATA_FLOAT:
2209 mach_float_write(
2210 b, static_cast<float>(value));
2211 break;
2212
2213 case DATA_DOUBLE:
2214 mach_double_write(
2215 b, static_cast<double>(value));
2216 break;
2217
2218 default:
2219 ut_ad(0);
2220 }
2221 }
2222
2223 write_buffers:
2224 /* Build all entries for all the indexes to be created
2225 in a single scan of the clustered index. */
2226
2227 ulint s_idx_cnt = 0;
2228 bool skip_sort = skip_pk_sort
2229 && dict_index_is_clust(merge_buf[0]->index);
2230
2231 for (ulint i = 0; i < n_index; i++, skip_sort = false) {
2232 row_merge_buf_t* buf = merge_buf[i];
2233 merge_file_t* file = &files[i];
2234 ulint rows_added = 0;
2235
2236 if (dict_index_is_spatial(buf->index)) {
2237 if (!row) {
2238 continue;
2239 }
2240
2241 ut_ad(sp_tuples[s_idx_cnt]->get_index()
2242 == buf->index);
2243
2244 /* If the geometry field is invalid, report
2245 error. */
2246 if (!row_geo_field_is_valid(row, buf->index)) {
2247 err = DB_CANT_CREATE_GEOMETRY_OBJECT;
2248 break;
2249 }
2250
2251 sp_tuples[s_idx_cnt]->add(row, ext);
2252 s_idx_cnt++;
2253
2254 continue;
2255 }
2256
2257 if (UNIV_LIKELY
2258 (row && (rows_added = row_merge_buf_add(
2259 buf, fts_index, old_table, new_table,
2260 psort_info, row, ext, &doc_id,
2261 conv_heap, &err, &v_heap, eval_table,
2262 trx, prebuilt)))) {
2263
2264 /* If we are creating FTS index,
2265 a single row can generate more
2266 records for tokenized word */
2267 file->n_rec += rows_added;
2268
2269 if (err != DB_SUCCESS) {
2270 ut_ad(err == DB_TOO_BIG_RECORD);
2271 break;
2272 }
2273
2274 if (doc_id > max_doc_id) {
2275 max_doc_id = doc_id;
2276 }
2277
2278 if (buf->index->type & DICT_FTS) {
2279 /* Check if error occurs in child thread */
2280 for (ulint j = 0;
2281 j < fts_sort_pll_degree; j++) {
2282 if (psort_info[j].error
2283 != DB_SUCCESS) {
2284 err = psort_info[j].error;
2285 trx->error_key_num = i;
2286 break;
2287 }
2288 }
2289
2290 if (err != DB_SUCCESS) {
2291 break;
2292 }
2293 }
2294
2295 if (skip_sort) {
2296 ut_ad(buf->n_tuples > 0);
2297 const mtuple_t* curr =
2298 &buf->tuples[buf->n_tuples - 1];
2299
2300 ut_ad(i == 0);
2301 ut_ad(dict_index_is_clust(merge_buf[0]->index));
2302 /* Detect duplicates by comparing the
2303 current record with previous record.
2304 When temp file is not used, records
2305 should be in sorted order. */
2306 if (prev_mtuple.fields != NULL
2307 && (row_mtuple_cmp(
2308 &prev_mtuple, curr,
2309 &clust_dup) == 0)) {
2310
2311 err = DB_DUPLICATE_KEY;
2312 trx->error_key_num
2313 = key_numbers[0];
2314 goto func_exit;
2315 }
2316
2317 prev_mtuple.fields = curr->fields;
2318 }
2319
2320 continue;
2321 }
2322
2323 if (err == DB_COMPUTE_VALUE_FAILED) {
2324 trx->error_key_num = i;
2325 goto func_exit;
2326 }
2327
2328 if (buf->index->type & DICT_FTS) {
2329 if (!row || !doc_id) {
2330 continue;
2331 }
2332 }
2333
2334 /* The buffer must be sufficiently large
2335 to hold at least one record. It may only
2336 be empty when we reach the end of the
2337 clustered index. row_merge_buf_add()
2338 must not have been called in this loop. */
2339 ut_ad(buf->n_tuples || row == NULL);
2340
2341 /* We have enough data tuples to form a block.
2342 Sort them and write to disk if temp file is used
2343 or insert into index if temp file is not used. */
2344 ut_ad(old_table == new_table
2345 ? !dict_index_is_clust(buf->index)
2346 : (i == 0) == dict_index_is_clust(buf->index));
2347
2348 /* We have enough data tuples to form a block.
2349 Sort them (if !skip_sort) and write to disk. */
2350
2351 if (buf->n_tuples) {
2352 if (skip_sort) {
2353 /* Temporary File is not used.
2354 so insert sorted block to the index */
2355 if (row != NULL) {
2356 bool mtr_committed = false;
2357
2358 /* We have to do insert the
2359 cached spatial index rows, since
2360 after the mtr_commit, the cluster
2361 index page could be updated, then
2362 the data in cached rows become
2363 invalid. */
2364 err = row_merge_spatial_rows(
2365 trx->id, sp_tuples,
2366 num_spatial,
2367 row_heap, sp_heap,
2368 &pcur, &mtr,
2369 &mtr_committed);
2370
2371 if (err != DB_SUCCESS) {
2372 goto func_exit;
2373 }
2374
2375 /* We are not at the end of
2376 the scan yet. We must
2377 mtr_commit() in order to be
2378 able to call log_free_check()
2379 in row_merge_insert_index_tuples().
2380 Due to mtr_commit(), the
2381 current row will be invalid, and
2382 we must reread it on the next
2383 loop iteration. */
2384 if (!mtr_committed) {
2385 btr_pcur_move_to_prev_on_page(
2386 &pcur);
2387 btr_pcur_store_position(
2388 &pcur, &mtr);
2389
2390 mtr_commit(&mtr);
2391 }
2392 }
2393
2394 mem_heap_empty(mtuple_heap);
2395 prev_mtuple.fields = prev_fields;
2396
2397 row_mtuple_create(
2398 &buf->tuples[buf->n_tuples - 1],
2399 &prev_mtuple, n_uniq,
2400 mtuple_heap);
2401
2402 if (clust_btr_bulk == NULL) {
2403 clust_btr_bulk = UT_NEW_NOKEY(
2404 BtrBulk(index[i],
2405 trx->id,
2406 observer));
2407
2408 clust_btr_bulk->init();
2409 } else {
2410 clust_btr_bulk->latch();
2411 }
2412
2413 err = row_merge_insert_index_tuples(
2414 trx->id, index[i], old_table,
2415 -1, NULL, NULL,
2416 new_table->space, buf,
2417 clust_btr_bulk);
2418
2419 if (row == NULL) {
2420 err = clust_btr_bulk->finish(
2421 err);
2422 UT_DELETE(clust_btr_bulk);
2423 clust_btr_bulk = NULL;
2424 } else {
2425 /* Release latches for possible
2426 log_free_chck in spatial index
2427 build. */
2428 clust_btr_bulk->release();
2429 }
2430
2431 if (err != DB_SUCCESS) {
2432 break;
2433 }
2434
2435 if (row != NULL) {
2436 /* Restore the cursor on the
2437 previous clustered index record,
2438 and empty the buffer. The next
2439 iteration of the outer loop will
2440 advance the cursor and read the
2441 next record (the one which we
2442 had to ignore due to the buffer
2443 overflow). */
2444 mtr_start(&mtr);
2445 btr_pcur_restore_position(
2446 BTR_SEARCH_LEAF, &pcur,
2447 &mtr);
2448 buf = row_merge_buf_empty(buf);
2449 /* Restart the outer loop on the
2450 record. We did not insert it
2451 into any index yet. */
2452 ut_ad(i == 0);
2453 break;
2454 }
2455 } else if (dict_index_is_unique(buf->index)) {
2456 row_merge_dup_t dup = {
2457 buf->index, table, col_map, 0};
2458
2459 row_merge_buf_sort(buf, &dup);
2460
2461 if (dup.n_dup) {
2462 err = DB_DUPLICATE_KEY;
2463 trx->error_key_num
2464 = key_numbers[i];
2465 break;
2466 }
2467 } else {
2468 row_merge_buf_sort(buf, NULL);
2469 }
2470 } else if (online && new_table == old_table) {
2471 /* Note the newest transaction that
2472 modified this index when the scan was
2473 completed. We prevent older readers
2474 from accessing this index, to ensure
2475 read consistency. */
2476
2477 trx_id_t max_trx_id;
2478
2479 ut_a(row == NULL);
2480 rw_lock_x_lock(
2481 dict_index_get_lock(buf->index));
2482 ut_a(dict_index_get_online_status(buf->index)
2483 == ONLINE_INDEX_CREATION);
2484
2485 max_trx_id = row_log_get_max_trx(buf->index);
2486
2487 if (max_trx_id > buf->index->trx_id) {
2488 buf->index->trx_id = max_trx_id;
2489 }
2490
2491 rw_lock_x_unlock(
2492 dict_index_get_lock(buf->index));
2493 }
2494
2495 /* Secondary index and clustered index which is
2496 not in sorted order can use the temporary file.
2497 Fulltext index should not use the temporary file. */
2498 if (!skip_sort && !(buf->index->type & DICT_FTS)) {
2499 /* In case we can have all rows in sort buffer,
2500 we can insert directly into the index without
2501 temporary file if clustered index does not uses
2502 temporary file. */
2503 if (row == NULL && file->fd == -1
2504 && !clust_temp_file) {
2505 DBUG_EXECUTE_IF(
2506 "row_merge_write_failure",
2507 err = DB_TEMP_FILE_WRITE_FAIL;
2508 trx->error_key_num = i;
2509 goto all_done;);
2510
2511 DBUG_EXECUTE_IF(
2512 "row_merge_tmpfile_fail",
2513 err = DB_OUT_OF_MEMORY;
2514 trx->error_key_num = i;
2515 goto all_done;);
2516
2517 BtrBulk btr_bulk(index[i], trx->id,
2518 observer);
2519 btr_bulk.init();
2520
2521 err = row_merge_insert_index_tuples(
2522 trx->id, index[i], old_table,
2523 -1, NULL, NULL,
2524 new_table->space, buf,
2525 &btr_bulk);
2526
2527 err = btr_bulk.finish(err);
2528
2529 DBUG_EXECUTE_IF(
2530 "row_merge_insert_big_row",
2531 err = DB_TOO_BIG_RECORD;);
2532
2533 if (err != DB_SUCCESS) {
2534 break;
2535 }
2536 } else {
2537 if (row_merge_file_create_if_needed(
2538 file, tmpfd,
2539 buf->n_tuples, path) < 0) {
2540 err = DB_OUT_OF_MEMORY;
2541 trx->error_key_num = i;
2542 goto func_exit;
2543 }
2544
2545 /* Ensure that duplicates in the
2546 clustered index will be detected before
2547 inserting secondary index records. */
2548 if (dict_index_is_clust(buf->index)) {
2549 clust_temp_file = true;
2550 }
2551
2552 ut_ad(file->n_rec > 0);
2553
2554 row_merge_buf_write(buf, file, block);
2555
2556 if (!row_merge_write(
2557 file->fd, file->offset++,
2558 block, crypt_block,
2559 new_table->space)) {
2560 err = DB_TEMP_FILE_WRITE_FAIL;
2561 trx->error_key_num = i;
2562 break;
2563 }
2564
2565 UNIV_MEM_INVALID(
2566 &block[0], srv_sort_buf_size);
2567 }
2568 }
2569 merge_buf[i] = row_merge_buf_empty(buf);
2570
2571 if (UNIV_LIKELY(row != NULL)) {
2572 /* Try writing the record again, now
2573 that the buffer has been written out
2574 and emptied. */
2575
2576 if (UNIV_UNLIKELY
2577 (!(rows_added = row_merge_buf_add(
2578 buf, fts_index, old_table,
2579 new_table, psort_info, row, ext,
2580 &doc_id, conv_heap,
2581 &err, &v_heap, table, trx,
2582 prebuilt)))) {
2583 /* An empty buffer should have enough
2584 room for at least one record. */
2585 ut_error;
2586 }
2587
2588 if (err != DB_SUCCESS) {
2589 break;
2590 }
2591
2592 file->n_rec += rows_added;
2593 }
2594 }
2595
2596 if (row == NULL) {
2597 goto all_done;
2598 }
2599
2600 if (err != DB_SUCCESS) {
2601 goto func_exit;
2602 }
2603
2604 if (v_heap) {
2605 mem_heap_empty(v_heap);
2606 }
2607 }
2608
2609 func_exit:
2610 /* row_merge_spatial_rows may have committed
2611 the mtr before an error occurs. */
2612 if (mtr.is_active()) {
2613 mtr_commit(&mtr);
2614 }
2615 mem_heap_free(row_heap);
2616 ut_free(nonnull);
2617
2618 all_done:
2619 if (clust_btr_bulk != NULL) {
2620 ut_ad(err != DB_SUCCESS);
2621 clust_btr_bulk->latch();
2622 err = clust_btr_bulk->finish(
2623 err);
2624 UT_DELETE(clust_btr_bulk);
2625 }
2626
2627 if (prev_fields != NULL) {
2628 ut_free(prev_fields);
2629 mem_heap_free(mtuple_heap);
2630 }
2631
2632 if (v_heap) {
2633 mem_heap_free(v_heap);
2634 }
2635
2636 if (conv_heap != NULL) {
2637 mem_heap_free(conv_heap);
2638 }
2639
2640 #ifdef FTS_INTERNAL_DIAG_PRINT
2641 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
2642 #endif
2643 if (fts_pll_sort) {
2644 bool all_exit = false;
2645 ulint trial_count = 0;
2646 const ulint max_trial_count = 10000;
2647
2648 wait_again:
2649 /* Check if error occurs in child thread */
2650 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2651 if (psort_info[j].error != DB_SUCCESS) {
2652 err = psort_info[j].error;
2653 trx->error_key_num = j;
2654 break;
2655 }
2656 }
2657
2658 /* Tell all children that parent has done scanning */
2659 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2660 if (err == DB_SUCCESS) {
2661 psort_info[i].state = FTS_PARENT_COMPLETE;
2662 } else {
2663 psort_info[i].state = FTS_PARENT_EXITING;
2664 }
2665 }
2666
2667 /* Now wait all children to report back to be completed */
2668 os_event_wait_time_low(fts_parallel_sort_event,
2669 1000000, sig_count);
2670
2671 for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2672 if (psort_info[i].child_status != FTS_CHILD_COMPLETE
2673 && psort_info[i].child_status != FTS_CHILD_EXITING) {
2674 sig_count = os_event_reset(
2675 fts_parallel_sort_event);
2676 goto wait_again;
2677 }
2678 }
2679
2680 /* Now all children should complete, wait a bit until
2681 they all finish setting the event, before we free everything.
2682 This has a 10 second timeout */
2683 do {
2684 all_exit = true;
2685
2686 for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2687 if (psort_info[j].child_status
2688 != FTS_CHILD_EXITING) {
2689 all_exit = false;
2690 os_thread_sleep(1000);
2691 break;
2692 }
2693 }
2694 trial_count++;
2695 } while (!all_exit && trial_count < max_trial_count);
2696
2697 if (!all_exit) {
2698 ib::fatal() << "Not all child sort threads exited"
2699 " when creating FTS index '"
2700 << fts_sort_idx->name << "'";
2701 }
2702 }
2703
2704 #ifdef FTS_INTERNAL_DIAG_PRINT
2705 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
2706 #endif
2707 for (ulint i = 0; i < n_index; i++) {
2708 row_merge_buf_free(merge_buf[i]);
2709 }
2710
2711 row_fts_free_pll_merge_buf(psort_info);
2712
2713 ut_free(merge_buf);
2714
2715 btr_pcur_close(&pcur);
2716
2717 if (sp_tuples != NULL) {
2718 for (ulint i = 0; i < num_spatial; i++) {
2719 UT_DELETE(sp_tuples[i]);
2720 }
2721 ut_free(sp_tuples);
2722
2723 if (sp_heap) {
2724 mem_heap_free(sp_heap);
2725 }
2726 }
2727
2728 /* Update the next Doc ID we used. Table should be locked, so
2729 no concurrent DML */
2730 if (max_doc_id && err == DB_SUCCESS) {
2731 /* Sync fts cache for other fts indexes to keep all
2732 fts indexes consistent in sync_doc_id. */
2733 err = fts_sync_table(const_cast<dict_table_t*>(new_table),
2734 false, true, false);
2735
2736 if (err == DB_SUCCESS) {
2737 fts_update_next_doc_id(
2738 0, new_table,
2739 old_table->name.m_name, max_doc_id);
2740 }
2741 }
2742
2743 trx->op_info = "";
2744
2745 DBUG_RETURN(err);
2746 }
2747
/** Write a record via buffer 2 and read the next record to buffer N.

Appends the current record of input stream N (mrec##N) to the output
file "of" through the third sort buffer, then advances stream N by
reading its next record.  If the temporary file contents are found to
be inconsistent (write failure, or more records written than were read
in), control jumps to the label "corrupt", which must exist in the
enclosing function.  When stream N is exhausted, the AT_END statement
is executed instead.

@param N	number of the buffer (0 or 1)
@param INDEX	record descriptor
@param AT_END	statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)			\
	do {								\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size],	\
					 crypt_block ?			\
					 &crypt_block[2 * srv_sort_buf_size] :\
					 NULL,				\
					 space_id,			\
					 &buf[2], b2,			\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  crypt_block ?			\
					  &crypt_block[N * srv_sort_buf_size] :\
					  NULL,				\
					  space_id,			\
					  &buf[N], b##N, INDEX,		\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)
2780
#ifdef HAVE_PSI_STAGE_INTERFACE
/** Write a record via buffer 2 and read the next record to buffer N.
Same as ROW_MERGE_WRITE_GET_NEXT_LOW(), but also bumps the performance
schema progress counter for each record processed, when a stage
accounting object was supplied by the caller.
@param N	number of the buffer (0 or 1)
@param INDEX	record descriptor
@param AT_END	statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)		\
	do {							\
		if (stage != NULL) {				\
			stage->inc();				\
		}						\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);	\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
/** No performance schema stage instrumentation in this build:
just forward to the low-level macro. */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)	\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */
2793
2794 /** Merge two blocks of records on disk and write a bigger block.
2795 @param[in] dup descriptor of index being created
2796 @param[in] file file containing index entries
2797 @param[in,out] block 3 buffers
2798 @param[in,out] crypt_block encrypted file buffer
2799 @param[in] space_id tablespace id
2800 @param[in,out] foffs0 offset of first source list in the file
2801 @param[in,out] foffs1 offset of second source list in the file
2802 @param[in,out] of output file
2803 @param[in,out] stage performance schema accounting object, used by
2804 ALTER TABLE. If not NULL stage->inc() will be called for each record
2805 processed.
2806 @return DB_SUCCESS or error code */
2807 static MY_ATTRIBUTE((warn_unused_result))
2808 dberr_t
row_merge_blocks(const row_merge_dup_t * dup,const merge_file_t * file,row_merge_block_t * block,row_merge_block_t * crypt_block,ulint space_id,ulint * foffs0,ulint * foffs1,merge_file_t * of,ut_stage_alter_t * stage)2809 row_merge_blocks(
2810 const row_merge_dup_t* dup,
2811 const merge_file_t* file,
2812 row_merge_block_t* block,
2813 row_merge_block_t* crypt_block,
2814 ulint space_id,
2815 ulint* foffs0,
2816 ulint* foffs1,
2817 merge_file_t* of,
2818 ut_stage_alter_t* stage)
2819 {
2820 mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
2821
2822 mrec_buf_t* buf; /*!< buffer for handling
2823 split mrec in block[] */
2824 const byte* b0; /*!< pointer to block[0] */
2825 const byte* b1; /*!< pointer to block[srv_sort_buf_size] */
2826 byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */
2827 const mrec_t* mrec0; /*!< merge rec, points to block[0] or buf[0] */
2828 const mrec_t* mrec1; /*!< merge rec, points to
2829 block[srv_sort_buf_size] or buf[1] */
2830 ulint* offsets0;/* offsets of mrec0 */
2831 ulint* offsets1;/* offsets of mrec1 */
2832
2833 DBUG_ENTER("row_merge_blocks");
2834 DBUG_PRINT("ib_merge_sort",
2835 ("fd=%d,%lu+%lu to fd=%d,%lu",
2836 file->fd, ulong(*foffs0), ulong(*foffs1),
2837 of->fd, ulong(of->offset)));
2838
2839 heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);
2840
2841 /* Write a record and read the next record. Split the output
2842 file in two halves, which can be merged on the following pass. */
2843
2844 if (!row_merge_read(file->fd, *foffs0, &block[0],
2845 &crypt_block[0], space_id)
2846 || !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size],
2847 crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
2848 space_id)) {
2849 corrupt:
2850 mem_heap_free(heap);
2851 DBUG_RETURN(DB_CORRUPTION);
2852 }
2853
2854 b0 = &block[0];
2855 b1 = &block[srv_sort_buf_size];
2856 b2 = &block[2 * srv_sort_buf_size];
2857
2858 b0 = row_merge_read_rec(
2859 &block[0], &crypt_block[0], space_id, &buf[0], b0, dup->index,
2860 file->fd, foffs0, &mrec0, offsets0);
2861 b1 = row_merge_read_rec(
2862 &block[srv_sort_buf_size],
2863 crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
2864 space_id, &buf[srv_sort_buf_size], b1, dup->index,
2865 file->fd, foffs1, &mrec1, offsets1);
2866 if (UNIV_UNLIKELY(!b0 && mrec0)
2867 || UNIV_UNLIKELY(!b1 && mrec1)) {
2868
2869 goto corrupt;
2870 }
2871
2872 while (mrec0 && mrec1) {
2873 int cmp = cmp_rec_rec_simple(
2874 mrec0, mrec1, offsets0, offsets1,
2875 dup->index, dup->table);
2876 if (cmp < 0) {
2877 ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
2878 } else if (cmp) {
2879 ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
2880 } else {
2881 mem_heap_free(heap);
2882 DBUG_RETURN(DB_DUPLICATE_KEY);
2883 }
2884 }
2885
2886 merged:
2887 if (mrec0) {
2888 /* append all mrec0 to output */
2889 for (;;) {
2890 ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
2891 }
2892 }
2893 done0:
2894 if (mrec1) {
2895 /* append all mrec1 to output */
2896 for (;;) {
2897 ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
2898 }
2899 }
2900 done1:
2901
2902 mem_heap_free(heap);
2903 b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
2904 crypt_block ?
2905 &crypt_block[2 * srv_sort_buf_size] : NULL,
2906 space_id, b2, of->fd, &of->offset);
2907 DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
2908 }
2909
2910 /** Copy a block of index entries.
2911 @param[in] index index being created
2912 @param[in] file input file
2913 @param[in,out] block 3 buffers
2914 @param[in,out] crypt_block encrypted file buffer
2915 @param[in] space_id tablespace id
2916 @param[in,out] foffs0 input file offset
2917 @param[in,out] of output file
2918 @param[in,out] stage performance schema accounting object, used by
2919 ALTER TABLE. If not NULL stage->inc() will be called for each record
2920 processed.
2921 @return TRUE on success, FALSE on failure */
2922 static MY_ATTRIBUTE((warn_unused_result))
2923 ibool
row_merge_blocks_copy(const dict_index_t * index,const merge_file_t * file,row_merge_block_t * block,row_merge_block_t * crypt_block,ulint space_id,ulint * foffs0,merge_file_t * of,ut_stage_alter_t * stage)2924 row_merge_blocks_copy(
2925 const dict_index_t* index,
2926 const merge_file_t* file,
2927 row_merge_block_t* block,
2928 row_merge_block_t* crypt_block,
2929 ulint space_id,
2930 ulint* foffs0,
2931 merge_file_t* of,
2932 ut_stage_alter_t* stage)
2933 {
2934 mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
2935
2936 mrec_buf_t* buf; /*!< buffer for handling
2937 split mrec in block[] */
2938 const byte* b0; /*!< pointer to block[0] */
2939 byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */
2940 const mrec_t* mrec0; /*!< merge rec, points to block[0] */
2941 ulint* offsets0;/* offsets of mrec0 */
2942 ulint* offsets1;/* dummy offsets */
2943
2944 DBUG_ENTER("row_merge_blocks_copy");
2945 DBUG_PRINT("ib_merge_sort",
2946 ("fd=%d," ULINTPF " to fd=%d," ULINTPF,
2947 file->fd, *foffs0,
2948 of->fd, of->offset));
2949
2950 heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
2951
2952 /* Write a record and read the next record. Split the output
2953 file in two halves, which can be merged on the following pass. */
2954
2955 if (!row_merge_read(file->fd, *foffs0, &block[0], &crypt_block[0],
2956 space_id)) {
2957 corrupt:
2958 mem_heap_free(heap);
2959 DBUG_RETURN(FALSE);
2960 }
2961
2962 b0 = &block[0];
2963
2964 b2 = &block[2 * srv_sort_buf_size];
2965
2966 b0 = row_merge_read_rec(&block[0], &crypt_block[0], space_id, &buf[0],
2967 b0, index, file->fd, foffs0, &mrec0, offsets0);
2968 if (UNIV_UNLIKELY(!b0 && mrec0)) {
2969
2970 goto corrupt;
2971 }
2972
2973 if (mrec0) {
2974 /* append all mrec0 to output */
2975 for (;;) {
2976 ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
2977 }
2978 }
2979 done0:
2980
2981 /* The file offset points to the beginning of the last page
2982 that has been read. Update it to point to the next block. */
2983 (*foffs0)++;
2984
2985 mem_heap_free(heap);
2986 DBUG_RETURN(row_merge_write_eof(&block[2 * srv_sort_buf_size],
2987 crypt_block ?
2988 &crypt_block[2 * srv_sort_buf_size] :
2989 NULL, space_id,
2990 b2, of->fd, &of->offset)
2991 != NULL);
2992 }
2993
/** Merge disk files.
Performs one merge pass: pairs up runs from the first and second half
of the input file, merges each pair into the output file, and finally
swaps the output file into place as the input of the next pass.
@param[in]	trx	transaction
@param[in]	dup	descriptor of index being created
@param[in,out]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	crypt_block	encrypted file buffer
@param[in]	space_id	tablespace id
@param[in,out]	tmpfd	temporary file handle
@param[in,out]	num_run	Number of runs that remain to be merged
@param[in,out]	run_offset	Array that contains the first offset number
for each merge run
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return DB_SUCCESS or error code */
static
dberr_t
row_merge(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	int*			tmpfd,
	ulint*			num_run,
	ulint*			run_offset,
	ut_stage_alter_t*	stage)
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	dberr_t		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= run_offset[*num_run / 2];
			/*!< half the input file */
	ulint		n_run	= 0;
			/*!< num of runs generated from this merge */

	UNIV_MEM_ASSERT_W(&block[0], 3 * srv_sort_buf_size);
	if (crypt_block) {
		UNIV_MEM_ASSERT_W(&crypt_block[0], 3 * srv_sort_buf_size);
	}

	ut_ad(ihalf < file->offset);

	/* The scratch file descriptor becomes this pass's output. */
	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle. In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file. Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = ihalf;

	UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);

	/* Pairwise merge: one run from the first half with one run
	from the second half per iteration.  row_merge_blocks()
	advances foffs0/foffs1 through the blocks it consumes; the
	loop increment then steps past the last block of each run. */
	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		error = row_merge_blocks(dup, file, block, crypt_block,
					 space_id, &foffs0, &foffs1, &of,
					 stage);

		if (error != DB_SUCCESS) {
			return(error);
		}

	}

	/* Copy the last blocks, if there are any. */

	/* Leftover runs in the first half (when it is the larger
	half) are copied through unmerged. */
	while (foffs0 < ihalf) {
		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block, crypt_block,
					   space_id, &foffs0, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	/* Likewise for leftover runs in the second half. */
	while (foffs1 < file->offset) {
		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block, crypt_block,
					   space_id, &foffs1, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	/* Every input record must have been written exactly once. */
	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	ut_ad(n_run <= *num_run);

	*num_run = n_run;

	/* Each run can contain one or more offsets. As merge goes on,
	the number of runs (to merge) will reduce until we have one
	single run. So the number of runs will always be smaller than
	the number of offsets in file */
	ut_ad((*num_run) <= file->offset);

	/* The number of offsets in output file is always equal or
	smaller than input file */
	ut_ad(of.offset <= file->offset);

	/* Swap file descriptors for the next pass: the merged output
	becomes the next pass's input, and the old input file is
	recycled as the scratch output. */
	*tmpfd = file->fd;
	*file = of;

	UNIV_MEM_INVALID(&block[0], 3 * srv_sort_buf_size);

	return(DB_SUCCESS);
}
3136
3137 /** Merge disk files.
3138 @param[in] trx transaction
3139 @param[in] dup descriptor of index being created
3140 @param[in,out] file file containing index entries
3141 @param[in,out] block 3 buffers
3142 @param[in,out] crypt_block crypt buf or NULL
3143 @param[in] space_id tablespace id
3144 @param[in,out] tmpfd temporary file handle
3145 @param[in,out] stage performance schema accounting object, used by
3146 ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
3147 and then stage->inc() will be called for each record processed.
3148 @return DB_SUCCESS or error code */
3149 dberr_t
row_merge_sort(trx_t * trx,const row_merge_dup_t * dup,merge_file_t * file,row_merge_block_t * block,row_merge_block_t * crypt_block,ulint space_id,int * tmpfd,ut_stage_alter_t * stage)3150 row_merge_sort(
3151 trx_t* trx,
3152 const row_merge_dup_t* dup,
3153 merge_file_t* file,
3154 row_merge_block_t* block,
3155 row_merge_block_t* crypt_block,
3156 ulint space_id,
3157 int* tmpfd,
3158 ut_stage_alter_t* stage /* = NULL */)
3159 {
3160 const ulint half = file->offset / 2;
3161 ulint num_runs;
3162 ulint* run_offset;
3163 dberr_t error = DB_SUCCESS;
3164 DBUG_ENTER("row_merge_sort");
3165
3166 /* Record the number of merge runs we need to perform */
3167 num_runs = file->offset;
3168
3169 if (stage != NULL) {
3170 stage->begin_phase_sort(log2(num_runs));
3171 }
3172
3173 /* If num_runs are less than 1, nothing to merge */
3174 if (num_runs <= 1) {
3175 DBUG_RETURN(error);
3176 }
3177
3178 /* "run_offset" records each run's first offset number */
3179 run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));
3180
3181 /* This tells row_merge() where to start for the first round
3182 of merge. */
3183 run_offset[half] = half;
3184
3185 /* The file should always contain at least one byte (the end
3186 of file marker). Thus, it must be at least one block. */
3187 ut_ad(file->offset > 0);
3188
3189 /* Merge the runs until we have one big run */
3190 do {
3191 error = row_merge(trx, dup, file, block, crypt_block, space_id,
3192 tmpfd, &num_runs, run_offset, stage);
3193
3194 if (error != DB_SUCCESS) {
3195 break;
3196 }
3197
3198 UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
3199 } while (num_runs > 1);
3200
3201 ut_free(run_offset);
3202
3203 DBUG_RETURN(error);
3204 }
3205
/** Copy externally stored columns to the data tuple.
@param[in]	mrec		record containing BLOB pointers,
or NULL to use tuple instead
@param[in]	offsets		offsets of mrec
@param[in]	page_size	page size descriptor of the tablespace
holding the off-page (BLOB) data
@param[in,out]	tuple		data tuple
@param[in,out]	heap		memory heap from which the BLOB copies
are allocated */
static
void
row_merge_copy_blobs(
	const mrec_t*		mrec,
	const ulint*		offsets,
	const page_size_t&	page_size,
	dtuple_t*		tuple,
	mem_heap_t*		heap)
{
	ut_ad(mrec == NULL || rec_offs_any_extern(offsets));

	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
		ulint		len;
		const void*	data;
		dfield_t*	field = dtuple_get_nth_field(tuple, i);
		ulint		field_len;
		const byte*	field_data;

		/* Only externally stored fields need copying. */
		if (!dfield_is_ext(field)) {
			continue;
		}

		ut_ad(!dfield_is_null(field));

		/* During the creation of a PRIMARY KEY, the table is
		X-locked, and we skip copying records that have been
		marked for deletion. Therefore, externally stored
		columns cannot possibly be freed between the time the
		BLOB pointers are read (row_merge_read_clustered_index())
		and dereferenced (below). */
		if (mrec == NULL) {
			/* Take the BLOB pointer from the dtuple field
			itself; the external reference occupies the
			last BTR_EXTERN_FIELD_REF_SIZE bytes. */
			field_data
				= static_cast<byte*>(dfield_get_data(field));
			field_len = dfield_get_len(field);

			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);

			/* An all-zero field reference would mean an
			incomplete (half-written) BLOB pointer. */
			ut_a(memcmp(field_data + field_len
				     - BTR_EXTERN_FIELD_REF_SIZE,
				     field_ref_zero,
				     BTR_EXTERN_FIELD_REF_SIZE));

			data = btr_copy_externally_stored_field(
				&len, field_data, page_size, field_len, heap);
		} else {
			/* Take the BLOB pointer from the merge record. */
			data = btr_rec_copy_externally_stored_field(
				mrec, offsets, page_size, i, &len, heap);
		}

		/* Because we have locked the table, any records
		written by incomplete transactions must have been
		rolled back already. There must not be any incomplete
		BLOB columns. */
		ut_a(data);

		/* Replace the external reference with the in-heap copy. */
		dfield_set_data(field, data, len);
	}
}
3271
/** Convert a merge record to a typed data tuple. Note that externally
stored fields are not copied to heap; the dtuple fields alias the
mtuple field data.
@param[in]	index	index on the table (must not be the insert buffer)
@param[in,out]	dtuple	data tuple whose field array is overwritten
@param[in]	mtuple	merge record to convert from */
static
void
row_merge_mtuple_to_dtuple(
	dict_index_t*	index,
	dtuple_t*	dtuple,
	const mtuple_t*	mtuple)
{
	ut_ad(!dict_index_is_ibuf(index));

	/* Shallow copy of the dfield_t array: the field data
	pointers still reference the mtuple's storage. */
	memcpy(dtuple->fields, mtuple->fields,
	       dtuple->n_fields * sizeof *mtuple->fields);
}
3290
/** Insert sorted data tuples to the index.
The tuples are drawn either from an in-memory sort buffer (row_buf)
or from a sorted on-disk merge file (fd/block); exactly one of the
two sources is used.
@param[in]	trx_id		transaction identifier
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor, or -1 if row_buf is used
@param[in,out]	block		file buffer, or NULL if row_buf is used
@param[in,out]	crypt_block	crypt buf or NULL
@param[in]	space_id	tablespace id
@param[in]	row_buf		row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	trx_id_t		trx_id,
	dict_index_t*		index,
	const dict_table_t*	old_table,
	int			fd,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	ut_stage_alter_t*	stage /* = NULL */)
{
	const byte*		b;
	mem_heap_t*		heap;
	mem_heap_t*		tuple_heap;	/* per-tuple scratch, emptied
						each loop iteration */
	dberr_t			error = DB_SUCCESS;
	ulint			foffs = 0;
	ulint*			offsets;
	mrec_buf_t*		buf;
	ulint			n_rows = 0;
	dtuple_t*		dtuple;
	DBUG_ENTER("row_merge_insert_index_tuples");

	ut_ad(!srv_read_only_mode);
	ut_ad(!(index->type & DICT_FTS));
	ut_ad(!dict_index_is_spatial(index));
	ut_ad(trx_id);

	if (stage != NULL) {
		stage->begin_phase_insert();
	}

	tuple_heap = mem_heap_create(1000);

	{
		/* Pre-size the rec_offs array for this index; the
		same array is reused for every record. */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
		offsets = static_cast<ulint*>(
			mem_heap_alloc(heap, i * sizeof *offsets));
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	if (row_buf != NULL) {
		/* In-memory source: no file reads needed. */
		ut_ad(fd == -1);
		ut_ad(block == NULL);
		DBUG_EXECUTE_IF("row_merge_read_failure",
				error = DB_CORRUPTION;
				goto err_exit;);
		buf = NULL;
		b = NULL;
		dtuple = dtuple_create(
			heap, dict_index_get_n_fields(index));
		dtuple_set_n_fields_cmp(
			dtuple, dict_index_get_n_unique_in_tree(index));
	} else {
		/* On-disk source: prime the buffer with the first block. */
		b = block;
		dtuple = NULL;

		if (!row_merge_read(fd, foffs, block, crypt_block, space_id)) {
			error = DB_CORRUPTION;
			goto err_exit;
		} else {
			buf = static_cast<mrec_buf_t*>(
				mem_heap_alloc(heap, sizeof *buf));
		}
	}


	for (;;) {
		const mrec_t*	mrec;
		ulint		n_ext;
		mtr_t		mtr;	/* NOTE(review): appears unused in
					this loop body */

		if (stage != NULL) {
			stage->inc();
		}

		if (row_buf != NULL) {
			if (n_rows >= row_buf->n_tuples) {
				break;
			}

			/* Convert merge tuple record from
			row buffer to data tuple record */
			row_merge_mtuple_to_dtuple(
				index, dtuple, &row_buf->tuples[n_rows]);

			n_ext = dtuple_get_n_ext(dtuple);
			n_rows++;
			/* BLOB pointers must be copied from dtuple */
			mrec = NULL;
		} else {
			b = row_merge_read_rec(block, crypt_block, space_id,
					       buf, b, index, fd, &foffs, &mrec,
					       offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);
		}

		dict_index_t*	old_index
			= dict_table_get_first_index(old_table);

		/* When rebuilding the clustered index online, abort
		early if the row log has already recorded an error. */
		if (dict_index_is_clust(index)
		    && dict_index_is_online_ddl(old_index)) {
			error = row_log_table_get_error(old_index);
			if (error != DB_SUCCESS) {
				break;
			}
		}

		if (!n_ext) {
			/* There are no externally stored columns. */
		} else {
			ut_ad(dict_index_is_clust(index));
			/* Off-page columns can be fetched safely
			when concurrent modifications to the table
			are disabled. (Purge can process delete-marked
			records, but row_merge_read_clustered_index()
			would have skipped them.)

			When concurrent modifications are enabled,
			row_merge_read_clustered_index() will
			only see rows from transactions that were
			committed before the ALTER TABLE started
			(REPEATABLE READ).

			Any modifications after the
			row_merge_read_clustered_index() scan
			will go through row_log_table_apply().
			Any modifications to off-page columns
			will be tracked by
			row_log_table_blob_alloc() and
			row_log_table_blob_free(). */
			row_merge_copy_blobs(
				mrec, offsets,
				dict_table_page_size(old_table),
				dtuple, tuple_heap);
		}

		ut_ad(dtuple_validate(dtuple));

		error = btr_bulk->insert(dtuple);

		if (error != DB_SUCCESS) {
			goto err_exit;
		}

		/* Release per-tuple allocations before the next row. */
		mem_heap_empty(tuple_heap);
	}

err_exit:
	mem_heap_free(tuple_heap);
	mem_heap_free(heap);

	DBUG_RETURN(error);
}
3474
3475 /*********************************************************************//**
3476 Sets an exclusive lock on a table, for the duration of creating indexes.
3477 @return error code or DB_SUCCESS */
3478 dberr_t
row_merge_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode)3479 row_merge_lock_table(
3480 /*=================*/
3481 trx_t* trx, /*!< in/out: transaction */
3482 dict_table_t* table, /*!< in: table to lock */
3483 enum lock_mode mode) /*!< in: LOCK_X or LOCK_S */
3484 {
3485 ut_ad(!srv_read_only_mode);
3486 ut_ad(mode == LOCK_X || mode == LOCK_S);
3487
3488 trx->op_info = "setting table lock for creating or dropping index";
3489 trx->ddl = true;
3490 /* Trx for DDL should not be forced to rollback for now */
3491 trx->in_innodb |= TRX_FORCE_ROLLBACK_DISABLE;
3492
3493 return(lock_table_for_trx(table, trx, mode));
3494 }
3495
3496 /*********************************************************************//**
3497 Drop an index that was created before an error occurred.
3498 The data dictionary must have been locked exclusively by the caller,
3499 because the transaction will not be committed. */
3500 static
3501 void
row_merge_drop_index_dict(trx_t * trx,index_id_t index_id)3502 row_merge_drop_index_dict(
3503 /*======================*/
3504 trx_t* trx, /*!< in/out: dictionary transaction */
3505 index_id_t index_id)/*!< in: index identifier */
3506 {
3507 static const char sql[] =
3508 "PROCEDURE DROP_INDEX_PROC () IS\n"
3509 "BEGIN\n"
3510 "DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
3511 "DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
3512 "END;\n";
3513 dberr_t error;
3514 pars_info_t* info;
3515
3516 ut_ad(!srv_read_only_mode);
3517 ut_ad(mutex_own(&dict_sys->mutex));
3518 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3519 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3520 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3521
3522 info = pars_info_create();
3523 pars_info_add_ull_literal(info, "indexid", index_id);
3524 trx->op_info = "dropping index from dictionary";
3525 error = que_eval_sql(info, sql, FALSE, trx);
3526
3527 if (error != DB_SUCCESS) {
3528 /* Even though we ensure that DDL transactions are WAIT
3529 and DEADLOCK free, we could encounter other errors e.g.,
3530 DB_TOO_MANY_CONCURRENT_TRXS. */
3531 trx->error_state = DB_SUCCESS;
3532
3533 ib::error() << "row_merge_drop_index_dict failed with error "
3534 << error;
3535 }
3536
3537 trx->op_info = "";
3538 }
3539
3540 /*********************************************************************//**
3541 Drop indexes that were created before an error occurred.
3542 The data dictionary must have been locked exclusively by the caller,
3543 because the transaction will not be committed. */
3544 void
row_merge_drop_indexes_dict(trx_t * trx,table_id_t table_id)3545 row_merge_drop_indexes_dict(
3546 /*========================*/
3547 trx_t* trx, /*!< in/out: dictionary transaction */
3548 table_id_t table_id)/*!< in: table identifier */
3549 {
3550 static const char sql[] =
3551 "PROCEDURE DROP_INDEXES_PROC () IS\n"
3552 "ixid CHAR;\n"
3553 "found INT;\n"
3554
3555 "DECLARE CURSOR index_cur IS\n"
3556 " SELECT ID FROM SYS_INDEXES\n"
3557 " WHERE TABLE_ID=:tableid AND\n"
3558 " SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3559 "FOR UPDATE;\n"
3560
3561 "BEGIN\n"
3562 "found := 1;\n"
3563 "OPEN index_cur;\n"
3564 "WHILE found = 1 LOOP\n"
3565 " FETCH index_cur INTO ixid;\n"
3566 " IF (SQL % NOTFOUND) THEN\n"
3567 " found := 0;\n"
3568 " ELSE\n"
3569 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3570 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3571 " END IF;\n"
3572 "END LOOP;\n"
3573 "CLOSE index_cur;\n"
3574
3575 "END;\n";
3576 dberr_t error;
3577 pars_info_t* info;
3578
3579 ut_ad(!srv_read_only_mode);
3580 ut_ad(mutex_own(&dict_sys->mutex));
3581 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3582 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3583 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3584
3585 /* It is possible that table->n_ref_count > 1 when
3586 locked=TRUE. In this case, all code that should have an open
3587 handle to the table be waiting for the next statement to execute,
3588 or waiting for a meta-data lock.
3589
3590 A concurrent purge will be prevented by dict_operation_lock. */
3591
3592 info = pars_info_create();
3593 pars_info_add_ull_literal(info, "tableid", table_id);
3594 trx->op_info = "dropping indexes";
3595 error = que_eval_sql(info, sql, FALSE, trx);
3596
3597 switch (error) {
3598 case DB_SUCCESS:
3599 break;
3600 default:
3601 /* Even though we ensure that DDL transactions are WAIT
3602 and DEADLOCK free, we could encounter other errors e.g.,
3603 DB_TOO_MANY_CONCURRENT_TRXS. */
3604 ib::error() << "row_merge_drop_indexes_dict failed with error "
3605 << error;
3606 /* fall through */
3607 case DB_TOO_MANY_CONCURRENT_TRXS:
3608 trx->error_state = DB_SUCCESS;
3609 }
3610
3611 trx->op_info = "";
3612 }
3613
3614 /*********************************************************************//**
3615 Drop indexes that were created before an error occurred.
3616 The data dictionary must have been locked exclusively by the caller,
3617 because the transaction will not be committed. */
3618 void
row_merge_drop_indexes(trx_t * trx,dict_table_t * table,ibool locked)3619 row_merge_drop_indexes(
3620 /*===================*/
3621 trx_t* trx, /*!< in/out: dictionary transaction */
3622 dict_table_t* table, /*!< in/out: table containing the indexes */
3623 ibool locked) /*!< in: TRUE=table locked,
3624 FALSE=may need to do a lazy drop */
3625 {
3626 dict_index_t* index;
3627 dict_index_t* next_index;
3628
3629 ut_ad(!srv_read_only_mode);
3630 ut_ad(mutex_own(&dict_sys->mutex));
3631 ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3632 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3633 ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3634
3635 index = dict_table_get_first_index(table);
3636 ut_ad(dict_index_is_clust(index));
3637 ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
3638
3639 /* the caller should have an open handle to the table */
3640 ut_ad(table->get_ref_count() >= 1);
3641
3642 /* It is possible that table->n_ref_count > 1 when
3643 locked=TRUE. In this case, all code that should have an open
3644 handle to the table be waiting for the next statement to execute,
3645 or waiting for a meta-data lock.
3646
3647 A concurrent purge will be prevented by dict_operation_lock. */
3648
3649 if (!locked && table->get_ref_count() > 1) {
3650 /* We will have to drop the indexes later, when the
3651 table is guaranteed to be no longer in use. Mark the
3652 indexes as incomplete and corrupted, so that other
3653 threads will stop using them. Let dict_table_close()
3654 or crash recovery or the next invocation of
3655 prepare_inplace_alter_table() take care of dropping
3656 the indexes. */
3657
3658 while ((index = dict_table_get_next_index(index)) != NULL) {
3659 ut_ad(!dict_index_is_clust(index));
3660
3661 switch (dict_index_get_online_status(index)) {
3662 case ONLINE_INDEX_ABORTED_DROPPED:
3663 continue;
3664 case ONLINE_INDEX_COMPLETE:
3665 if (index->is_committed()) {
3666 /* Do nothing to already
3667 published indexes. */
3668 } else if (index->type & DICT_FTS) {
3669 /* Drop a completed FULLTEXT
3670 index, due to a timeout during
3671 MDL upgrade for
3672 commit_inplace_alter_table().
3673 Because only concurrent reads
3674 are allowed (and they are not
3675 seeing this index yet) we
3676 are safe to drop the index. */
3677 dict_index_t* prev = UT_LIST_GET_PREV(
3678 indexes, index);
3679 /* At least there should be
3680 the clustered index before
3681 this one. */
3682 ut_ad(prev);
3683 ut_a(table->fts);
3684 fts_drop_index(table, index, trx);
3685 /* Since
3686 INNOBASE_SHARE::idx_trans_tbl
3687 is shared between all open
3688 ha_innobase handles to this
3689 table, no thread should be
3690 accessing this dict_index_t
3691 object. Also, we should be
3692 holding LOCK=SHARED MDL on the
3693 table even after the MDL
3694 upgrade timeout. */
3695
3696 /* We can remove a DICT_FTS
3697 index from the cache, because
3698 we do not allow ADD FULLTEXT INDEX
3699 with LOCK=NONE. If we allowed that,
3700 we should exclude FTS entries from
3701 prebuilt->ins_node->entry_list
3702 in ins_node_create_entry_list(). */
3703 dict_index_remove_from_cache(
3704 table, index);
3705 index = prev;
3706 } else {
3707 rw_lock_x_lock(
3708 dict_index_get_lock(index));
3709 dict_index_set_online_status(
3710 index, ONLINE_INDEX_ABORTED);
3711 index->type |= DICT_CORRUPT;
3712 table->drop_aborted = TRUE;
3713 goto drop_aborted;
3714 }
3715 continue;
3716 case ONLINE_INDEX_CREATION:
3717 rw_lock_x_lock(dict_index_get_lock(index));
3718 ut_ad(!index->is_committed());
3719 row_log_abort_sec(index);
3720 drop_aborted:
3721 rw_lock_x_unlock(dict_index_get_lock(index));
3722
3723 DEBUG_SYNC_C("merge_drop_index_after_abort");
3724 /* covered by dict_sys->mutex */
3725 MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
3726 /* fall through */
3727 case ONLINE_INDEX_ABORTED:
3728 /* Drop the index tree from the
3729 data dictionary and free it from
3730 the tablespace, but keep the object
3731 in the data dictionary cache. */
3732 row_merge_drop_index_dict(trx, index->id);
3733 rw_lock_x_lock(dict_index_get_lock(index));
3734 dict_index_set_online_status(
3735 index, ONLINE_INDEX_ABORTED_DROPPED);
3736 rw_lock_x_unlock(dict_index_get_lock(index));
3737 table->drop_aborted = TRUE;
3738 continue;
3739 }
3740 ut_error;
3741 }
3742
3743 return;
3744 }
3745
3746 row_merge_drop_indexes_dict(trx, table->id);
3747
3748 /* Invalidate all row_prebuilt_t::ins_graph that are referring
3749 to this table. That is, force row_get_prebuilt_insert_row() to
3750 rebuild prebuilt->ins_node->entry_list). */
3751 ut_ad(table->def_trx_id <= trx->id);
3752 table->def_trx_id = trx->id;
3753
3754 next_index = dict_table_get_next_index(index);
3755
3756 while ((index = next_index) != NULL) {
3757 /* read the next pointer before freeing the index */
3758 next_index = dict_table_get_next_index(index);
3759
3760 ut_ad(!dict_index_is_clust(index));
3761
3762 if (!index->is_committed()) {
3763 /* If it is FTS index, drop from table->fts
3764 and also drop its auxiliary tables */
3765 if (index->type & DICT_FTS) {
3766 ut_a(table->fts);
3767 fts_drop_index(table, index, trx);
3768 }
3769
3770 switch (dict_index_get_online_status(index)) {
3771 case ONLINE_INDEX_CREATION:
3772 /* This state should only be possible
3773 when prepare_inplace_alter_table() fails
3774 after invoking row_merge_create_index().
3775 In inplace_alter_table(),
3776 row_merge_build_indexes()
3777 should never leave the index in this state.
3778 It would invoke row_log_abort_sec() on
3779 failure. */
3780 case ONLINE_INDEX_COMPLETE:
3781 /* In these cases, we are able to drop
3782 the index straight. The DROP INDEX was
3783 never deferred. */
3784 break;
3785 case ONLINE_INDEX_ABORTED:
3786 case ONLINE_INDEX_ABORTED_DROPPED:
3787 /* covered by dict_sys->mutex */
3788 MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
3789 }
3790
3791 dict_index_remove_from_cache(table, index);
3792 }
3793 }
3794
3795 table->drop_aborted = FALSE;
3796 ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
3797 }
3798
3799 /*********************************************************************//**
3800 Drop all partially created indexes during crash recovery. */
3801 void
row_merge_drop_temp_indexes(void)3802 row_merge_drop_temp_indexes(void)
3803 /*=============================*/
3804 {
3805 static const char sql[] =
3806 "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
3807 "ixid CHAR;\n"
3808 "found INT;\n"
3809
3810 "DECLARE CURSOR index_cur IS\n"
3811 " SELECT ID FROM SYS_INDEXES\n"
3812 " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3813 "FOR UPDATE;\n"
3814
3815 "BEGIN\n"
3816 "found := 1;\n"
3817 "OPEN index_cur;\n"
3818 "WHILE found = 1 LOOP\n"
3819 " FETCH index_cur INTO ixid;\n"
3820 " IF (SQL % NOTFOUND) THEN\n"
3821 " found := 0;\n"
3822 " ELSE\n"
3823 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3824 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3825 " END IF;\n"
3826 "END LOOP;\n"
3827 "CLOSE index_cur;\n"
3828 "END;\n";
3829 trx_t* trx;
3830 dberr_t error;
3831
3832 /* Load the table definitions that contain partially defined
3833 indexes, so that the data dictionary information can be checked
3834 when accessing the tablename.ibd files. */
3835 trx = trx_allocate_for_background();
3836 trx->op_info = "dropping partially created indexes";
3837 row_mysql_lock_data_dictionary(trx);
3838 /* Ensure that this transaction will be rolled back and locks
3839 will be released, if the server gets killed before the commit
3840 gets written to the redo log. */
3841 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3842
3843 trx->op_info = "dropping indexes";
3844 error = que_eval_sql(NULL, sql, FALSE, trx);
3845
3846 if (error != DB_SUCCESS) {
3847 /* Even though we ensure that DDL transactions are WAIT
3848 and DEADLOCK free, we could encounter other errors e.g.,
3849 DB_TOO_MANY_CONCURRENT_TRXS. */
3850 trx->error_state = DB_SUCCESS;
3851
3852 ib::error() << "row_merge_drop_temp_indexes failed with error"
3853 << error;
3854 }
3855
3856 trx_commit_for_mysql(trx);
3857 row_mysql_unlock_data_dictionary(trx);
3858 trx_free_for_background(trx);
3859 }
3860
3861
/** Create temporary merge files in the given paramater path, and if
UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
@param[in]	path	location for creating temporary merge files, or
NULL to use the MySQL server tmpdir
@return File descriptor, or -1 on failure */
int
row_merge_file_create_low(
	const char*	path)
{
	int	fd;
	if (path == NULL) {
		/* Fall back to the server-configured temporary directory. */
		path = innobase_mysql_tmpdir();
	}
#ifdef UNIV_PFS_IO
	/* This temp file open does not go through normal
	file APIs, add instrumentation to register with
	performance schema */
	struct PSI_file_locker*	locker = NULL;
	Datafile df;
	df.make_filepath(path, "Innodb Merge Temp File", NO_EXT);

	PSI_file_locker_state	state;
	locker = PSI_FILE_CALL(get_thread_file_name_locker)(
		&state, innodb_temp_file_key.m_value, PSI_FILE_OPEN, df.filepath(),
		&locker);
	if (locker != NULL) {
		PSI_FILE_CALL(start_file_open_wait)(locker,
						__FILE__,
						__LINE__);
	}
#endif
	fd = innobase_mysql_tmpfile(path);
#ifdef UNIV_PFS_IO
	/* Bind the obtained descriptor to the PFS open-wait event. */
	if (locker != NULL) {
		PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)(
			locker, fd);
	}
#endif

	if (fd < 0) {
		ib::error() << "Cannot create temporary merge file";
		return(-1);
	}
	return(fd);
}
3906
3907
3908 /** Create a merge file in the given location.
3909 @param[out] merge_file merge file structure
3910 @param[in] path location for creating temporary file
3911 @return file descriptor, or -1 on failure */
3912 int
row_merge_file_create(merge_file_t * merge_file,const char * path)3913 row_merge_file_create(
3914 merge_file_t* merge_file,
3915 const char* path)
3916 {
3917 merge_file->fd = row_merge_file_create_low(path);
3918 merge_file->offset = 0;
3919 merge_file->n_rec = 0;
3920
3921 if (merge_file->fd >= 0) {
3922 if (srv_disable_sort_file_cache) {
3923 os_file_set_nocache(merge_file->fd,
3924 "row0merge.cc", "sort");
3925 }
3926 }
3927 return(merge_file->fd);
3928 }
3929
3930 /*********************************************************************//**
3931 Destroy a merge file. And de-register the file from Performance Schema
3932 if UNIV_PFS_IO is defined. */
3933 void
row_merge_file_destroy_low(int fd)3934 row_merge_file_destroy_low(
3935 /*=======================*/
3936 int fd) /*!< in: merge file descriptor */
3937 {
3938 #ifdef UNIV_PFS_IO
3939 struct PSI_file_locker* locker = NULL;
3940 PSI_file_locker_state state;
3941 locker = PSI_FILE_CALL(get_thread_file_descriptor_locker)(
3942 &state, fd, PSI_FILE_CLOSE);
3943 if (locker != NULL) {
3944 PSI_FILE_CALL(start_file_wait)(
3945 locker, 0, __FILE__, __LINE__);
3946 }
3947 #endif
3948 if (fd >= 0) {
3949 close(fd);
3950 }
3951 #ifdef UNIV_PFS_IO
3952 if (locker != NULL) {
3953 PSI_FILE_CALL(end_file_wait)(locker, 0);
3954 }
3955 #endif
3956 }
3957 /*********************************************************************//**
3958 Destroy a merge file. */
3959 void
row_merge_file_destroy(merge_file_t * merge_file)3960 row_merge_file_destroy(
3961 /*===================*/
3962 merge_file_t* merge_file) /*!< in/out: merge file structure */
3963 {
3964 ut_ad(!srv_read_only_mode);
3965
3966 if (merge_file->fd != -1) {
3967 row_merge_file_destroy_low(merge_file->fd);
3968 merge_file->fd = -1;
3969 }
3970 }
3971
3972 /*********************************************************************//**
3973 Rename an index in the dictionary that was created. The data
3974 dictionary must have been locked exclusively by the caller, because
3975 the transaction will not be committed.
3976 @return DB_SUCCESS if all OK */
3977 dberr_t
row_merge_rename_index_to_add(trx_t * trx,table_id_t table_id,index_id_t index_id)3978 row_merge_rename_index_to_add(
3979 /*==========================*/
3980 trx_t* trx, /*!< in/out: transaction */
3981 table_id_t table_id, /*!< in: table identifier */
3982 index_id_t index_id) /*!< in: index identifier */
3983 {
3984 dberr_t err = DB_SUCCESS;
3985 pars_info_t* info = pars_info_create();
3986
3987 /* We use the private SQL parser of Innobase to generate the
3988 query graphs needed in renaming indexes. */
3989
3990 static const char rename_index[] =
3991 "PROCEDURE RENAME_INDEX_PROC () IS\n"
3992 "BEGIN\n"
3993 "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
3994 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3995 "END;\n";
3996
3997 ut_ad(trx);
3998 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3999 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4000
4001 trx->op_info = "renaming index to add";
4002
4003 pars_info_add_ull_literal(info, "tableid", table_id);
4004 pars_info_add_ull_literal(info, "indexid", index_id);
4005
4006 err = que_eval_sql(info, rename_index, FALSE, trx);
4007
4008 if (err != DB_SUCCESS) {
4009 /* Even though we ensure that DDL transactions are WAIT
4010 and DEADLOCK free, we could encounter other errors e.g.,
4011 DB_TOO_MANY_CONCURRENT_TRXS. */
4012 trx->error_state = DB_SUCCESS;
4013
4014 ib::error() << "row_merge_rename_index_to_add failed with"
4015 " error " << err;
4016 }
4017
4018 trx->op_info = "";
4019
4020 return(err);
4021 }
4022
4023 /*********************************************************************//**
4024 Rename an index in the dictionary that is to be dropped. The data
4025 dictionary must have been locked exclusively by the caller, because
4026 the transaction will not be committed.
4027 @return DB_SUCCESS if all OK */
4028 dberr_t
row_merge_rename_index_to_drop(trx_t * trx,table_id_t table_id,index_id_t index_id)4029 row_merge_rename_index_to_drop(
4030 /*===========================*/
4031 trx_t* trx, /*!< in/out: transaction */
4032 table_id_t table_id, /*!< in: table identifier */
4033 index_id_t index_id) /*!< in: index identifier */
4034 {
4035 dberr_t err;
4036 pars_info_t* info = pars_info_create();
4037
4038 ut_ad(!srv_read_only_mode);
4039
4040 /* We use the private SQL parser of Innobase to generate the
4041 query graphs needed in renaming indexes. */
4042
4043 static const char rename_index[] =
4044 "PROCEDURE RENAME_INDEX_PROC () IS\n"
4045 "BEGIN\n"
4046 "UPDATE SYS_INDEXES SET NAME=CONCAT('"
4047 TEMP_INDEX_PREFIX_STR "',NAME)\n"
4048 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
4049 "END;\n";
4050
4051 ut_ad(trx);
4052 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
4053 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4054
4055 trx->op_info = "renaming index to drop";
4056
4057 pars_info_add_ull_literal(info, "tableid", table_id);
4058 pars_info_add_ull_literal(info, "indexid", index_id);
4059
4060 err = que_eval_sql(info, rename_index, FALSE, trx);
4061
4062 if (err != DB_SUCCESS) {
4063 /* Even though we ensure that DDL transactions are WAIT
4064 and DEADLOCK free, we could encounter other errors e.g.,
4065 DB_TOO_MANY_CONCURRENT_TRXS. */
4066 trx->error_state = DB_SUCCESS;
4067
4068 ib::error() << "row_merge_rename_index_to_drop failed with"
4069 " error " << err;
4070 }
4071
4072 trx->op_info = "";
4073
4074 return(err);
4075 }
4076
4077 /*********************************************************************//**
4078 Provide a new pathname for a table that is being renamed if it belongs to
4079 a file-per-table tablespace. The caller is responsible for freeing the
4080 memory allocated for the return value.
4081 @return new pathname of tablespace file, or NULL if space = 0 */
4082 char*
row_make_new_pathname(dict_table_t * table,const char * new_name)4083 row_make_new_pathname(
4084 /*==================*/
4085 dict_table_t* table, /*!< in: table to be renamed */
4086 const char* new_name) /*!< in: new name */
4087 {
4088 char* new_path;
4089 char* old_path;
4090
4091 ut_ad(!is_system_tablespace(table->space));
4092
4093 old_path = fil_space_get_first_path(table->space);
4094 ut_a(old_path);
4095
4096 new_path = os_file_make_new_pathname(old_path, new_name);
4097
4098 ut_free(old_path);
4099
4100 return(new_path);
4101 }
4102
4103 /*********************************************************************//**
4104 Rename the tables in the data dictionary. The data dictionary must
4105 have been locked exclusively by the caller, because the transaction
4106 will not be committed.
4107 @return error code or DB_SUCCESS */
4108 dberr_t
row_merge_rename_tables_dict(dict_table_t * old_table,dict_table_t * new_table,const char * tmp_name,trx_t * trx)4109 row_merge_rename_tables_dict(
4110 /*=========================*/
4111 dict_table_t* old_table, /*!< in/out: old table, renamed to
4112 tmp_name */
4113 dict_table_t* new_table, /*!< in/out: new table, renamed to
4114 old_table->name */
4115 const char* tmp_name, /*!< in: new name for old_table */
4116 trx_t* trx) /*!< in/out: dictionary transaction */
4117 {
4118 dberr_t err = DB_ERROR;
4119 pars_info_t* info;
4120
4121 ut_ad(!srv_read_only_mode);
4122 ut_ad(old_table != new_table);
4123 ut_ad(mutex_own(&dict_sys->mutex));
4124 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
4125 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
4126 || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4127
4128 trx->op_info = "renaming tables";
4129
4130 /* We use the private SQL parser of Innobase to generate the query
4131 graphs needed in updating the dictionary data in system tables. */
4132
4133 info = pars_info_create();
4134
4135 pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
4136 pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
4137 pars_info_add_str_literal(info, "tmp_name", tmp_name);
4138
4139 err = que_eval_sql(info,
4140 "PROCEDURE RENAME_TABLES () IS\n"
4141 "BEGIN\n"
4142 "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
4143 " WHERE NAME = :old_name;\n"
4144 "UPDATE SYS_TABLES SET NAME = :old_name\n"
4145 " WHERE NAME = :new_name;\n"
4146 "END;\n", FALSE, trx);
4147
4148 /* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
4149 renamed is a single-table tablespace, which must be implicitly
4150 renamed along with the table. */
4151 if (err == DB_SUCCESS
4152 && dict_table_is_file_per_table(old_table)
4153 && !old_table->file_unreadable) {
4154 /* Make pathname to update SYS_DATAFILES. */
4155 char* tmp_path = row_make_new_pathname(old_table, tmp_name);
4156
4157 info = pars_info_create();
4158
4159 pars_info_add_str_literal(info, "tmp_name", tmp_name);
4160 pars_info_add_str_literal(info, "tmp_path", tmp_path);
4161 pars_info_add_int4_literal(info, "old_space",
4162 (lint) old_table->space);
4163
4164 err = que_eval_sql(info,
4165 "PROCEDURE RENAME_OLD_SPACE () IS\n"
4166 "BEGIN\n"
4167 "UPDATE SYS_TABLESPACES"
4168 " SET NAME = :tmp_name\n"
4169 " WHERE SPACE = :old_space;\n"
4170 "UPDATE SYS_DATAFILES"
4171 " SET PATH = :tmp_path\n"
4172 " WHERE SPACE = :old_space;\n"
4173 "END;\n", FALSE, trx);
4174
4175 ut_free(tmp_path);
4176 }
4177
4178 /* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
4179 renamed is a single-table tablespace, which must be implicitly
4180 renamed along with the table. */
4181 if (err == DB_SUCCESS
4182 && dict_table_is_file_per_table(new_table)) {
4183 /* Make pathname to update SYS_DATAFILES. */
4184 char* old_path = row_make_new_pathname(
4185 new_table, old_table->name.m_name);
4186
4187 info = pars_info_create();
4188
4189 pars_info_add_str_literal(info, "old_name",
4190 old_table->name.m_name);
4191 pars_info_add_str_literal(info, "old_path", old_path);
4192 pars_info_add_int4_literal(info, "new_space",
4193 (lint) new_table->space);
4194
4195 err = que_eval_sql(info,
4196 "PROCEDURE RENAME_NEW_SPACE () IS\n"
4197 "BEGIN\n"
4198 "UPDATE SYS_TABLESPACES"
4199 " SET NAME = :old_name\n"
4200 " WHERE SPACE = :new_space;\n"
4201 "UPDATE SYS_DATAFILES"
4202 " SET PATH = :old_path\n"
4203 " WHERE SPACE = :new_space;\n"
4204 "END;\n", FALSE, trx);
4205
4206 ut_free(old_path);
4207 }
4208
4209 if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) {
4210 err = row_import_update_discarded_flag(
4211 trx, new_table->id, true, true);
4212 }
4213
4214 trx->op_info = "";
4215
4216 return(err);
4217 }
4218
4219 /** Create and execute a query graph for creating an index.
4220 @param[in,out] trx trx
4221 @param[in,out] table table
4222 @param[in,out] index index
4223 @param[in] add_v new virtual columns added along with add index call
4224 @return DB_SUCCESS or error code */
4225 static MY_ATTRIBUTE((warn_unused_result))
4226 dberr_t
row_merge_create_index_graph(trx_t * trx,dict_table_t * table,dict_index_t * index,const dict_add_v_col_t * add_v)4227 row_merge_create_index_graph(
4228 trx_t* trx,
4229 dict_table_t* table,
4230 dict_index_t* index,
4231 const dict_add_v_col_t* add_v)
4232 {
4233 ind_node_t* node; /*!< Index creation node */
4234 mem_heap_t* heap; /*!< Memory heap */
4235 que_thr_t* thr; /*!< Query thread */
4236 dberr_t err;
4237
4238 DBUG_ENTER("row_merge_create_index_graph");
4239
4240 ut_ad(trx);
4241 ut_ad(table);
4242 ut_ad(index);
4243
4244 heap = mem_heap_create(512);
4245
4246 index->table = table;
4247 node = ind_create_graph_create(index, heap, add_v);
4248 thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
4249
4250 ut_a(thr == que_fork_start_command(
4251 static_cast<que_fork_t*>(que_node_get_parent(thr))));
4252
4253 que_run_threads(thr);
4254
4255 err = trx->error_state;
4256
4257 que_graph_free((que_t*) que_node_get_parent(thr));
4258
4259 DBUG_RETURN(err);
4260 }
4261
4262 /** Create the index and load in to the dictionary.
4263 @param[in,out] trx trx (sets error_state)
4264 @param[in,out] table the index is on this table
4265 @param[in] index_def the index definition
4266 @param[in] add_v new virtual columns added along with add
4267 index call
4268 @return index, or NULL on error */
4269 dict_index_t*
row_merge_create_index(trx_t * trx,dict_table_t * table,const index_def_t * index_def,const dict_add_v_col_t * add_v)4270 row_merge_create_index(
4271 trx_t* trx,
4272 dict_table_t* table,
4273 const index_def_t* index_def,
4274 const dict_add_v_col_t* add_v)
4275 {
4276 dict_index_t* index;
4277 dberr_t err;
4278 ulint n_fields = index_def->n_fields;
4279 ulint i;
4280 bool has_new_v_col = false;
4281
4282 DBUG_ENTER("row_merge_create_index");
4283
4284 ut_ad(!srv_read_only_mode);
4285
4286 /* Create the index prototype, using the passed in def, this is not
4287 a persistent operation. We pass 0 as the space id, and determine at
4288 a lower level the space id where to store the table. */
4289
4290 index = dict_mem_index_create(table->name.m_name, index_def->name,
4291 0, index_def->ind_type, n_fields);
4292
4293 ut_a(index);
4294
4295 index->set_committed(index_def->rebuild);
4296
4297 for (i = 0; i < n_fields; i++) {
4298 const char* name;
4299 index_field_t* ifield = &index_def->fields[i];
4300
4301 if (ifield->is_v_col) {
4302 if (ifield->col_no >= table->n_v_def) {
4303 ut_ad(ifield->col_no < table->n_v_def
4304 + add_v->n_v_col);
4305 ut_ad(ifield->col_no >= table->n_v_def);
4306 name = add_v->v_col_name[
4307 ifield->col_no - table->n_v_def];
4308
4309 has_new_v_col = true;
4310 } else {
4311 name = dict_table_get_v_col_name(
4312 table, ifield->col_no);
4313 }
4314 } else {
4315 name = dict_table_get_col_name(table, ifield->col_no);
4316
4317 }
4318
4319 dict_mem_index_add_field(index, name, ifield->prefix_len);
4320 }
4321
4322 /* Add the index to SYS_INDEXES, using the index prototype. */
4323 err = row_merge_create_index_graph(trx, table, index, add_v);
4324
4325 if (err == DB_SUCCESS) {
4326
4327 index = dict_table_get_index_on_name(table, index_def->name,
4328 index_def->rebuild);
4329
4330 ut_a(index);
4331
4332 index->parser = index_def->parser;
4333 index->is_ngram = index_def->is_ngram;
4334 index->has_new_v_col = has_new_v_col;
4335
4336 /* Note the id of the transaction that created this
4337 index, we use it to restrict readers from accessing
4338 this index, to ensure read consistency. */
4339 ut_ad(index->trx_id == trx->id);
4340 } else {
4341 /* In case we were unable to assign an undo record for this index
4342 we won't free index memory object */
4343 if (err == DB_TOO_MANY_CONCURRENT_TRXS) {
4344 dict_mem_index_free(index);
4345 }
4346 index = NULL;
4347 }
4348
4349 DBUG_RETURN(index);
4350 }
4351
4352 /*********************************************************************//**
4353 Check if a transaction can use an index. */
4354 ibool
row_merge_is_index_usable(const trx_t * trx,const dict_index_t * index)4355 row_merge_is_index_usable(
4356 /*======================*/
4357 const trx_t* trx, /*!< in: transaction */
4358 const dict_index_t* index) /*!< in: index to check */
4359 {
4360 if (!dict_index_is_clust(index)
4361 && dict_index_is_online_ddl(index)) {
4362 /* Indexes that are being created are not useable. */
4363 return(FALSE);
4364 }
4365
4366 return(!dict_index_is_corrupted(index)
4367 && (dict_table_is_temporary(index->table)
4368 || index->trx_id == 0
4369 || !MVCC::is_view_active(trx->read_view)
4370 || trx->read_view->changes_visible(
4371 index->trx_id,
4372 index->table->name)));
4373 }
4374
4375 /*********************************************************************//**
4376 Drop a table. The caller must have ensured that the background stats
4377 thread is not processing the table. This can be done by calling
4378 dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
4379 before calling this function.
4380 @return DB_SUCCESS or error code */
4381 dberr_t
row_merge_drop_table(trx_t * trx,dict_table_t * table)4382 row_merge_drop_table(
4383 /*=================*/
4384 trx_t* trx, /*!< in: transaction */
4385 dict_table_t* table) /*!< in: table to drop */
4386 {
4387 ut_ad(!srv_read_only_mode);
4388
4389 /* There must be no open transactions on the table. */
4390 ut_a(table->get_ref_count() == 0);
4391
4392 return(row_drop_table_for_mysql(table->name.m_name,
4393 trx, false, false));
4394 }
4395
4396 /** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
4397 that redo-logging of individual index pages was disabled, and
4398 the flushing of such pages to the data files was completed.
4399 @param[in] index an index tree on which redo logging was disabled */
4400 static
4401 void
row_merge_write_redo(const dict_index_t * index)4402 row_merge_write_redo(
4403 const dict_index_t* index)
4404 {
4405 mtr_t mtr;
4406 byte* log_ptr;
4407
4408 ut_ad(!dict_table_is_temporary(index->table));
4409 mtr.start();
4410 log_ptr = mlog_open(&mtr, 11 + 8);
4411 log_ptr = mlog_write_initial_log_record_low(
4412 MLOG_INDEX_LOAD,
4413 index->space, index->page, log_ptr, &mtr);
4414 mach_write_to_8(log_ptr, index->id);
4415 mlog_close(&mtr, log_ptr + 8);
4416 mtr.commit();
4417 }
4418
4419 /** Build indexes on a table by reading a clustered index, creating a temporary
4420 file containing index entries, merge sorting these index entries and inserting
4421 sorted index entries to indexes.
4422 @param[in] trx transaction
4423 @param[in] old_table table where rows are read from
4424 @param[in] new_table table where indexes are created; identical to
4425 old_table unless creating a PRIMARY KEY
4426 @param[in] online true if creating indexes online
4427 @param[in] indexes indexes to be created
4428 @param[in] key_numbers MySQL key numbers
4429 @param[in] n_indexes size of indexes[]
4430 @param[in,out] table MySQL table, for reporting erroneous key value
4431 if applicable
4432 @param[in] add_cols default values of added columns, or NULL
4433 @param[in] col_map mapping of old column numbers to new ones, or
4434 NULL if old_table == new_table
4435 @param[in] add_autoinc number of added AUTO_INCREMENT columns, or
4436 ULINT_UNDEFINED if none is added
4437 @param[in,out] sequence autoinc sequence
4438 @param[in] skip_pk_sort whether the new PRIMARY KEY will follow
4439 existing order
4440 @param[in,out] stage performance schema accounting object, used by
4441 ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
4442 this function and it will be passed to other functions for further accounting.
4443 @param[in] add_v new virtual columns added along with indexes
4444 @param[in] eval_table mysql table used to evaluate virtual column
4445 value, see innobase_get_computed_value().
4446 @param[in] prebuilt compress_heap must be taken from here
4447 @return DB_SUCCESS or error code */
4448 dberr_t
row_merge_build_indexes(trx_t * trx,dict_table_t * old_table,dict_table_t * new_table,bool online,dict_index_t ** indexes,const ulint * key_numbers,ulint n_indexes,struct TABLE * table,const dtuple_t * add_cols,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,bool skip_pk_sort,ut_stage_alter_t * stage,const dict_add_v_col_t * add_v,struct TABLE * eval_table,row_prebuilt_t * prebuilt)4449 row_merge_build_indexes(
4450 trx_t* trx,
4451 dict_table_t* old_table,
4452 dict_table_t* new_table,
4453 bool online,
4454 dict_index_t** indexes,
4455 const ulint* key_numbers,
4456 ulint n_indexes,
4457 struct TABLE* table,
4458 const dtuple_t* add_cols,
4459 const ulint* col_map,
4460 ulint add_autoinc,
4461 ib_sequence_t& sequence,
4462 bool skip_pk_sort,
4463 ut_stage_alter_t* stage,
4464 const dict_add_v_col_t* add_v,
4465 struct TABLE* eval_table,
4466 row_prebuilt_t* prebuilt)
4467 {
4468 merge_file_t* merge_files;
4469 row_merge_block_t* block;
4470 ut_new_pfx_t block_pfx;
4471 ut_new_pfx_t crypt_pfx;
4472 ulint i;
4473 ulint j;
4474 dberr_t error;
4475 int tmpfd = -1;
4476 dict_index_t* fts_sort_idx = NULL;
4477 fts_psort_t* psort_info = NULL;
4478 fts_psort_t* merge_info = NULL;
4479 int64_t sig_count = 0;
4480 bool fts_psort_initiated = false;
4481 DBUG_ENTER("row_merge_build_indexes");
4482
4483 ut_ad(!srv_read_only_mode);
4484 ut_ad((old_table == new_table) == !col_map);
4485 ut_ad(!add_cols || col_map);
4486
4487 stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
4488 ? n_indexes - 1
4489 : n_indexes);
4490
4491 /* Allocate memory for merge file data structure and initialize
4492 fields */
4493
4494 ut_allocator<row_merge_block_t> alloc(mem_key_row_merge_sort);
4495
4496 /* This will allocate "3 * srv_sort_buf_size" elements of type
4497 row_merge_block_t. The latter is defined as byte. */
4498 block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx, false);
4499
4500 if (block == NULL) {
4501 DBUG_RETURN(DB_OUT_OF_MEMORY);
4502 }
4503
4504 /* If online logs are set to be encrypted, allocate additional buffer
4505 for encryption/decryption. */
4506 row_merge_block_t* crypt_block = NULL;
4507
4508 if (log_tmp_is_encrypted()) {
4509 crypt_block = static_cast<row_merge_block_t*>(
4510 alloc.allocate_large(
4511 3 * srv_sort_buf_size, &crypt_pfx, false));
4512
4513 if (crypt_block == NULL) {
4514 DBUG_RETURN(DB_OUT_OF_MEMORY);
4515 }
4516 }
4517
4518 trx_start_if_not_started_xa(trx, true);
4519
4520 /* Check if we need a flush observer to flush dirty pages.
4521 Since we disable redo logging in bulk load, so we should flush
4522 dirty pages before online log apply, because online log apply enables
4523 redo logging(we can do further optimization here).
4524 1. online add index: flush dirty pages right before row_log_apply().
4525 2. table rebuild: flush dirty pages before row_log_table_apply().
4526
4527 we use bulk load to create all types of indexes except spatial index,
4528 for which redo logging is enabled. If we create only spatial indexes,
4529 we don't need to flush dirty pages at all. */
4530 bool need_flush_observer = (old_table != new_table);
4531
4532 for (i = 0; i < n_indexes; i++) {
4533 if (!dict_index_is_spatial(indexes[i])) {
4534 need_flush_observer = true;
4535 }
4536 }
4537
4538 FlushObserver* flush_observer = NULL;
4539 if (need_flush_observer) {
4540 flush_observer = UT_NEW_NOKEY(
4541 FlushObserver(new_table->space, trx, stage));
4542
4543 trx_set_flush_observer(trx, flush_observer);
4544 }
4545
4546 merge_files = static_cast<merge_file_t*>(
4547 ut_malloc_nokey(n_indexes * sizeof *merge_files));
4548
4549 /* Initialize all the merge file descriptors, so that we
4550 don't call row_merge_file_destroy() on uninitialized
4551 merge file descriptor */
4552
4553 for (i = 0; i < n_indexes; i++) {
4554 merge_files[i].fd = -1;
4555 }
4556
4557 for (i = 0; i < n_indexes; i++) {
4558 if (indexes[i]->type & DICT_FTS) {
4559 ibool opt_doc_id_size = FALSE;
4560
4561 /* To build FTS index, we would need to extract
4562 doc's word, Doc ID, and word's position, so
4563 we need to build a "fts sort index" indexing
4564 on above three 'fields' */
4565 fts_sort_idx = row_merge_create_fts_sort_index(
4566 indexes[i], old_table, &opt_doc_id_size);
4567
4568 row_merge_dup_t* dup
4569 = static_cast<row_merge_dup_t*>(
4570 ut_malloc_nokey(sizeof *dup));
4571 dup->index = fts_sort_idx;
4572 dup->table = table;
4573 dup->col_map = col_map;
4574 dup->n_dup = 0;
4575
4576 row_fts_psort_info_init(
4577 trx, dup, new_table, opt_doc_id_size,
4578 &psort_info, &merge_info);
4579
4580 /* We need to ensure that we free the resources
4581 allocated */
4582 fts_psort_initiated = true;
4583 }
4584 }
4585
4586 /* Reset the MySQL row buffer that is used when reporting
4587 duplicate keys. */
4588 innobase_rec_reset(table);
4589
4590 if (!old_table->is_readable() ||
4591 !new_table->is_readable()) {
4592 error = DB_DECRYPTION_FAILED;
4593 ib::warn() << "Table %s is encrypted but encryption service or"
4594 " used key_id is not available. "
4595 " Can't continue reading table.";
4596 goto func_exit;
4597 }
4598
4599 /* Read clustered index of the table and create files for
4600 secondary index entries for merge sort */
4601 error = row_merge_read_clustered_index(
4602 trx, table, old_table, new_table, online, indexes,
4603 fts_sort_idx, psort_info, merge_files, key_numbers,
4604 n_indexes, add_cols, add_v, col_map, add_autoinc,
4605 sequence, block, crypt_block, skip_pk_sort, &tmpfd, stage,
4606 eval_table, prebuilt);
4607
4608 stage->end_phase_read_pk();
4609
4610 if (error != DB_SUCCESS) {
4611
4612 goto func_exit;
4613 }
4614
4615 DEBUG_SYNC_C("row_merge_after_scan");
4616
4617 /* Now we have files containing index entries ready for
4618 sorting and inserting. */
4619
4620 for (i = 0; i < n_indexes; i++) {
4621 dict_index_t* sort_idx = indexes[i];
4622
4623 if (dict_index_is_spatial(sort_idx)) {
4624 continue;
4625 }
4626
4627 if (indexes[i]->type & DICT_FTS) {
4628 os_event_t fts_parallel_merge_event;
4629
4630 sort_idx = fts_sort_idx;
4631
4632 fts_parallel_merge_event
4633 = merge_info[0].psort_common->merge_event;
4634
4635 if (FTS_PLL_MERGE) {
4636 ulint trial_count = 0;
4637 bool all_exit = false;
4638
4639 os_event_reset(fts_parallel_merge_event);
4640 row_fts_start_parallel_merge(merge_info);
4641 wait_again:
4642 os_event_wait_time_low(
4643 fts_parallel_merge_event, 1000000,
4644 sig_count);
4645
4646 for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
4647 if (merge_info[j].child_status
4648 != FTS_CHILD_COMPLETE
4649 && merge_info[j].child_status
4650 != FTS_CHILD_EXITING) {
4651 sig_count = os_event_reset(
4652 fts_parallel_merge_event);
4653
4654 goto wait_again;
4655 }
4656 }
4657
4658 /* Now all children should complete, wait
4659 a bit until they all finish using event */
4660 while (!all_exit && trial_count < 10000) {
4661 all_exit = true;
4662
4663 for (j = 0; j < FTS_NUM_AUX_INDEX;
4664 j++) {
4665 if (merge_info[j].child_status
4666 != FTS_CHILD_EXITING) {
4667 all_exit = false;
4668 os_thread_sleep(1000);
4669 break;
4670 }
4671 }
4672 trial_count++;
4673 }
4674
4675 if (!all_exit) {
4676 ib::error() << "Not all child merge"
4677 " threads exited when creating"
4678 " FTS index '"
4679 << indexes[i]->name << "'";
4680 } else {
4681 for (j = 0; j < FTS_NUM_AUX_INDEX;
4682 j++) {
4683
4684 os_thread_join(merge_info[j]
4685 .thread_hdl);
4686 }
4687 }
4688 } else {
4689 /* This cannot report duplicates; an
4690 assertion would fail in that case. */
4691 error = row_fts_merge_insert(
4692 sort_idx, new_table,
4693 psort_info, 0);
4694 }
4695
4696 #ifdef FTS_INTERNAL_DIAG_PRINT
4697 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
4698 #endif
4699 } else if (merge_files[i].fd >= 0) {
4700 row_merge_dup_t dup = {
4701 sort_idx, table, col_map, 0};
4702
4703 error = row_merge_sort(
4704 trx, &dup, &merge_files[i], block, crypt_block,
4705 new_table->space, &tmpfd, stage);
4706
4707 if (error == DB_SUCCESS) {
4708 BtrBulk btr_bulk(sort_idx, trx->id,
4709 flush_observer);
4710 btr_bulk.init();
4711
4712 error = row_merge_insert_index_tuples(
4713 trx->id, sort_idx, old_table,
4714 merge_files[i].fd, block, crypt_block,
4715 new_table->space,
4716 NULL, &btr_bulk, stage);
4717
4718 error = btr_bulk.finish(error);
4719 }
4720 }
4721
4722 /* Close the temporary file to free up space. */
4723 row_merge_file_destroy(&merge_files[i]);
4724
4725 if (indexes[i]->type & DICT_FTS) {
4726 row_fts_psort_info_destroy(psort_info, merge_info);
4727 fts_psort_initiated = false;
4728 } else if (error != DB_SUCCESS || !online) {
4729 /* Do not apply any online log. */
4730 } else if (old_table != new_table) {
4731 ut_ad(!sort_idx->online_log);
4732 ut_ad(sort_idx->online_status
4733 == ONLINE_INDEX_COMPLETE);
4734 } else {
4735 ut_ad(need_flush_observer);
4736
4737 flush_observer->flush();
4738 row_merge_write_redo(indexes[i]);
4739
4740 DEBUG_SYNC_C("row_log_apply_before");
4741 error = row_log_apply(trx, sort_idx, table, stage);
4742 DEBUG_SYNC_C("row_log_apply_after");
4743 }
4744
4745 if (error != DB_SUCCESS) {
4746 trx->error_key_num = key_numbers[i];
4747 goto func_exit;
4748 }
4749
4750 if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
4751 ib::info() << "Finished building full-text index "
4752 << indexes[i]->name;
4753 }
4754 }
4755
4756 func_exit:
4757 DBUG_EXECUTE_IF(
4758 "ib_build_indexes_too_many_concurrent_trxs",
4759 error = DB_TOO_MANY_CONCURRENT_TRXS;
4760 trx->error_state = error;);
4761
4762 if (fts_psort_initiated) {
4763 /* Clean up FTS psort related resource */
4764 row_fts_psort_info_destroy(psort_info, merge_info);
4765 fts_psort_initiated = false;
4766 }
4767
4768 row_merge_file_destroy_low(tmpfd);
4769
4770 for (i = 0; i < n_indexes; i++) {
4771 row_merge_file_destroy(&merge_files[i]);
4772 }
4773
4774 if (fts_sort_idx) {
4775 dict_mem_index_free(fts_sort_idx);
4776 }
4777
4778 ut_free(merge_files);
4779
4780 alloc.deallocate_large(block, &block_pfx);
4781
4782 if (crypt_block) {
4783 alloc.deallocate_large(crypt_block, &crypt_pfx);
4784 }
4785
4786 DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
4787
4788 if (online && old_table == new_table && error != DB_SUCCESS) {
4789 /* On error, flag all online secondary index creation
4790 as aborted. */
4791 for (i = 0; i < n_indexes; i++) {
4792 ut_ad(!(indexes[i]->type & DICT_FTS));
4793 ut_ad(!indexes[i]->is_committed());
4794 ut_ad(!dict_index_is_clust(indexes[i]));
4795
4796 /* Completed indexes should be dropped as
4797 well, and indexes whose creation was aborted
4798 should be dropped from the persistent
4799 storage. However, at this point we can only
4800 set some flags in the not-yet-published
4801 indexes. These indexes will be dropped later
4802 in row_merge_drop_indexes(), called by
4803 rollback_inplace_alter_table(). */
4804
4805 switch (dict_index_get_online_status(indexes[i])) {
4806 case ONLINE_INDEX_COMPLETE:
4807 break;
4808 case ONLINE_INDEX_CREATION:
4809 rw_lock_x_lock(
4810 dict_index_get_lock(indexes[i]));
4811 row_log_abort_sec(indexes[i]);
4812 indexes[i]->type |= DICT_CORRUPT;
4813 rw_lock_x_unlock(
4814 dict_index_get_lock(indexes[i]));
4815 new_table->drop_aborted = TRUE;
4816 /* fall through */
4817 case ONLINE_INDEX_ABORTED_DROPPED:
4818 case ONLINE_INDEX_ABORTED:
4819 MONITOR_ATOMIC_INC(
4820 MONITOR_BACKGROUND_DROP_INDEX);
4821 }
4822 }
4823 }
4824
4825 DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
4826
4827 if (flush_observer != NULL) {
4828 ut_ad(need_flush_observer);
4829
4830 DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
4831 error = DB_FAIL;
4832 );
4833
4834 if (error != DB_SUCCESS) {
4835 flush_observer->interrupted();
4836 }
4837
4838 flush_observer->flush();
4839
4840 UT_DELETE(flush_observer);
4841
4842 if (trx_is_interrupted(trx)) {
4843 error = DB_INTERRUPTED;
4844 }
4845
4846 if (error == DB_SUCCESS && old_table != new_table) {
4847 for (const dict_index_t* index
4848 = dict_table_get_first_index(new_table);
4849 index != NULL;
4850 index = dict_table_get_next_index(index)) {
4851 row_merge_write_redo(index);
4852 }
4853 }
4854 }
4855
4856 DBUG_RETURN(error);
4857 }
4858