1 /***************************************************************************** 2 3 Copyright (c) 2005, 2017, Oracle and/or its affiliates. All Rights Reserved. 4 Copyright (c) 2014, 2021, MariaDB Corporation. 5 6 This program is free software; you can redistribute it and/or modify it under 7 the terms of the GNU General Public License as published by the Free Software 8 Foundation; version 2 of the License. 9 10 This program is distributed in the hope that it will be useful, but WITHOUT 11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 13 14 You should have received a copy of the GNU General Public License along with 15 this program; if not, write to the Free Software Foundation, Inc., 16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA 17 18 *****************************************************************************/ 19 20 /**************************************************//** 21 @file row/row0merge.cc 22 New index creation routines using a merge sort 23 24 Created 12/4/2005 Jan Lindstrom 25 Completed by Sunny Bains and Marko Makela 26 *******************************************************/ 27 #include <my_global.h> 28 #include <log.h> 29 #include <sql_class.h> 30 #include <math.h> 31 32 #include "row0merge.h" 33 #include "row0ext.h" 34 #include "row0log.h" 35 #include "row0ins.h" 36 #include "row0row.h" 37 #include "row0sel.h" 38 #include "log0crypt.h" 39 #include "dict0crea.h" 40 #include "trx0purge.h" 41 #include "lock0lock.h" 42 #include "pars0pars.h" 43 #include "ut0sort.h" 44 #include "row0ftsort.h" 45 #include "row0import.h" 46 #include "row0vers.h" 47 #include "handler0alter.h" 48 #include "btr0bulk.h" 49 #ifdef BTR_CUR_ADAPT 50 # include "btr0sea.h" 51 #endif /* BTR_CUR_ADAPT */ 52 #include "ut0stage.h" 53 #include "fil0crypt.h" 54 55 /* Ignore posix_fadvise() on those platforms where it does not exist */ 56 #if defined _WIN32 
# define posix_fadvise(fd, offset, len, advice)	/* nothing */
#endif /* _WIN32 */

/* Whether to disable file system cache */
char	srv_disable_sort_file_cache;

/** Class that caches index row tuples made from a single cluster
index page scan, and then insert into corresponding index tree */
class index_tuple_info_t {
public:
	/** constructor
	@param[in]	heap	memory heap
	@param[in]	index	index to be created */
	index_tuple_info_t(
		mem_heap_t*	heap,
		dict_index_t*	index) UNIV_NOTHROW
	{
		m_heap = heap;
		m_index = index;
		m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
	}

	/** destructor */
	~index_tuple_info_t()
	{
		UT_DELETE(m_dtuple_vec);
	}

	/** Get the index object
	@return the index object */
	dict_index_t*	get_index() UNIV_NOTHROW
	{
		return(m_index);
	}

	/** Caches an index row into index tuple vector
	@param[in]	row	table row
	@param[in]	ext	externally stored column
				prefixes, or NULL */
	void add(
		const dtuple_t*		row,
		const row_ext_t*	ext) UNIV_NOTHROW
	{
		dtuple_t*	dtuple;

		/* Build the index entry in m_heap; the tuple stays
		valid until the heap is emptied by the caller. */
		dtuple = row_build_index_entry(row, ext, m_index, m_heap);

		ut_ad(dtuple);

		m_dtuple_vec->push_back(dtuple);
	}

	/** Insert spatial index rows cached in vector into spatial index
	@param[in]	trx_id		transaction id
	@param[in,out]	row_heap	memory heap
	@param[in]	pcur		cluster index scanning cursor
	@param[in,out]	scan_mtr	mini-transaction for pcur
	@return DB_SUCCESS if successful, else error number */
	inline dberr_t insert(
		trx_id_t		trx_id,
		mem_heap_t*		row_heap,
		btr_pcur_t*		pcur,
		mtr_t*			scan_mtr)
	{
		big_rec_t*	big_rec;
		rec_t*		rec;
		btr_cur_t	ins_cur;
		mtr_t		mtr;
		rtr_info_t	rtr_info;
		rec_offs*	ins_offsets = NULL;
		dberr_t		error = DB_SUCCESS;
		dtuple_t*	dtuple;
		ulint		count = 0;
		/* The tuples were produced by a committed scan of the
		clustered index, so the inserts need neither undo
		logging nor locking, and the system columns are kept
		as-is. */
		const ulint	flag = BTR_NO_UNDO_LOG_FLAG
				      | BTR_NO_LOCKING_FLAG
				      | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;

		ut_ad(dict_index_is_spatial(m_index));

		DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
				log_sys.check_flush_or_checkpoint = true;
		);

		/* NOTE(review): if an insert fails, the loop still
		continues with the remaining cached tuples; the error
		of the last failing insert is what gets returned. */
		for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
		     it != m_dtuple_vec->end();
		     ++it) {
			dtuple = *it;
			ut_ad(dtuple);

			if (log_sys.check_flush_or_checkpoint) {
				/* Release the scan mini-transaction
				(after saving the cursor position) so
				that a log checkpoint can proceed, then
				wait for log space. */
				if (scan_mtr->is_active()) {
					btr_pcur_move_to_prev_on_page(pcur);
					btr_pcur_store_position(pcur, scan_mtr);
					scan_mtr->commit();
				}

				log_free_check();
			}

			mtr.start();
			m_index->set_modified(mtr);

			ins_cur.index = m_index;
			rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
					  false);
			rtr_info_update_btr(&ins_cur, &rtr_info);

			/* First try a leaf-only (optimistic) insert. */
			btr_cur_search_to_nth_level(m_index, 0, dtuple,
						    PAGE_CUR_RTREE_INSERT,
						    BTR_MODIFY_LEAF, &ins_cur,
						    0, __FILE__, __LINE__,
						    &mtr);

			/* It need to update MBR in parent entry,
			so change search mode to BTR_MODIFY_TREE */
			if (rtr_info.mbr_adj) {
				mtr_commit(&mtr);
				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false, &ins_cur,
						  m_index, false);
				rtr_info_update_btr(&ins_cur, &rtr_info);
				mtr_start(&mtr);
				m_index->set_modified(mtr);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE, &ins_cur, 0,
					__FILE__, __LINE__, &mtr);
			}

			error = btr_cur_optimistic_insert(
				flag, &ins_cur, &ins_offsets, &row_heap,
				dtuple, &rec, &big_rec, 0, NULL, &mtr);

			if (error == DB_FAIL) {
				/* The leaf page was full: restart with
				a tree-modifying (pessimistic) insert
				that may split pages. */
				ut_ad(!big_rec);
				mtr.commit();
				mtr.start();
				m_index->set_modified(mtr);

				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false,
						  &ins_cur, m_index, false);

				rtr_info_update_btr(&ins_cur, &rtr_info);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE,
					&ins_cur, 0,
					__FILE__, __LINE__, &mtr);

				error = btr_cur_pessimistic_insert(
						flag, &ins_cur, &ins_offsets,
						&row_heap, dtuple, &rec,
						&big_rec, 0, NULL, &mtr);
			}

			DBUG_EXECUTE_IF(
				"row_merge_ins_spatial_fail",
				error = DB_FAIL;
			);

			if (error == DB_SUCCESS) {
				if (rtr_info.mbr_adj) {
					error = rtr_ins_enlarge_mbr(
							&ins_cur, &mtr);
				}

				if (error == DB_SUCCESS) {
					page_update_max_trx_id(
						btr_cur_get_block(&ins_cur),
						btr_cur_get_page_zip(&ins_cur),
						trx_id, &mtr);
				}
			}

			mtr_commit(&mtr);

			rtr_clean_rtr_info(&rtr_info, true);
			count++;
		}

		m_dtuple_vec->clear();

		return(error);
	}

private:
	/** Cache index rows made from a cluster index scan. Usually
	for rows on single cluster index page */
	typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
		idx_tuple_vec;

	/** vector used to cache index rows made from cluster index scan */
	idx_tuple_vec*		m_dtuple_vec;

	/** the index being built */
	dict_index_t*		m_index;

	/** memory heap for creating index tuples */
	mem_heap_t*		m_heap;
};

/* Maximum pending doc memory limit in bytes for a fts tokenization thread */
#define FTS_PENDING_DOC_MEMORY_LIMIT	1000000

/** Insert sorted data tuples to the index.
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in]	row_buf		row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	dict_index_t*		index,
	const dict_table_t*	old_table,
	const pfs_os_file_t&	fd,
	row_merge_block_t*	block,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	const ib_uint64_t	table_total_rows, /*!< in: total rows of old table */
	const double		pct_progress,	/*!< in: total progress
						percent until now */
	const double		pct_cost, /*!< in: current progress percent
					  */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space,	   /*!< in: space id */
	ut_stage_alter_t*	stage = NULL);

/******************************************************//**
Encode an index record. */
static MY_ATTRIBUTE((nonnull))
void
row_merge_buf_encode(
/*=================*/
	byte**			b,	/*!< in/out: pointer to
					current end of output buffer */
	const dict_index_t*	index,	/*!< in: index */
	const mtuple_t*		entry,	/*!< in: index fields
					of the record to encode */
	ulint			n_fields) /*!< in: number of fields
					in the entry */
{
	ulint	size;
	ulint	extra_size;

	size = rec_get_converted_size_temp<false>(
		index, entry->fields, n_fields, &extra_size);
	ut_ad(size >= extra_size);

	/* Encode extra_size + 1 as a 1- or 2-byte length prefix:
	values below 0x80 take one byte; larger values take two
	bytes with the high bit of the first byte set.  The +1
	reserves the value 0 as the end-of-chunk marker (see
	row_merge_buf_write() and row_merge_read_rec()). */
	if (extra_size + 1 < 0x80) {
		*(*b)++ = (byte) (extra_size + 1);
	} else {
		ut_ad((extra_size + 1) < 0x8000);
		*(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
		*(*b)++ = (byte) (extra_size + 1);
	}

	/* Convert the tuple in place: the record "extra" bytes grow
	backwards from *b + extra_size, the data bytes forwards. */
	rec_convert_dtuple_to_temp<false>(*b + extra_size, index,
					  entry->fields, n_fields);

	*b += size;
}

/******************************************************//**
Allocate a sort buffer.
332 @return own: sort buffer */ 333 static MY_ATTRIBUTE((malloc, nonnull)) 334 row_merge_buf_t* 335 row_merge_buf_create_low( 336 /*=====================*/ 337 mem_heap_t* heap, /*!< in: heap where allocated */ 338 dict_index_t* index, /*!< in: secondary index */ 339 ulint max_tuples, /*!< in: maximum number of 340 data tuples */ 341 ulint buf_size) /*!< in: size of the buffer, 342 in bytes */ 343 { 344 row_merge_buf_t* buf; 345 346 ut_ad(max_tuples > 0); 347 348 ut_ad(max_tuples <= srv_sort_buf_size); 349 350 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size)); 351 buf->heap = heap; 352 buf->index = index; 353 buf->max_tuples = max_tuples; 354 buf->tuples = static_cast<mtuple_t*>( 355 ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples)); 356 buf->tmp_tuples = buf->tuples + max_tuples; 357 358 return(buf); 359 } 360 361 /******************************************************//** 362 Allocate a sort buffer. 363 @return own: sort buffer */ 364 row_merge_buf_t* 365 row_merge_buf_create( 366 /*=================*/ 367 dict_index_t* index) /*!< in: secondary index */ 368 { 369 row_merge_buf_t* buf; 370 ulint max_tuples; 371 ulint buf_size; 372 mem_heap_t* heap; 373 374 max_tuples = srv_sort_buf_size 375 / ut_max(static_cast<ulint>(1), 376 dict_index_get_min_size(index)); 377 378 buf_size = (sizeof *buf); 379 380 heap = mem_heap_create(buf_size); 381 382 buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size); 383 384 return(buf); 385 } 386 387 /******************************************************//** 388 Empty a sort buffer. 
389 @return sort buffer */ 390 row_merge_buf_t* 391 row_merge_buf_empty( 392 /*================*/ 393 row_merge_buf_t* buf) /*!< in,own: sort buffer */ 394 { 395 ulint buf_size = sizeof *buf; 396 ulint max_tuples = buf->max_tuples; 397 mem_heap_t* heap = buf->heap; 398 dict_index_t* index = buf->index; 399 mtuple_t* tuples = buf->tuples; 400 401 mem_heap_empty(heap); 402 403 buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size)); 404 buf->heap = heap; 405 buf->index = index; 406 buf->max_tuples = max_tuples; 407 buf->tuples = tuples; 408 buf->tmp_tuples = buf->tuples + max_tuples; 409 410 return(buf); 411 } 412 413 /******************************************************//** 414 Deallocate a sort buffer. */ 415 void 416 row_merge_buf_free( 417 /*===============*/ 418 row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */ 419 { 420 ut_free(buf->tuples); 421 mem_heap_free(buf->heap); 422 } 423 424 /** Convert the field data from compact to redundant format. 425 @param[in] row_field field to copy from 426 @param[out] field field to copy to 427 @param[in] len length of the field data 428 @param[in] zip_size compressed BLOB page size, 429 zero for uncompressed BLOBs 430 @param[in,out] heap memory heap where to allocate data when 431 converting to ROW_FORMAT=REDUNDANT, or NULL 432 when not to invoke 433 row_merge_buf_redundant_convert(). 
*/
static
void
row_merge_buf_redundant_convert(
	const dfield_t*		row_field,
	dfield_t*		field,
	ulint			len,
	const page_size_t&	page_size,
	mem_heap_t*		heap)
{
	ut_ad(field->type.mbminlen == 1);
	ut_ad(field->type.mbmaxlen > 1);

	/* Rebuild the value in a fresh buffer of the full fixed
	length, padding the tail with 0x20 (space), as required by
	ROW_FORMAT=REDUNDANT fixed-length CHAR storage. */
	byte*	dst = (byte*) mem_heap_alloc(heap, len);
	ulint	src_len = row_field->len;
	ut_ad(src_len <= len);

	if (!row_field->ext) {
		memcpy(dst, row_field->data, src_len);
	} else {
		/* The value is stored externally: fetch the BLOB
		prefix instead of copying the field reference. */
		const byte*	ref = static_cast<const byte*>(
			dfield_get_data(row_field));
		ulint		blob_len;

		ut_a(src_len >= BTR_EXTERN_FIELD_REF_SIZE);
		ut_a(memcmp(ref + src_len - BTR_EXTERN_FIELD_REF_SIZE,
			    field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

		byte*	blob_data = btr_copy_externally_stored_field(
			&blob_len, ref, page_size, src_len, heap);

		ut_ad(blob_len < len);

		memcpy(dst, blob_data, blob_len);
		src_len = blob_len;
	}

	memset(dst + src_len, 0x20, len - src_len);

	dfield_set_data(field, dst, len);
}

/** Insert a data tuple into a sort buffer.
@param[in,out]	buf		sort buffer
@param[in]	fts_index	fts index to be created
@param[in]	old_table	original table
@param[in]	new_table	new table
@param[in,out]	psort_info	parallel sort info
@param[in,out]	row		table row
@param[in]	ext		cache of externally stored
				column prefixes, or NULL
@param[in,out]	doc_id		Doc ID if we are creating
				FTS index
@param[in,out]	conv_heap	memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert()
@param[in,out]	err		set if error occurs
@param[in,out]	v_heap		heap memory to process data for virtual column
@param[in,out]	my_table	mysql table object
@param[in]	trx		transaction object
@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
	row_merge_buf_t*	buf,
	dict_index_t*		fts_index,
	const dict_table_t*	old_table,
	const dict_table_t*	new_table,
	fts_psort_t*		psort_info,
	dtuple_t*		row,
	const row_ext_t*	ext,
	doc_id_t*		doc_id,
	mem_heap_t*		conv_heap,
	dberr_t*		err,
	mem_heap_t**		v_heap,
	TABLE*			my_table,
	trx_t*			trx)
{
	ulint			i;
	const dict_index_t*	index;
	mtuple_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	ulint			bucket = 0;
	doc_id_t		write_doc_id;
	ulint			n_row_added = 0;
	/* NOTE(review): vcol_storage.innobase_record is tested at the
	"end:" label and below with no explicit initialization visible
	here -- confirm that VCOL_STORAGE default-initializes it to
	NULL, otherwise this reads an indeterminate value. */
	VCOL_STORAGE		vcol_storage;
	DBUG_ENTER("row_merge_buf_add");

	if (buf->n_tuples >= buf->max_tuples) {
error:
		n_row_added = 0;
		goto end;
	}

	DBUG_EXECUTE_IF(
		"ib_row_merge_buf_add_two",
		if (buf->n_tuples >= 2) DBUG_RETURN(0););

	UNIV_PREFETCH_R(row->fields);

	/* If we are building FTS index, buf->index points to
	the 'fts_sort_idx', and real FTS index is stored in
	fts_index */
	index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;

	/* create spatial index should not come here */
	ut_ad(!dict_index_is_spatial(index));

	n_fields = dict_index_get_n_fields(index);

	entry = &buf->tuples[buf->n_tuples];
	field = entry->fields = static_cast<dfield_t*>(
		mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));

	data_size = 0;
	/* Start with the size of the NULL-flag bitmap; variable-length
	headers are added per field in the loop below. */
	extra_size = UT_BITS_IN_BYTES(unsigned(index->n_nullable));

	ifield = dict_index_get_nth_field(index, 0);

	for (i = 0; i < n_fields; i++, field++, ifield++) {
		ulint			len;
		ulint			fixed_len;
		const dfield_t*		row_field;
		const dict_col_t* const	col = ifield->col;
		const dict_v_col_t* const	v_col = col->is_virtual()
			? reinterpret_cast<const dict_v_col_t*>(col)
			: NULL;

		/* Process the Doc ID column */
		if (!v_col && *doc_id
		    && col->ind == index->table->fts->doc_col) {
			fts_write_doc_id((byte*) &write_doc_id, *doc_id);

			/* Note: field->data now points to a value on the
			stack: &write_doc_id after dfield_set_data(). Because
			there is only one doc_id per row, it shouldn't matter.
			We allocate a new buffer before we leave the function
			later below. */

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));

			field->type.mtype = ifield->col->mtype;
			field->type.prtype = ifield->col->prtype;
			field->type.mbminlen = 0;
			field->type.mbmaxlen = 0;
			field->type.len = ifield->col->len;
		} else {
			/* Use callback to get the virtual column value */
			if (v_col) {
				dict_index_t*	clust_index
					= dict_table_get_first_index(new_table);

				/* Allocate the MySQL record buffer for
				computing virtual columns only once, on
				first use. */
				if (!vcol_storage.innobase_record &&
				    !innobase_allocate_row_for_vcol(
						trx->mysql_thd, clust_index,
						v_heap, &my_table,
						&vcol_storage)) {
					*err = DB_OUT_OF_MEMORY;
					goto error;
				}

				row_field = innobase_get_computed_value(
					row, v_col, clust_index,
					v_heap, NULL, ifield, trx->mysql_thd,
					my_table, vcol_storage.innobase_record,
					old_table, NULL);

				if (row_field == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					goto error;
				}
				dfield_copy(field, row_field);
			} else {
				row_field = dtuple_get_nth_field(row,
								 col->ind);
				dfield_copy(field, row_field);
			}


			/* Tokenize and process data for FTS */
			if (index->type & DICT_FTS) {
				fts_doc_item_t*	doc_item;
				byte*		value;
				void*		ptr;
				const ulint	max_trial_count = 10000;
				ulint		trial_count = 0;

				/* fetch Doc ID if it already exists
				in the row, and not supplied by the
				caller. Even if the value column is
				NULL, we still need to get the Doc
				ID so to maintain the correct max
				Doc ID */
				if (*doc_id == 0) {
					const dfield_t*	doc_field;
					doc_field = dtuple_get_nth_field(
						row,
						index->table->fts->doc_col);
					*doc_id = (doc_id_t) mach_read_from_8(
						static_cast<const byte*>(
							dfield_get_data(doc_field)));

					if (*doc_id == 0) {
						ib::warn() << "FTS Doc ID is"
							" zero. Record"
							" skipped";
						goto error;
					}
				}

				if (dfield_is_null(field)) {
					n_row_added = 1;
					continue;
				}

				/* Copy the column value into a buffer
				owned by the doc item, since the row
				heap may be reused before the
				tokenizer thread consumes it. */
				ptr = ut_malloc_nokey(sizeof(*doc_item)
						      + field->len);

				doc_item = static_cast<fts_doc_item_t*>(ptr);
				value = static_cast<byte*>(ptr)
					+ sizeof(*doc_item);
				memcpy(value, field->data, field->len);
				field->data = value;

				doc_item->field = field;
				doc_item->doc_id = *doc_id;

				bucket = *doc_id % fts_sort_pll_degree;

				/* Add doc item to fts_doc_list */
				mutex_enter(&psort_info[bucket].mutex);

				if (psort_info[bucket].error == DB_SUCCESS) {
					UT_LIST_ADD_LAST(
						psort_info[bucket].fts_doc_list,
						doc_item);
					psort_info[bucket].memory_used +=
						sizeof(*doc_item) + field->len;
				} else {
					ut_free(doc_item);
				}

				mutex_exit(&psort_info[bucket].mutex);

				/* Sleep when memory used exceeds limit,
				to throttle the producer while the
				tokenizer threads catch up. */
				while (psort_info[bucket].memory_used
				       > FTS_PENDING_DOC_MEMORY_LIMIT
				       && trial_count++ < max_trial_count) {
					os_thread_sleep(1000);
				}

				n_row_added = 1;
				continue;
			}

			/* innobase_get_computed_value() sets the
			length of the virtual column field. */
			if (v_col == NULL
			    && field->len != UNIV_SQL_NULL
			    && col->mtype == DATA_MYSQL
			    && col->len != field->len) {
				if (conv_heap != NULL) {
					row_merge_buf_redundant_convert(
						row_field, field, col->len,
						dict_table_page_size(old_table),
						conv_heap);
				} else {
					/* Field length mismatch should not
					happen when rebuilding redundant row
					format table. */
					ut_ad(dict_table_is_comp(index->table));
				}
			}
		}

		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (!ext) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col->ind,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					/* Key fields use the cached
					column prefix directly. */
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else if (!v_col) {
			/* Only non-virtual column are stored externally */
			const byte*	buf = row_ext_lookup(ext, col->ind,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminlen, col->mbmaxlen,
				ifield->prefix_len,
				len,
				static_cast<char*>(dfield_get_data(field)));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len
		      || DATA_LARGE_MTYPE(col->mtype));

		fixed_len = ifield->fixed_len;
		if (fixed_len && !dict_table_is_comp(index->table)
		    && col->mbminlen != col->mbmaxlen) {
			/* CHAR in ROW_FORMAT=REDUNDANT is always
			fixed-length, but in the temporary file it is
			variable-length for variable-length character
			sets. */
			fixed_len = 0;
		}

		if (fixed_len) {
#ifdef UNIV_DEBUG
			/* len should be between the sizes calculated
			based on mbmaxlen and mbminlen */
			ut_ad(len <= fixed_len);
			ut_ad(!col->mbmaxlen || len >= col->mbminlen
			      * (fixed_len / col->mbmaxlen));

			ut_ad(!dfield_is_ext(field));
#endif /* UNIV_DEBUG */
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (!DATA_BIG_COL(col))) {
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself. If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

	/* If this is FTS index, we already populated the sort buffer, return
	here */
	if (index->type & DICT_FTS) {
		goto end;
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_temp<false>(
			index, entry->fields, n_fields, &extra);

		ut_ad(data_size + extra_size == size);
		ut_ad(extra_size == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Record size can exceed page size while converting to
	redundant row format. But there is assert
	ut_ad(size < srv_page_size) in rec_offs_data_size().
	It may hit the assert before attempting to insert the row. */
	if (conv_heap != NULL && data_size > srv_page_size) {
		*err = DB_TOO_BIG_RECORD;
	}

	ut_ad(data_size < srv_sort_buf_size);

	/* Reserve bytes for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= srv_sort_buf_size) {
		goto error;
	}

	buf->total_size += data_size;
	buf->n_tuples++;
	n_row_added++;

	field = entry->fields;

	/* Copy the data fields from the (possibly transient) row
	heap into the sort buffer's own heap. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	if (conv_heap != NULL) {
		mem_heap_empty(conv_heap);
	}

end:
	if (vcol_storage.innobase_record)
		innobase_free_row_for_vcol(&vcol_storage);
	DBUG_RETURN(n_row_added);
}

/*************************************************************//**
Report a duplicate key.
*/
void
row_merge_dup_report(
/*=================*/
	row_merge_dup_t*	dup,	/*!< in/out: for reporting duplicates */
	const dfield_t*		entry)	/*!< in: duplicate index entry */
{
	if (!dup->n_dup++) {
		/* Only report the first duplicate record,
		but count all duplicate records. */
		innobase_fields_to_mysql(dup->table, dup->index, entry);
	}
}

/*************************************************************//**
Compare two tuples.
@return positive, 0, negative if a is greater, equal, less, than b,
respectively */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_tuple_cmp(
/*================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	const mtuple_t&		a,	/*!< in: first tuple to be compared */
	const mtuple_t&		b,	/*!< in: second tuple to be compared */
	row_merge_dup_t*	dup)	/*!< in/out: for reporting duplicates,
					NULL if non-unique index */
{
	int		cmp;
	const dfield_t*	af	= a.fields;
	const dfield_t*	bf	= b.fields;
	ulint		n	= n_uniq;

	ut_ad(n_uniq > 0);
	ut_ad(n_uniq <= n_field);

	/* Compare the fields of the tuples until a difference is
	found or we run out of fields to compare.  If !cmp at the
	end, the tuples are equal. */
	do {
		cmp = cmp_dfield_dfield(af++, bf++);
	} while (!cmp && --n);

	if (cmp) {
		return(cmp);
	}

	if (dup) {
		/* Report a duplicate value error if the tuples are
		logically equal.  NULL columns are logically inequal,
		although they are equal in the sorting order.  Find
		out if any of the fields are NULL. */
		for (const dfield_t* df = a.fields; df != af; df++) {
			if (dfield_is_null(df)) {
				goto no_report;
			}
		}

		row_merge_dup_report(dup, a.fields);
	}

no_report:
	/* The n_uniq fields were equal, but we compare all fields so
	that we will get the same (internal) order as in the B-tree. */
	for (n = n_field - n_uniq + 1; --n; ) {
		cmp = cmp_dfield_dfield(af++, bf++);
		if (cmp) {
			return(cmp);
		}
	}

	/* This should never be reached, except in a secondary index
	when creating a secondary index and a PRIMARY KEY, and there
	is a duplicate in the PRIMARY KEY that has not been detected
	yet.  Internally, an index must never contain duplicates. */
	return(cmp);
}

/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param tuples array of tuples that being sorted
@param aux work area, same size as tuples[]
@param low lower bound of the sorting area, inclusive
@param high upper bound of the sorting area, exclusive */
#define row_merge_tuple_sort_ctx(tuples, aux, low, high)		\
	row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a first tuple to be compared
@param b second tuple to be compared
@return positive, 0, negative, if a is greater, equal, less, than b,
respectively */
#define row_merge_tuple_cmp_ctx(a,b)			\
	row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)

/**********************************************************************//**
Merge sort the tuple buffer in main memory.
*/
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	row_merge_dup_t*	dup,	/*!< in/out: reporter of duplicates
					(NULL if non-unique index) */
	mtuple_t*		tuples,	/*!< in/out: tuples */
	mtuple_t*		aux,	/*!< in/out: work area */
	ulint			low,	/*!< in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/*!< in: upper bound of the
					sorting area, exclusive */
{
	ut_ad(n_field > 0);
	ut_ad(n_uniq <= n_field);

	/* UT_SORT_FUNCTION_BODY() expands to a recursive merge sort
	using aux[] as scratch space; the _ctx macros bind the extra
	n_uniq/n_field/dup parameters into the recursive calls and
	comparisons. */
	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}

/******************************************************//**
Sort a buffer. */
void
row_merge_buf_sort(
/*===============*/
	row_merge_buf_t*	buf,	/*!< in/out: sort buffer */
	row_merge_dup_t*	dup)	/*!< in/out: reporter of duplicates
					(NULL if non-unique index) */
{
	/* Spatial index tuples are cached and inserted separately
	(see index_tuple_info_t); they never go through this sort. */
	ut_ad(!dict_index_is_spatial(buf->index));

	row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
			     dict_index_get_n_fields(buf->index),
			     dup,
			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}

/******************************************************//**
Write a buffer to a block.
*/
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
	const merge_file_t*	of UNIV_UNUSED,
					/*!< in: output file */
	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
{
	const dict_index_t*	index	= buf->index;
	ulint			n_fields= dict_index_get_n_fields(index);
	byte*			b	= &block[0];

	DBUG_ENTER("row_merge_buf_write");

	/* Serialize each tuple into the block in its sorted order. */
	for (ulint i = 0; i < buf->n_tuples; i++) {
		const mtuple_t*	entry	= &buf->tuples[i];

		row_merge_buf_encode(&b, index, entry, n_fields);
		ut_ad(b < &block[srv_sort_buf_size]);

		DBUG_LOG("ib_merge_sort",
			 reinterpret_cast<const void*>(b) << ','
			 << of->fd << ',' << of->offset << ' ' <<
			 i << ": " <<
			 rec_printer(entry->fields, n_fields).str());
	}

	/* Write an "end-of-chunk" marker: a 0 length byte, which can
	never occur as a record prefix because row_merge_buf_encode()
	stores extra_size + 1. */
	ut_a(b < &block[srv_sort_buf_size]);
	ut_a(b == &block[0] + buf->total_size);
	*b++ = 0;
#ifdef HAVE_valgrind
	/* The rest of the block is uninitialized. Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* HAVE_valgrind */
	DBUG_LOG("ib_merge_sort",
		 "write " << reinterpret_cast<const void*>(b) << ','
		 << of->fd << ',' << of->offset << " EOF");
	DBUG_VOID_RETURN;
}

/******************************************************//**
Create a memory heap and allocate space for row_merge_rec_offsets()
and mrec_buf_t[3].
1040 @return memory heap */ 1041 static 1042 mem_heap_t* 1043 row_merge_heap_create( 1044 /*==================*/ 1045 const dict_index_t* index, /*!< in: record descriptor */ 1046 mrec_buf_t** buf, /*!< out: 3 buffers */ 1047 rec_offs** offsets1, /*!< out: offsets */ 1048 rec_offs** offsets2) /*!< out: offsets */ 1049 { 1050 ulint i = 1 + REC_OFFS_HEADER_SIZE 1051 + dict_index_get_n_fields(index); 1052 mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1 1053 + 3 * sizeof **buf); 1054 1055 *buf = static_cast<mrec_buf_t*>( 1056 mem_heap_alloc(heap, 3 * sizeof **buf)); 1057 *offsets1 = static_cast<rec_offs*>( 1058 mem_heap_alloc(heap, i * sizeof **offsets1)); 1059 *offsets2 = static_cast<rec_offs*>( 1060 mem_heap_alloc(heap, i * sizeof **offsets2)); 1061 1062 rec_offs_set_n_alloc(*offsets1, i); 1063 rec_offs_set_n_alloc(*offsets2, i); 1064 rec_offs_set_n_fields(*offsets1, dict_index_get_n_fields(index)); 1065 rec_offs_set_n_fields(*offsets2, dict_index_get_n_fields(index)); 1066 1067 return(heap); 1068 } 1069 1070 /** Read a merge block from the file system. 
1071 @return whether the request was completed successfully */ 1072 bool 1073 row_merge_read( 1074 /*===========*/ 1075 const pfs_os_file_t& fd, /*!< in: file descriptor */ 1076 ulint offset, /*!< in: offset where to read 1077 in number of row_merge_block_t 1078 elements */ 1079 row_merge_block_t* buf, /*!< out: data */ 1080 row_merge_block_t* crypt_buf, /*!< in: crypt buf or NULL */ 1081 ulint space) /*!< in: space id */ 1082 { 1083 os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size; 1084 1085 DBUG_ENTER("row_merge_read"); 1086 DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs); 1087 DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE);); 1088 1089 IORequest request(IORequest::READ); 1090 const bool success = DB_SUCCESS == os_file_read_no_error_handling( 1091 request, fd, buf, ofs, srv_sort_buf_size, 0); 1092 1093 /* If encryption is enabled decrypt buffer */ 1094 if (success && log_tmp_is_encrypted()) { 1095 if (!log_tmp_block_decrypt(buf, srv_sort_buf_size, 1096 crypt_buf, ofs)) { 1097 return (FALSE); 1098 } 1099 1100 srv_stats.n_merge_blocks_decrypted.inc(); 1101 memcpy(buf, crypt_buf, srv_sort_buf_size); 1102 } 1103 1104 #ifdef POSIX_FADV_DONTNEED 1105 /* Each block is read exactly once. Free up the file cache. */ 1106 posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED); 1107 #endif /* POSIX_FADV_DONTNEED */ 1108 1109 if (!success) { 1110 ib::error() << "Failed to read merge block at " << ofs; 1111 } 1112 1113 DBUG_RETURN(success); 1114 } 1115 1116 /********************************************************************//** 1117 Write a merge block to the file system. 
1118 @return whether the request was completed successfully 1119 @retval false on error 1120 @retval true on success */ 1121 UNIV_INTERN 1122 bool 1123 row_merge_write( 1124 /*============*/ 1125 const pfs_os_file_t& fd, /*!< in: file descriptor */ 1126 ulint offset, /*!< in: offset where to write, 1127 in number of row_merge_block_t elements */ 1128 const void* buf, /*!< in: data */ 1129 void* crypt_buf, /*!< in: crypt buf or NULL */ 1130 ulint space) /*!< in: space id */ 1131 { 1132 size_t buf_len = srv_sort_buf_size; 1133 os_offset_t ofs = buf_len * (os_offset_t) offset; 1134 void* out_buf = (void *)buf; 1135 1136 DBUG_ENTER("row_merge_write"); 1137 DBUG_LOG("ib_merge_sort", "fd=" << fd << " ofs=" << ofs); 1138 DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE);); 1139 1140 /* For encrypted tables, encrypt data before writing */ 1141 if (log_tmp_is_encrypted()) { 1142 if (!log_tmp_block_encrypt(static_cast<const byte*>(buf), 1143 buf_len, 1144 static_cast<byte*>(crypt_buf), 1145 ofs)) { 1146 return false; 1147 } 1148 1149 srv_stats.n_merge_blocks_encrypted.inc(); 1150 out_buf = crypt_buf; 1151 } 1152 1153 IORequest request(IORequest::WRITE); 1154 const bool success = DB_SUCCESS == os_file_write( 1155 request, "(merge)", fd, out_buf, ofs, buf_len); 1156 1157 #ifdef POSIX_FADV_DONTNEED 1158 /* The block will be needed on the next merge pass, 1159 but it can be evicted from the file cache meanwhile. */ 1160 posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED); 1161 #endif /* POSIX_FADV_DONTNEED */ 1162 1163 DBUG_RETURN(success); 1164 } 1165 1166 /********************************************************************//** 1167 Read a merge record. 
@return pointer to next record, or NULL on I/O error or end of list

A merge record is stored as a 1- or 2-byte encoded extra_size prefix
(high bit of the first byte set means a second byte follows; the value
0 marks the end of the list) followed by the record extra bytes and
data bytes.  A record may wrap across two consecutive blocks of the
merge file; in that case it is reassembled into *buf. */
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	const byte*		b,	/*!< in: pointer to record */
	const dict_index_t*	index,	/*!< in: index of the record */
	const pfs_os_file_t&	fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t**		mrec,	/*!< out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	rec_offs*		offsets,/*!< out: offsets of mrec */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space)	   /*!< in: space id */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);

	ut_ad(rec_offs_get_n_alloc(offsets) == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	DBUG_ENTER("row_merge_read_rec");

	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		DBUG_LOG("ib_merge_sort",
			 "read " << reinterpret_cast<const void*>(b) << ',' <<
			 reinterpret_cast<const void*>(block) << ',' <<
			 fd << ',' << *foffs << " EOF");
		DBUG_RETURN(NULL);
	}

	if (extra_size >= 0x80) {
		/* Two-byte encoding: the high bit flags that another
		byte of extra_size follows.  Read it, fetching the next
		block first if the prefix straddled a block boundary. */

		if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
			if (!row_merge_read(fd, ++(*foffs), block,
					    crypt_block,
					    space)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				DBUG_RETURN(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = &block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size.  Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = ulint(&block[srv_sort_buf_size] - b);
		ut_ad(avail_size < sizeof *buf);
		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block,
				    crypt_block,
				    space)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = &block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		*mrec = *buf + extra_size;

		rec_init_offsets_temp(*mrec, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < &block[srv_sort_buf_size]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	*mrec = b + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record's data bytes span two blocks.  Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = ulint(&block[srv_sort_buf_size] - b);
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	if (!row_merge_read(fd, ++(*foffs), block,
			    crypt_block,
			    space)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = &block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
	DBUG_LOG("ib_merge_sort",
		 reinterpret_cast<const void*>(b) << ',' <<
		 reinterpret_cast<const void*>(block)
		 << ",fd=" << fd << ',' << *foffs << ": "
		 << rec_printer(*mrec, 0, offsets).str());
	DBUG_RETURN(b);
}

/********************************************************************//**
Write a merge record: the encoded extra_size prefix followed by the
raw record bytes.  The caller must ensure the record fits in b. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifndef DBUG_OFF
	ulint		size,	/*!< in: total size to write */
	const pfs_os_file_t&	fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* !DBUG_OFF */
	const mrec_t*	mrec,	/*!< in: record to write */
	const rec_offs*	offsets)/*!< in: offsets of mrec */
#ifdef DBUG_OFF
/* In release builds the debug-only parameters are dropped; this macro
rewrites callers (which always pass all seven arguments) accordingly. */
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* DBUG_OFF */
{
	DBUG_ENTER("row_merge_write_rec_low");

#ifndef DBUG_OFF
	const byte* const	end = b + size;
#endif /* DBUG_OFF */
	DBUG_ASSERT(e == rec_offs_extra_size(offsets) + 1);

	DBUG_LOG("ib_merge_sort",
		 reinterpret_cast<const void*>(b) << ",fd=" << fd << ','
		 << foffs << ": " << rec_printer(mrec, 0, offsets).str());

	if (e < 0x80) {
		/* One-byte encoding. */
		*b++ = (byte) e;
	} else {
		/* Two-byte encoding: high bit of the first byte set. */
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	DBUG_SLOW_ASSERT(b + rec_offs_size(offsets) == end);
	DBUG_VOID_RETURN;
}

/********************************************************************//**
Write a merge record, flushing the current block to the file and
starting a new one if the record does not fit.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	byte*			b,	/*!< in: pointer to end of block */
	const pfs_os_file_t&	fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t*		mrec,	/*!< in: record to write */
	const rec_offs*		offsets,/*!< in: offsets of mrec */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space)	   /*!< in: space id */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(mrec);
	ut_ad(foffs);
	/* The record must not point into the buffers we may write to. */
	ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	/* Total bytes needed: the prefix (one extra byte when the
	two-byte encoding is used) plus the full record. */
	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = ulint(&block[srv_sort_buf_size] - b);

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block,
				     crypt_block,
				     space)) {
			return(NULL);
		}

		MEM_UNDEFINED(&block[0], srv_sort_buf_size);

		/* Copy the rest. */
		b = &block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}

/********************************************************************//**
Write an end-of-list marker and flush the final block to the file.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_eof(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	byte*			b,	/*!< in: pointer to end of block */
	const pfs_os_file_t&	fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space)	   /*!< in: space id */
{
	ut_ad(block);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(foffs);

	DBUG_ENTER("row_merge_write_eof");
	DBUG_LOG("ib_merge_sort",
		 reinterpret_cast<const void*>(b) << ',' <<
		 reinterpret_cast<const void*>(block) <<
		 ",fd=" << fd << ',' << *foffs);

	/* A zero extra_size byte marks the end of the record list. */
	*b++ = 0;
	MEM_CHECK_DEFINED(&block[0], b - &block[0]);
	MEM_CHECK_ADDRESSABLE(&block[0], srv_sort_buf_size);

	/* The rest of the block is uninitialized.  Silence warnings. */
	MEM_MAKE_DEFINED(b, &block[srv_sort_buf_size] - b);

	if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space)) {
		DBUG_RETURN(NULL);
	}

	MEM_UNDEFINED(&block[0], srv_sort_buf_size);
	DBUG_RETURN(&block[0]);
}

/** Create a temporary file if it has not been created already.
@param[in,out]	tmpfd	temporary file handle
@param[in]	path	location for creating temporary file
@return true on success, false on error */
static MY_ATTRIBUTE((warn_unused_result))
bool
row_merge_tmpfile_if_needed(
	pfs_os_file_t*	tmpfd,
	const char*	path)
{
	/* Lazily create the file on first use; a later call with the
	file already open is a no-op. */
	if (*tmpfd == OS_FILE_CLOSED) {
		*tmpfd = row_merge_file_create_low(path);
		if (*tmpfd != OS_FILE_CLOSED) {
			MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
		}
	}

	return(*tmpfd != OS_FILE_CLOSED);
}

/** Create a temporary file for merge sort if it was not created already.
@param[in,out]	file	merge file structure
@param[in,out]	tmpfd	temporary file handle for the merge output
@param[in]	nrec	number of records in the file
@param[in]	path	location for creating temporary file
@return true on success, false on error */
static MY_ATTRIBUTE((warn_unused_result))
bool
row_merge_file_create_if_needed(
	merge_file_t*	file,
	pfs_os_file_t*	tmpfd,
	ulint		nrec,
	const char*	path)
{
	/* Invariant: the auxiliary tmpfd is open whenever the merge
	file is open. */
	ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED);
	if (file->fd == OS_FILE_CLOSED
	    && row_merge_file_create(file, path) != OS_FILE_CLOSED) {
		MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
		if (!row_merge_tmpfile_if_needed(tmpfd, path)) {
			return(false);
		}

		file->n_rec = nrec;
	}

	ut_ad(file->fd == OS_FILE_CLOSED || *tmpfd != OS_FILE_CLOSED);
	return(file->fd != OS_FILE_CLOSED);
}

/** Copy the merge data tuple from another merge data tuple.
1526 @param[in] mtuple source merge data tuple 1527 @param[in,out] prev_mtuple destination merge data tuple 1528 @param[in] n_unique number of unique fields exist in the mtuple 1529 @param[in,out] heap memory heap where last_mtuple allocated */ 1530 static 1531 void 1532 row_mtuple_create( 1533 const mtuple_t* mtuple, 1534 mtuple_t* prev_mtuple, 1535 ulint n_unique, 1536 mem_heap_t* heap) 1537 { 1538 memcpy(prev_mtuple->fields, mtuple->fields, 1539 n_unique * sizeof *mtuple->fields); 1540 1541 dfield_t* field = prev_mtuple->fields; 1542 1543 for (ulint i = 0; i < n_unique; i++) { 1544 dfield_dup(field++, heap); 1545 } 1546 } 1547 1548 /** Compare two merge data tuples. 1549 @param[in] prev_mtuple merge data tuple 1550 @param[in] current_mtuple merge data tuple 1551 @param[in,out] dup reporter of duplicates 1552 @retval positive, 0, negative if current_mtuple is greater, equal, less, than 1553 last_mtuple. */ 1554 static 1555 int 1556 row_mtuple_cmp( 1557 const mtuple_t* prev_mtuple, 1558 const mtuple_t* current_mtuple, 1559 row_merge_dup_t* dup) 1560 { 1561 ut_ad(dict_index_is_clust(dup->index)); 1562 const ulint n_unique = dict_index_get_n_unique(dup->index); 1563 1564 return(row_merge_tuple_cmp( 1565 n_unique, n_unique, *current_mtuple, *prev_mtuple, dup)); 1566 } 1567 1568 /** Insert cached spatial index rows. 
1569 @param[in] trx_id transaction id 1570 @param[in] sp_tuples cached spatial rows 1571 @param[in] num_spatial number of spatial indexes 1572 @param[in,out] row_heap heap for insert 1573 @param[in,out] sp_heap heap for tuples 1574 @param[in,out] pcur cluster index cursor 1575 @param[in,out] mtr mini transaction 1576 @return DB_SUCCESS or error number */ 1577 static 1578 dberr_t 1579 row_merge_spatial_rows( 1580 trx_id_t trx_id, 1581 index_tuple_info_t** sp_tuples, 1582 ulint num_spatial, 1583 mem_heap_t* row_heap, 1584 mem_heap_t* sp_heap, 1585 btr_pcur_t* pcur, 1586 mtr_t* mtr) 1587 { 1588 dberr_t err = DB_SUCCESS; 1589 1590 if (sp_tuples == NULL) { 1591 return(DB_SUCCESS); 1592 } 1593 1594 ut_ad(sp_heap != NULL); 1595 1596 for (ulint j = 0; j < num_spatial; j++) { 1597 err = sp_tuples[j]->insert(trx_id, row_heap, pcur, mtr); 1598 1599 if (err != DB_SUCCESS) { 1600 return(err); 1601 } 1602 } 1603 1604 mem_heap_empty(sp_heap); 1605 1606 return(err); 1607 } 1608 1609 /** Check if the geometry field is valid. 1610 @param[in] row the row 1611 @param[in] index spatial index 1612 @return true if it's valid, false if it's invalid. */ 1613 static 1614 bool 1615 row_geo_field_is_valid( 1616 const dtuple_t* row, 1617 dict_index_t* index) 1618 { 1619 const dict_field_t* ind_field 1620 = dict_index_get_nth_field(index, 0); 1621 const dict_col_t* col 1622 = ind_field->col; 1623 ulint col_no 1624 = dict_col_get_no(col); 1625 const dfield_t* dfield 1626 = dtuple_get_nth_field(row, col_no); 1627 1628 if (dfield_is_null(dfield) 1629 || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) { 1630 return(false); 1631 } 1632 1633 return(true); 1634 } 1635 1636 /** Reads clustered index of the table and create temporary files 1637 containing the index entries for the indexes to be built. 
1638 @param[in] trx transaction 1639 @param[in,out] table MySQL table object, for reporting erroneous 1640 records 1641 @param[in] old_table table where rows are read from 1642 @param[in] new_table table where indexes are created; identical to 1643 old_table unless creating a PRIMARY KEY 1644 @param[in] online true if creating indexes online 1645 @param[in] index indexes to be created 1646 @param[in] fts_sort_idx full-text index to be created, or NULL 1647 @param[in] psort_info parallel sort info for fts_sort_idx creation, 1648 or NULL 1649 @param[in] files temporary files 1650 @param[in] key_numbers MySQL key numbers to create 1651 @param[in] n_index number of indexes to create 1652 @param[in] defaults default values of added, changed columns, or NULL 1653 @param[in] add_v newly added virtual columns along with indexes 1654 @param[in] col_map mapping of old column numbers to new ones, or 1655 NULL if old_table == new_table 1656 @param[in] add_autoinc number of added AUTO_INCREMENT columns, or 1657 ULINT_UNDEFINED if none is added 1658 @param[in,out] sequence autoinc sequence 1659 @param[in,out] block file buffer 1660 @param[in] skip_pk_sort whether the new PRIMARY KEY will follow 1661 existing order 1662 @param[in,out] tmpfd temporary file handle 1663 @param[in,out] stage performance schema accounting object, used by 1664 ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and 1665 stage->inc() will be called for each page read. 1666 @param[in] pct_cost percent of task weight out of total alter job 1667 @param[in,out] crypt_block crypted file buffer 1668 @param[in] eval_table mysql table used to evaluate virtual column 1669 value, see innobase_get_computed_value(). 
1670 @param[in] allow_not_null allow null to not-null conversion 1671 @return DB_SUCCESS or error */ 1672 static MY_ATTRIBUTE((warn_unused_result)) 1673 dberr_t 1674 row_merge_read_clustered_index( 1675 trx_t* trx, 1676 struct TABLE* table, 1677 const dict_table_t* old_table, 1678 dict_table_t* new_table, 1679 bool online, 1680 dict_index_t** index, 1681 dict_index_t* fts_sort_idx, 1682 fts_psort_t* psort_info, 1683 merge_file_t* files, 1684 const ulint* key_numbers, 1685 ulint n_index, 1686 const dtuple_t* defaults, 1687 const dict_add_v_col_t* add_v, 1688 const ulint* col_map, 1689 ulint add_autoinc, 1690 ib_sequence_t& sequence, 1691 row_merge_block_t* block, 1692 bool skip_pk_sort, 1693 pfs_os_file_t* tmpfd, 1694 ut_stage_alter_t* stage, 1695 double pct_cost, 1696 row_merge_block_t* crypt_block, 1697 struct TABLE* eval_table, 1698 bool allow_not_null) 1699 { 1700 dict_index_t* clust_index; /* Clustered index */ 1701 mem_heap_t* row_heap = NULL;/* Heap memory to create 1702 clustered index tuples */ 1703 row_merge_buf_t** merge_buf; /* Temporary list for records*/ 1704 mem_heap_t* v_heap = NULL; /* Heap memory to process large 1705 data for virtual column */ 1706 btr_pcur_t pcur; /* Cursor on the clustered 1707 index */ 1708 mtr_t mtr; /* Mini transaction */ 1709 dberr_t err = DB_SUCCESS;/* Return code */ 1710 ulint n_nonnull = 0; /* number of columns 1711 changed to NOT NULL */ 1712 ulint* nonnull = NULL; /* NOT NULL columns */ 1713 dict_index_t* fts_index = NULL;/* FTS index */ 1714 doc_id_t doc_id = 0; 1715 doc_id_t max_doc_id = 0; 1716 ibool add_doc_id = FALSE; 1717 os_event_t fts_parallel_sort_event = NULL; 1718 ibool fts_pll_sort = FALSE; 1719 int64_t sig_count = 0; 1720 index_tuple_info_t** sp_tuples = NULL; 1721 mem_heap_t* sp_heap = NULL; 1722 ulint num_spatial = 0; 1723 BtrBulk* clust_btr_bulk = NULL; 1724 bool clust_temp_file = false; 1725 mem_heap_t* mtuple_heap = NULL; 1726 mtuple_t prev_mtuple; 1727 mem_heap_t* conv_heap = NULL; 1728 double 
curr_progress = 0.0; 1729 ib_uint64_t read_rows = 0; 1730 ib_uint64_t table_total_rows = 0; 1731 char new_sys_trx_start[8]; 1732 char new_sys_trx_end[8]; 1733 byte any_autoinc_data[8] = {0}; 1734 bool vers_update_trt = false; 1735 1736 DBUG_ENTER("row_merge_read_clustered_index"); 1737 1738 ut_ad((old_table == new_table) == !col_map); 1739 ut_ad(!defaults || col_map); 1740 ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); 1741 ut_ad(trx->id); 1742 1743 table_total_rows = dict_table_get_n_rows(old_table); 1744 if(table_total_rows == 0) { 1745 /* We don't know total row count */ 1746 table_total_rows = 1; 1747 } 1748 1749 trx->op_info = "reading clustered index"; 1750 1751 #ifdef FTS_INTERNAL_DIAG_PRINT 1752 DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n"); 1753 #endif 1754 1755 /* Create and initialize memory for record buffers */ 1756 1757 merge_buf = static_cast<row_merge_buf_t**>( 1758 ut_malloc_nokey(n_index * sizeof *merge_buf)); 1759 1760 row_merge_dup_t clust_dup = {index[0], table, col_map, 0}; 1761 dfield_t* prev_fields; 1762 const ulint n_uniq = dict_index_get_n_unique(index[0]); 1763 1764 ut_ad(trx->mysql_thd != NULL); 1765 1766 const char* path = thd_innodb_tmpdir(trx->mysql_thd); 1767 1768 ut_ad(!skip_pk_sort || dict_index_is_clust(index[0])); 1769 /* There is no previous tuple yet. 
*/ 1770 prev_mtuple.fields = NULL; 1771 1772 for (ulint i = 0; i < n_index; i++) { 1773 if (index[i]->type & DICT_FTS) { 1774 1775 /* We are building a FT index, make sure 1776 we have the temporary 'fts_sort_idx' */ 1777 ut_a(fts_sort_idx); 1778 1779 fts_index = index[i]; 1780 1781 merge_buf[i] = row_merge_buf_create(fts_sort_idx); 1782 1783 add_doc_id = DICT_TF2_FLAG_IS_SET( 1784 new_table, DICT_TF2_FTS_ADD_DOC_ID); 1785 1786 /* If Doc ID does not exist in the table itself, 1787 fetch the first FTS Doc ID */ 1788 if (add_doc_id) { 1789 fts_get_next_doc_id( 1790 (dict_table_t*) new_table, 1791 &doc_id); 1792 ut_ad(doc_id > 0); 1793 } 1794 1795 fts_pll_sort = TRUE; 1796 row_fts_start_psort(psort_info); 1797 fts_parallel_sort_event = 1798 psort_info[0].psort_common->sort_event; 1799 } else { 1800 if (dict_index_is_spatial(index[i])) { 1801 num_spatial++; 1802 } 1803 1804 merge_buf[i] = row_merge_buf_create(index[i]); 1805 } 1806 } 1807 1808 if (num_spatial > 0) { 1809 ulint count = 0; 1810 1811 sp_heap = mem_heap_create(512); 1812 1813 sp_tuples = static_cast<index_tuple_info_t**>( 1814 ut_malloc_nokey(num_spatial 1815 * sizeof(*sp_tuples))); 1816 1817 for (ulint i = 0; i < n_index; i++) { 1818 if (dict_index_is_spatial(index[i])) { 1819 sp_tuples[count] 1820 = UT_NEW_NOKEY( 1821 index_tuple_info_t( 1822 sp_heap, 1823 index[i])); 1824 count++; 1825 } 1826 } 1827 1828 ut_ad(count == num_spatial); 1829 } 1830 1831 mtr_start(&mtr); 1832 1833 /* Find the clustered index and create a persistent cursor 1834 based on that. 
*/ 1835 1836 clust_index = dict_table_get_first_index(old_table); 1837 const ulint old_trx_id_col = DATA_TRX_ID - DATA_N_SYS_COLS 1838 + ulint(old_table->n_cols); 1839 ut_ad(old_table->cols[old_trx_id_col].mtype == DATA_SYS); 1840 ut_ad(old_table->cols[old_trx_id_col].prtype 1841 == (DATA_TRX_ID | DATA_NOT_NULL)); 1842 ut_ad(old_table->cols[old_trx_id_col + 1].mtype == DATA_SYS); 1843 ut_ad(old_table->cols[old_trx_id_col + 1].prtype 1844 == (DATA_ROLL_PTR | DATA_NOT_NULL)); 1845 const ulint new_trx_id_col = col_map 1846 ? col_map[old_trx_id_col] : old_trx_id_col; 1847 1848 btr_pcur_open_at_index_side( 1849 true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr); 1850 btr_pcur_move_to_next_user_rec(&pcur, &mtr); 1851 if (rec_is_metadata(btr_pcur_get_rec(&pcur), clust_index)) { 1852 ut_ad(btr_pcur_is_on_user_rec(&pcur)); 1853 /* Skip the metadata pseudo-record. */ 1854 } else { 1855 ut_ad(!clust_index->is_instant()); 1856 btr_pcur_move_to_prev_on_page(&pcur); 1857 } 1858 1859 if (old_table != new_table) { 1860 /* The table is being rebuilt. Identify the columns 1861 that were flagged NOT NULL in the new table, so that 1862 we can quickly check that the records in the old table 1863 do not violate the added NOT NULL constraints. */ 1864 1865 nonnull = static_cast<ulint*>( 1866 ut_malloc_nokey(dict_table_get_n_cols(new_table) 1867 * sizeof *nonnull)); 1868 1869 for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) { 1870 if (dict_table_get_nth_col(old_table, i)->prtype 1871 & DATA_NOT_NULL) { 1872 continue; 1873 } 1874 1875 const ulint j = col_map[i]; 1876 1877 if (j == ULINT_UNDEFINED) { 1878 /* The column was dropped. 
*/ 1879 continue; 1880 } 1881 1882 if (dict_table_get_nth_col(new_table, j)->prtype 1883 & DATA_NOT_NULL) { 1884 nonnull[n_nonnull++] = j; 1885 } 1886 } 1887 1888 if (!n_nonnull) { 1889 ut_free(nonnull); 1890 nonnull = NULL; 1891 } 1892 } 1893 1894 row_heap = mem_heap_create(sizeof(mrec_buf_t)); 1895 1896 if (dict_table_is_comp(old_table) 1897 && !dict_table_is_comp(new_table)) { 1898 conv_heap = mem_heap_create(sizeof(mrec_buf_t)); 1899 } 1900 1901 if (skip_pk_sort) { 1902 prev_fields = static_cast<dfield_t*>( 1903 ut_malloc_nokey(n_uniq * sizeof *prev_fields)); 1904 mtuple_heap = mem_heap_create(sizeof(mrec_buf_t)); 1905 } else { 1906 prev_fields = NULL; 1907 } 1908 1909 mach_write_to_8(new_sys_trx_start, trx->id); 1910 mach_write_to_8(new_sys_trx_end, TRX_ID_MAX); 1911 uint64_t n_rows = 0; 1912 1913 /* Scan the clustered index. */ 1914 for (;;) { 1915 /* Do not continue if table pages are still encrypted */ 1916 if (!old_table->is_readable() || !new_table->is_readable()) { 1917 err = DB_DECRYPTION_FAILED; 1918 trx->error_key_num = 0; 1919 goto func_exit; 1920 } 1921 1922 const rec_t* rec; 1923 trx_id_t rec_trx_id; 1924 rec_offs* offsets; 1925 dtuple_t* row; 1926 row_ext_t* ext; 1927 page_cur_t* cur = btr_pcur_get_page_cur(&pcur); 1928 1929 mem_heap_empty(row_heap); 1930 1931 page_cur_move_to_next(cur); 1932 1933 stage->n_pk_recs_inc(); 1934 1935 if (page_cur_is_after_last(cur)) { 1936 1937 stage->inc(); 1938 1939 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) { 1940 err = DB_INTERRUPTED; 1941 trx->error_key_num = 0; 1942 goto func_exit; 1943 } 1944 1945 if (online && old_table != new_table) { 1946 err = row_log_table_get_error(clust_index); 1947 if (err != DB_SUCCESS) { 1948 trx->error_key_num = 0; 1949 goto func_exit; 1950 } 1951 } 1952 1953 /* Insert the cached spatial index rows. 
*/ 1954 err = row_merge_spatial_rows( 1955 trx->id, sp_tuples, num_spatial, 1956 row_heap, sp_heap, &pcur, &mtr); 1957 1958 if (err != DB_SUCCESS) { 1959 goto func_exit; 1960 } 1961 1962 if (!mtr.is_active()) { 1963 goto scan_next; 1964 } 1965 1966 if (my_atomic_load32_explicit(&clust_index->lock.waiters, 1967 MY_MEMORY_ORDER_RELAXED)) { 1968 /* There are waiters on the clustered 1969 index tree lock, likely the purge 1970 thread. Store and restore the cursor 1971 position, and yield so that scanning a 1972 large table will not starve other 1973 threads. */ 1974 1975 /* Store the cursor position on the last user 1976 record on the page. */ 1977 btr_pcur_move_to_prev_on_page(&pcur); 1978 /* Leaf pages must never be empty, unless 1979 this is the only page in the index tree. */ 1980 ut_ad(btr_pcur_is_on_user_rec(&pcur) 1981 || btr_pcur_get_block( 1982 &pcur)->page.id.page_no() 1983 == clust_index->page); 1984 1985 btr_pcur_store_position(&pcur, &mtr); 1986 mtr_commit(&mtr); 1987 1988 /* Give the waiters a chance to proceed. */ 1989 os_thread_yield(); 1990 scan_next: 1991 mtr_start(&mtr); 1992 /* Restore position on the record, or its 1993 predecessor if the record was purged 1994 meanwhile. */ 1995 btr_pcur_restore_position( 1996 BTR_SEARCH_LEAF, &pcur, &mtr); 1997 /* Move to the successor of the 1998 original record. 
*/ 1999 if (!btr_pcur_move_to_next_user_rec( 2000 &pcur, &mtr)) { 2001 end_of_index: 2002 row = NULL; 2003 mtr_commit(&mtr); 2004 mem_heap_free(row_heap); 2005 row_heap = NULL; 2006 ut_free(nonnull); 2007 nonnull = NULL; 2008 goto write_buffers; 2009 } 2010 } else { 2011 ulint next_page_no; 2012 buf_block_t* block; 2013 2014 next_page_no = btr_page_get_next( 2015 page_cur_get_page(cur)); 2016 2017 if (next_page_no == FIL_NULL) { 2018 goto end_of_index; 2019 } 2020 2021 block = page_cur_get_block(cur); 2022 block = btr_block_get( 2023 page_id_t(block->page.id.space(), 2024 next_page_no), 2025 block->page.size, 2026 BTR_SEARCH_LEAF, 2027 clust_index, &mtr); 2028 2029 btr_leaf_page_release(page_cur_get_block(cur), 2030 BTR_SEARCH_LEAF, &mtr); 2031 page_cur_set_before_first(block, cur); 2032 page_cur_move_to_next(cur); 2033 2034 ut_ad(!page_cur_is_after_last(cur)); 2035 } 2036 } 2037 2038 rec = page_cur_get_rec(cur); 2039 2040 if (online) { 2041 offsets = rec_get_offsets(rec, clust_index, NULL, 2042 clust_index->n_core_fields, 2043 ULINT_UNDEFINED, &row_heap); 2044 rec_trx_id = row_get_rec_trx_id(rec, clust_index, 2045 offsets); 2046 2047 /* Perform a REPEATABLE READ. 2048 2049 When rebuilding the table online, 2050 row_log_table_apply() must not see a newer 2051 state of the table when applying the log. 2052 This is mainly to prevent false duplicate key 2053 errors, because the log will identify records 2054 by the PRIMARY KEY, and also to prevent unsafe 2055 BLOB access. 2056 2057 When creating a secondary index online, this 2058 table scan must not see records that have only 2059 been inserted to the clustered index, but have 2060 not been written to the online_log of 2061 index[]. If we performed READ UNCOMMITTED, it 2062 could happen that the ADD INDEX reaches 2063 ONLINE_INDEX_COMPLETE state between the time 2064 the DML thread has updated the clustered index 2065 but has not yet accessed secondary index. 
*/ 2066 ut_ad(trx->read_view.is_open()); 2067 ut_ad(rec_trx_id != trx->id); 2068 2069 if (!trx->read_view.changes_visible( 2070 rec_trx_id, old_table->name)) { 2071 rec_t* old_vers; 2072 2073 row_vers_build_for_consistent_read( 2074 rec, &mtr, clust_index, &offsets, 2075 &trx->read_view, &row_heap, 2076 row_heap, &old_vers, NULL); 2077 2078 if (!old_vers) { 2079 continue; 2080 } 2081 2082 /* The old version must necessarily be 2083 in the "prehistory", because the 2084 exclusive lock in 2085 ha_innobase::prepare_inplace_alter_table() 2086 forced the completion of any transactions 2087 that accessed this table. */ 2088 ut_ad(row_get_rec_trx_id(old_vers, clust_index, 2089 offsets) < trx->id); 2090 2091 rec = old_vers; 2092 rec_trx_id = 0; 2093 } 2094 2095 if (rec_get_deleted_flag( 2096 rec, 2097 dict_table_is_comp(old_table))) { 2098 /* In delete-marked records, DB_TRX_ID must 2099 always refer to an existing undo log record. 2100 Above, we did reset rec_trx_id = 0 2101 for rec = old_vers.*/ 2102 ut_ad(rec == page_cur_get_rec(cur) 2103 ? rec_trx_id 2104 : !rec_trx_id); 2105 /* This record was deleted in the latest 2106 committed version, or it was deleted and 2107 then reinserted-by-update before purge 2108 kicked in. Skip it. */ 2109 continue; 2110 } 2111 2112 ut_ad(!rec_offs_any_null_extern(rec, offsets)); 2113 } else if (rec_get_deleted_flag( 2114 rec, dict_table_is_comp(old_table))) { 2115 /* In delete-marked records, DB_TRX_ID must 2116 always refer to an existing undo log record. */ 2117 ut_d(rec_trx_id = rec_get_trx_id(rec, clust_index)); 2118 ut_ad(rec_trx_id); 2119 /* This must be a purgeable delete-marked record, 2120 and the transaction that delete-marked the record 2121 must have been committed before this 2122 !online ALTER TABLE transaction. */ 2123 ut_ad(rec_trx_id < trx->id); 2124 /* Skip delete-marked records. 
2125 2126 Skipping delete-marked records will make the 2127 created indexes unuseable for transactions 2128 whose read views were created before the index 2129 creation completed, but an attempt to preserve 2130 the history would make it tricky to detect 2131 duplicate keys. */ 2132 continue; 2133 } else { 2134 offsets = rec_get_offsets(rec, clust_index, NULL, 2135 clust_index->n_core_fields, 2136 ULINT_UNDEFINED, &row_heap); 2137 /* This is a locking ALTER TABLE. 2138 2139 If we are not rebuilding the table, the 2140 DB_TRX_ID does not matter, as it is not being 2141 written to any secondary indexes; see 2142 if (old_table == new_table) below. 2143 2144 If we are rebuilding the table, the 2145 DB_TRX_ID,DB_ROLL_PTR should be reset, because 2146 there will be no history available. */ 2147 ut_ad(rec_get_trx_id(rec, clust_index) < trx->id); 2148 rec_trx_id = 0; 2149 } 2150 2151 /* When !online, we are holding a lock on old_table, preventing 2152 any inserts that could have written a record 'stub' before 2153 writing out off-page columns. */ 2154 ut_ad(!rec_offs_any_null_extern(rec, offsets)); 2155 2156 /* Build a row based on the clustered index. 
*/ 2157 2158 row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index, 2159 rec, offsets, new_table, 2160 defaults, add_v, col_map, &ext, 2161 row_heap); 2162 ut_ad(row); 2163 2164 for (ulint i = 0; i < n_nonnull; i++) { 2165 dfield_t* field = &row->fields[nonnull[i]]; 2166 2167 ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL); 2168 2169 if (dfield_is_null(field)) { 2170 2171 Field* null_field = 2172 table->field[nonnull[i]]; 2173 2174 null_field->set_warning( 2175 Sql_condition::WARN_LEVEL_WARN, 2176 WARN_DATA_TRUNCATED, 1, 2177 ulong(n_rows + 1)); 2178 2179 if (!allow_not_null) { 2180 err = DB_INVALID_NULL; 2181 trx->error_key_num = 0; 2182 goto func_exit; 2183 } 2184 2185 const dfield_t& default_field 2186 = defaults->fields[nonnull[i]]; 2187 2188 *field = default_field; 2189 } 2190 } 2191 2192 /* Get the next Doc ID */ 2193 if (add_doc_id) { 2194 doc_id++; 2195 } else { 2196 doc_id = 0; 2197 } 2198 2199 ut_ad(row->fields[new_trx_id_col].type.mtype == DATA_SYS); 2200 ut_ad(row->fields[new_trx_id_col].type.prtype 2201 == (DATA_TRX_ID | DATA_NOT_NULL)); 2202 ut_ad(row->fields[new_trx_id_col].len == DATA_TRX_ID_LEN); 2203 ut_ad(row->fields[new_trx_id_col + 1].type.mtype == DATA_SYS); 2204 ut_ad(row->fields[new_trx_id_col + 1].type.prtype 2205 == (DATA_ROLL_PTR | DATA_NOT_NULL)); 2206 ut_ad(row->fields[new_trx_id_col + 1].len == DATA_ROLL_PTR_LEN); 2207 2208 if (old_table == new_table) { 2209 /* Do not bother touching DB_TRX_ID,DB_ROLL_PTR 2210 because they are not going to be written into 2211 secondary indexes. */ 2212 } else if (rec_trx_id < trx->id) { 2213 /* Reset the DB_TRX_ID,DB_ROLL_PTR of old rows 2214 for which history is not going to be 2215 available after the rebuild operation. 2216 This essentially mimics row_purge_reset_trx_id(). 
*/ 2217 row->fields[new_trx_id_col].data 2218 = const_cast<byte*>(reset_trx_id); 2219 row->fields[new_trx_id_col + 1].data 2220 = const_cast<byte*>(reset_trx_id 2221 + DATA_TRX_ID_LEN); 2222 } 2223 2224 if (add_autoinc != ULINT_UNDEFINED) { 2225 2226 ut_ad(add_autoinc 2227 < dict_table_get_n_user_cols(new_table)); 2228 2229 bool history_row = false; 2230 if (new_table->versioned()) { 2231 const dfield_t* dfield = dtuple_get_nth_field( 2232 row, new_table->vers_end); 2233 history_row = dfield->vers_history_row(); 2234 } 2235 2236 dfield_t* dfield = dtuple_get_nth_field(row, 2237 add_autoinc); 2238 2239 if (new_table->versioned()) { 2240 if (history_row) { 2241 if (dfield_get_type(dfield)->prtype & DATA_NOT_NULL) { 2242 err = DB_UNSUPPORTED; 2243 my_error(ER_UNSUPPORTED_EXTENSION, MYF(0), 2244 old_table->name.m_name); 2245 goto func_exit; 2246 } 2247 dfield_set_null(dfield); 2248 } else { 2249 // set not null 2250 ulint len = dfield_get_type(dfield)->len; 2251 dfield_set_data(dfield, any_autoinc_data, len); 2252 } 2253 } 2254 2255 if (dfield_is_null(dfield)) { 2256 goto write_buffers; 2257 } 2258 2259 const dtype_t* dtype = dfield_get_type(dfield); 2260 byte* b = static_cast<byte*>(dfield_get_data(dfield)); 2261 2262 if (sequence.eof()) { 2263 err = DB_ERROR; 2264 trx->error_key_num = 0; 2265 2266 ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR, 2267 ER_AUTOINC_READ_FAILED, "[NULL]"); 2268 2269 goto func_exit; 2270 } 2271 2272 ulonglong value = sequence++; 2273 2274 switch (dtype_get_mtype(dtype)) { 2275 case DATA_INT: { 2276 ibool usign; 2277 ulint len = dfield_get_len(dfield); 2278 2279 usign = dtype_get_prtype(dtype) & DATA_UNSIGNED; 2280 mach_write_ulonglong(b, value, len, usign); 2281 2282 break; 2283 } 2284 2285 case DATA_FLOAT: 2286 mach_float_write( 2287 b, static_cast<float>(value)); 2288 break; 2289 2290 case DATA_DOUBLE: 2291 mach_double_write( 2292 b, static_cast<double>(value)); 2293 break; 2294 2295 default: 2296 ut_ad(0); 2297 } 2298 } 2299 2300 if 
(old_table->versioned()) { 2301 if (!new_table->versioned() 2302 && clust_index->vers_history_row(rec, offsets)) { 2303 continue; 2304 } 2305 } else if (new_table->versioned()) { 2306 dfield_t* start = 2307 dtuple_get_nth_field(row, new_table->vers_start); 2308 dfield_t* end = 2309 dtuple_get_nth_field(row, new_table->vers_end); 2310 dfield_set_data(start, new_sys_trx_start, 8); 2311 dfield_set_data(end, new_sys_trx_end, 8); 2312 vers_update_trt = true; 2313 } 2314 2315 write_buffers: 2316 /* Build all entries for all the indexes to be created 2317 in a single scan of the clustered index. */ 2318 2319 n_rows++; 2320 ulint s_idx_cnt = 0; 2321 bool skip_sort = skip_pk_sort 2322 && dict_index_is_clust(merge_buf[0]->index); 2323 2324 for (ulint k = 0, i = 0; i < n_index; i++, skip_sort = false) { 2325 row_merge_buf_t* buf = merge_buf[i]; 2326 ulint rows_added = 0; 2327 2328 if (dict_index_is_spatial(buf->index)) { 2329 if (!row) { 2330 continue; 2331 } 2332 2333 ut_ad(sp_tuples[s_idx_cnt]->get_index() 2334 == buf->index); 2335 2336 /* If the geometry field is invalid, report 2337 error. */ 2338 if (!row_geo_field_is_valid(row, buf->index)) { 2339 err = DB_CANT_CREATE_GEOMETRY_OBJECT; 2340 break; 2341 } 2342 2343 sp_tuples[s_idx_cnt]->add(row, ext); 2344 s_idx_cnt++; 2345 2346 continue; 2347 } 2348 2349 ut_ad(!row 2350 || !dict_index_is_clust(buf->index) 2351 || trx_id_check(row->fields[new_trx_id_col].data, 2352 trx->id)); 2353 2354 merge_file_t* file = &files[k++]; 2355 2356 if (UNIV_LIKELY 2357 (row && (rows_added = row_merge_buf_add( 2358 buf, fts_index, old_table, new_table, 2359 psort_info, row, ext, &doc_id, 2360 conv_heap, &err, 2361 &v_heap, eval_table, trx)))) { 2362 2363 /* Set the page flush observer for the 2364 transaction when buffering the very first 2365 record for a non-redo-logged operation. 
*/ 2366 if (file->n_rec == 0 && i == 0 2367 && innodb_log_optimize_ddl) { 2368 trx->set_flush_observer( 2369 new_table->space, stage); 2370 } 2371 2372 /* If we are creating FTS index, 2373 a single row can generate more 2374 records for tokenized word */ 2375 file->n_rec += rows_added; 2376 2377 if (err != DB_SUCCESS) { 2378 ut_ad(err == DB_TOO_BIG_RECORD); 2379 break; 2380 } 2381 2382 if (doc_id > max_doc_id) { 2383 max_doc_id = doc_id; 2384 } 2385 2386 if (buf->index->type & DICT_FTS) { 2387 /* Check if error occurs in child thread */ 2388 for (ulint j = 0; 2389 j < fts_sort_pll_degree; j++) { 2390 if (psort_info[j].error 2391 != DB_SUCCESS) { 2392 err = psort_info[j].error; 2393 trx->error_key_num = i; 2394 break; 2395 } 2396 } 2397 2398 if (err != DB_SUCCESS) { 2399 break; 2400 } 2401 } 2402 2403 if (skip_sort) { 2404 ut_ad(buf->n_tuples > 0); 2405 const mtuple_t* curr = 2406 &buf->tuples[buf->n_tuples - 1]; 2407 2408 ut_ad(i == 0); 2409 ut_ad(dict_index_is_clust(merge_buf[0]->index)); 2410 /* Detect duplicates by comparing the 2411 current record with previous record. 2412 When temp file is not used, records 2413 should be in sorted order. */ 2414 if (prev_mtuple.fields != NULL 2415 && (row_mtuple_cmp( 2416 &prev_mtuple, curr, 2417 &clust_dup) == 0)) { 2418 2419 err = DB_DUPLICATE_KEY; 2420 trx->error_key_num 2421 = key_numbers[0]; 2422 goto func_exit; 2423 } 2424 2425 prev_mtuple.fields = curr->fields; 2426 } 2427 2428 continue; 2429 } 2430 2431 if (err == DB_COMPUTE_VALUE_FAILED) { 2432 trx->error_key_num = i; 2433 goto func_exit; 2434 } 2435 2436 if (buf->index->type & DICT_FTS) { 2437 if (!row || !doc_id) { 2438 continue; 2439 } 2440 } 2441 2442 /* The buffer must be sufficiently large 2443 to hold at least one record. It may only 2444 be empty when we reach the end of the 2445 clustered index. row_merge_buf_add() 2446 must not have been called in this loop. 
*/ 2447 ut_ad(buf->n_tuples || row == NULL); 2448 2449 /* We have enough data tuples to form a block. 2450 Sort them and write to disk if temp file is used 2451 or insert into index if temp file is not used. */ 2452 ut_ad(old_table == new_table 2453 ? !dict_index_is_clust(buf->index) 2454 : (i == 0) == dict_index_is_clust(buf->index)); 2455 2456 /* We have enough data tuples to form a block. 2457 Sort them (if !skip_sort) and write to disk. */ 2458 2459 if (buf->n_tuples) { 2460 if (skip_sort) { 2461 /* Temporary File is not used. 2462 so insert sorted block to the index */ 2463 if (row != NULL) { 2464 /* We have to do insert the 2465 cached spatial index rows, since 2466 after the mtr_commit, the cluster 2467 index page could be updated, then 2468 the data in cached rows become 2469 invalid. */ 2470 err = row_merge_spatial_rows( 2471 trx->id, sp_tuples, 2472 num_spatial, 2473 row_heap, sp_heap, 2474 &pcur, &mtr); 2475 2476 if (err != DB_SUCCESS) { 2477 goto func_exit; 2478 } 2479 2480 /* We are not at the end of 2481 the scan yet. We must 2482 mtr_commit() in order to be 2483 able to call log_free_check() 2484 in row_merge_insert_index_tuples(). 2485 Due to mtr_commit(), the 2486 current row will be invalid, and 2487 we must reread it on the next 2488 loop iteration. 
*/ 2489 if (mtr.is_active()) { 2490 btr_pcur_move_to_prev_on_page( 2491 &pcur); 2492 btr_pcur_store_position( 2493 &pcur, &mtr); 2494 2495 mtr.commit(); 2496 } 2497 } 2498 2499 mem_heap_empty(mtuple_heap); 2500 prev_mtuple.fields = prev_fields; 2501 2502 row_mtuple_create( 2503 &buf->tuples[buf->n_tuples - 1], 2504 &prev_mtuple, n_uniq, 2505 mtuple_heap); 2506 2507 if (clust_btr_bulk == NULL) { 2508 clust_btr_bulk = UT_NEW_NOKEY( 2509 BtrBulk(index[i], 2510 trx, 2511 trx->get_flush_observer())); 2512 } else { 2513 clust_btr_bulk->latch(); 2514 } 2515 2516 err = row_merge_insert_index_tuples( 2517 index[i], old_table, 2518 OS_FILE_CLOSED, NULL, buf, 2519 clust_btr_bulk, 2520 table_total_rows, 2521 curr_progress, 2522 pct_cost, 2523 crypt_block, 2524 new_table->space_id); 2525 2526 if (row == NULL) { 2527 err = clust_btr_bulk->finish( 2528 err); 2529 UT_DELETE(clust_btr_bulk); 2530 clust_btr_bulk = NULL; 2531 } else { 2532 /* Release latches for possible 2533 log_free_chck in spatial index 2534 build. */ 2535 clust_btr_bulk->release(); 2536 } 2537 2538 if (err != DB_SUCCESS) { 2539 break; 2540 } 2541 2542 if (row != NULL) { 2543 /* Restore the cursor on the 2544 previous clustered index record, 2545 and empty the buffer. The next 2546 iteration of the outer loop will 2547 advance the cursor and read the 2548 next record (the one which we 2549 had to ignore due to the buffer 2550 overflow). */ 2551 mtr_start(&mtr); 2552 btr_pcur_restore_position( 2553 BTR_SEARCH_LEAF, &pcur, 2554 &mtr); 2555 buf = row_merge_buf_empty(buf); 2556 merge_buf[i] = buf; 2557 /* Restart the outer loop on the 2558 record. We did not insert it 2559 into any index yet. 
*/ 2560 ut_ad(i == 0); 2561 break; 2562 } 2563 } else if (dict_index_is_unique(buf->index)) { 2564 row_merge_dup_t dup = { 2565 buf->index, table, col_map, 0}; 2566 2567 row_merge_buf_sort(buf, &dup); 2568 2569 if (dup.n_dup) { 2570 err = DB_DUPLICATE_KEY; 2571 trx->error_key_num 2572 = key_numbers[i]; 2573 break; 2574 } 2575 } else { 2576 row_merge_buf_sort(buf, NULL); 2577 } 2578 } else if (online && new_table == old_table) { 2579 /* Note the newest transaction that 2580 modified this index when the scan was 2581 completed. We prevent older readers 2582 from accessing this index, to ensure 2583 read consistency. */ 2584 2585 trx_id_t max_trx_id; 2586 2587 ut_a(row == NULL); 2588 rw_lock_x_lock( 2589 dict_index_get_lock(buf->index)); 2590 ut_a(dict_index_get_online_status(buf->index) 2591 == ONLINE_INDEX_CREATION); 2592 2593 max_trx_id = row_log_get_max_trx(buf->index); 2594 2595 if (max_trx_id > buf->index->trx_id) { 2596 buf->index->trx_id = max_trx_id; 2597 } 2598 2599 rw_lock_x_unlock( 2600 dict_index_get_lock(buf->index)); 2601 } 2602 2603 /* Secondary index and clustered index which is 2604 not in sorted order can use the temporary file. 2605 Fulltext index should not use the temporary file. */ 2606 if (!skip_sort && !(buf->index->type & DICT_FTS)) { 2607 /* In case we can have all rows in sort buffer, 2608 we can insert directly into the index without 2609 temporary file if clustered index does not uses 2610 temporary file. 
*/ 2611 if (row == NULL && file->fd == OS_FILE_CLOSED 2612 && !clust_temp_file) { 2613 DBUG_EXECUTE_IF( 2614 "row_merge_write_failure", 2615 err = DB_TEMP_FILE_WRITE_FAIL; 2616 trx->error_key_num = i; 2617 goto all_done;); 2618 2619 DBUG_EXECUTE_IF( 2620 "row_merge_tmpfile_fail", 2621 err = DB_OUT_OF_MEMORY; 2622 trx->error_key_num = i; 2623 goto all_done;); 2624 2625 BtrBulk btr_bulk( 2626 index[i], trx, 2627 trx->get_flush_observer()); 2628 2629 err = row_merge_insert_index_tuples( 2630 index[i], old_table, 2631 OS_FILE_CLOSED, NULL, buf, 2632 &btr_bulk, 2633 table_total_rows, 2634 curr_progress, 2635 pct_cost, 2636 crypt_block, 2637 new_table->space_id); 2638 2639 err = btr_bulk.finish(err); 2640 2641 DBUG_EXECUTE_IF( 2642 "row_merge_insert_big_row", 2643 err = DB_TOO_BIG_RECORD;); 2644 2645 if (err != DB_SUCCESS) { 2646 break; 2647 } 2648 } else { 2649 if (!row_merge_file_create_if_needed( 2650 file, tmpfd, 2651 buf->n_tuples, path)) { 2652 err = DB_OUT_OF_MEMORY; 2653 trx->error_key_num = i; 2654 break; 2655 } 2656 2657 /* Ensure that duplicates in the 2658 clustered index will be detected before 2659 inserting secondary index records. */ 2660 if (dict_index_is_clust(buf->index)) { 2661 clust_temp_file = true; 2662 } 2663 2664 ut_ad(file->n_rec > 0); 2665 2666 row_merge_buf_write(buf, file, block); 2667 2668 if (!row_merge_write( 2669 file->fd, file->offset++, 2670 block, crypt_block, 2671 new_table->space_id)) { 2672 err = DB_TEMP_FILE_WRITE_FAIL; 2673 trx->error_key_num = i; 2674 break; 2675 } 2676 2677 MEM_UNDEFINED( 2678 &block[0], srv_sort_buf_size); 2679 } 2680 } 2681 merge_buf[i] = row_merge_buf_empty(buf); 2682 buf = merge_buf[i]; 2683 2684 if (UNIV_LIKELY(row != NULL)) { 2685 /* Try writing the record again, now 2686 that the buffer has been written out 2687 and emptied. 
*/ 2688 2689 if (UNIV_UNLIKELY 2690 (!(rows_added = row_merge_buf_add( 2691 buf, fts_index, old_table, 2692 new_table, psort_info, row, ext, 2693 &doc_id, conv_heap, 2694 &err, &v_heap, eval_table, trx)))) { 2695 /* An empty buffer should have enough 2696 room for at least one record. */ 2697 ut_ad(err == DB_COMPUTE_VALUE_FAILED 2698 || err == DB_OUT_OF_MEMORY 2699 || err == DB_TOO_BIG_RECORD); 2700 } else if (err == DB_SUCCESS) { 2701 file->n_rec += rows_added; 2702 continue; 2703 } 2704 2705 trx->error_key_num = i; 2706 break; 2707 } 2708 } 2709 2710 if (row == NULL) { 2711 if (old_table != new_table) { 2712 new_table->stat_n_rows = n_rows; 2713 } 2714 2715 goto all_done; 2716 } 2717 2718 if (err != DB_SUCCESS) { 2719 goto func_exit; 2720 } 2721 2722 if (v_heap) { 2723 mem_heap_empty(v_heap); 2724 } 2725 2726 /* Increment innodb_onlineddl_pct_progress status variable */ 2727 read_rows++; 2728 if(read_rows % 1000 == 0) { 2729 /* Update progress for each 1000 rows */ 2730 curr_progress = (read_rows >= table_total_rows) ? 
2731 pct_cost : 2732 ((pct_cost * read_rows) / table_total_rows); 2733 /* presenting 10.12% as 1012 integer */ 2734 onlineddl_pct_progress = (ulint) (curr_progress * 100); 2735 } 2736 } 2737 2738 func_exit: 2739 if (mtr.is_active()) { 2740 mtr_commit(&mtr); 2741 } 2742 if (row_heap) { 2743 mem_heap_free(row_heap); 2744 } 2745 ut_free(nonnull); 2746 2747 all_done: 2748 if (clust_btr_bulk != NULL) { 2749 ut_ad(err != DB_SUCCESS); 2750 clust_btr_bulk->latch(); 2751 err = clust_btr_bulk->finish( 2752 err); 2753 UT_DELETE(clust_btr_bulk); 2754 } 2755 2756 if (prev_fields != NULL) { 2757 ut_free(prev_fields); 2758 mem_heap_free(mtuple_heap); 2759 } 2760 2761 if (v_heap) { 2762 mem_heap_free(v_heap); 2763 } 2764 2765 if (conv_heap != NULL) { 2766 mem_heap_free(conv_heap); 2767 } 2768 2769 #ifdef FTS_INTERNAL_DIAG_PRINT 2770 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n"); 2771 #endif 2772 if (fts_pll_sort) { 2773 bool all_exit = false; 2774 ulint trial_count = 0; 2775 const ulint max_trial_count = 10000; 2776 2777 wait_again: 2778 /* Check if error occurs in child thread */ 2779 for (ulint j = 0; j < fts_sort_pll_degree; j++) { 2780 if (psort_info[j].error != DB_SUCCESS) { 2781 err = psort_info[j].error; 2782 trx->error_key_num = j; 2783 break; 2784 } 2785 } 2786 2787 /* Tell all children that parent has done scanning */ 2788 for (ulint i = 0; i < fts_sort_pll_degree; i++) { 2789 if (err == DB_SUCCESS) { 2790 psort_info[i].state = FTS_PARENT_COMPLETE; 2791 } else { 2792 psort_info[i].state = FTS_PARENT_EXITING; 2793 } 2794 } 2795 2796 /* Now wait all children to report back to be completed */ 2797 os_event_wait_time_low(fts_parallel_sort_event, 2798 1000000, sig_count); 2799 2800 for (ulint i = 0; i < fts_sort_pll_degree; i++) { 2801 if (psort_info[i].child_status != FTS_CHILD_COMPLETE 2802 && psort_info[i].child_status != FTS_CHILD_EXITING) { 2803 sig_count = os_event_reset( 2804 fts_parallel_sort_event); 2805 goto wait_again; 2806 } 2807 } 2808 2809 /* Now all 
children should complete, wait a bit until 2810 they all finish setting the event, before we free everything. 2811 This has a 10 second timeout */ 2812 do { 2813 all_exit = true; 2814 2815 for (ulint j = 0; j < fts_sort_pll_degree; j++) { 2816 if (psort_info[j].child_status 2817 != FTS_CHILD_EXITING) { 2818 all_exit = false; 2819 os_thread_sleep(1000); 2820 break; 2821 } 2822 } 2823 trial_count++; 2824 } while (!all_exit && trial_count < max_trial_count); 2825 2826 if (!all_exit) { 2827 ib::fatal() << "Not all child sort threads exited" 2828 " when creating FTS index '" 2829 << fts_sort_idx->name << "'"; 2830 } 2831 } 2832 2833 #ifdef FTS_INTERNAL_DIAG_PRINT 2834 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n"); 2835 #endif 2836 for (ulint i = 0; i < n_index; i++) { 2837 row_merge_buf_free(merge_buf[i]); 2838 } 2839 2840 row_fts_free_pll_merge_buf(psort_info); 2841 2842 ut_free(merge_buf); 2843 2844 btr_pcur_close(&pcur); 2845 2846 if (sp_tuples != NULL) { 2847 for (ulint i = 0; i < num_spatial; i++) { 2848 UT_DELETE(sp_tuples[i]); 2849 } 2850 ut_free(sp_tuples); 2851 2852 if (sp_heap) { 2853 mem_heap_free(sp_heap); 2854 } 2855 } 2856 2857 /* Update the next Doc ID we used. Table should be locked, so 2858 no concurrent DML */ 2859 if (max_doc_id && err == DB_SUCCESS) { 2860 /* Sync fts cache for other fts indexes to keep all 2861 fts indexes consistent in sync_doc_id. */ 2862 err = fts_sync_table(const_cast<dict_table_t*>(new_table)); 2863 2864 if (err == DB_SUCCESS) { 2865 fts_update_next_doc_id(NULL, new_table, max_doc_id); 2866 } 2867 } 2868 2869 if (vers_update_trt) { 2870 trx_mod_table_time_t& time = 2871 trx->mod_tables 2872 .insert(trx_mod_tables_t::value_type( 2873 const_cast<dict_table_t*>(new_table), 0)) 2874 .first->second; 2875 time.set_versioned(0); 2876 } 2877 2878 trx->op_info = ""; 2879 2880 DBUG_RETURN(err); 2881 } 2882 2883 /** Write a record via buffer 2 and read the next record to buffer N. 
@param N	number of the buffer (0 or 1)
@param INDEX	record descriptor
@param AT_END	statement to execute at end of input */
/* NOTE: this macro expands to code that jumps to a local "corrupt:"
label on write/read failure, so the invoking function must define one.
It also expects b2, of, file, block, buf, crypt_block and space to be
in scope at the expansion site.  It writes out the current record
mrec##N via buffer 2 and then reads the next record into buffer N. */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)			\
	do {								\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
					 &buf[2], b2,			\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N,		\
			crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL , \
					 space);			\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  &buf[N], b##N, INDEX,		\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N,		\
			crypt_block ? &crypt_block[N * srv_sort_buf_size] : NULL, \
					  space);			\
									\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)

/* Wrapper that additionally ticks the performance-schema stage counter
for each record processed, when the PSI stage interface is built in. */
#ifdef HAVE_PSI_STAGE_INTERFACE
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)		\
	do {							\
		if (stage != NULL) {				\
			stage->inc();				\
		}						\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);	\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)	\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */

/** Merge two blocks of records on disk and write a bigger block.
@param[in]	dup	descriptor of index being created
@param[in]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	foffs0	offset of first source list in the file
@param[in,out]	foffs1	offset of second source list in the file
@param[in,out]	of	output file
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@param[in,out]	crypt_block	encryption buffer
@param[in]	space	tablespace ID for encryption
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_blocks(
	const row_merge_dup_t*	dup,
	const merge_file_t*	file,
	row_merge_block_t*	block,
	ulint*			foffs0,
	ulint*			foffs1,
	merge_file_t*		of,
	ut_stage_alter_t*	stage MY_ATTRIBUTE((unused)),
	row_merge_block_t*	crypt_block,
	ulint			space)
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	const byte*	b1;	/*!< pointer to block[srv_sort_buf_size] */
	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
	const mrec_t*	mrec1;	/*!< merge rec, points to
				block[srv_sort_buf_size] or buf[1] */
	rec_offs*	offsets0;/* offsets of mrec0 */
	rec_offs*	offsets1;/* offsets of mrec1 */

	DBUG_ENTER("row_merge_blocks");
	DBUG_LOG("ib_merge_sort",
		 "fd=" << file->fd << ',' << *foffs0 << '+' << *foffs1
		 << " to fd=" << of->fd << ',' << of->offset);

	heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass. */

	/* Read the first block of each of the two input runs.  The
	"corrupt:" label below is also the failure target of the
	ROW_MERGE_WRITE_GET_NEXT macro expansions further down. */
	if (!row_merge_read(file->fd, *foffs0, &block[0],
			    crypt_block ? &crypt_block[0] : NULL,
			    space) ||
	    !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size],
			    crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
			    space)) {
corrupt:
		mem_heap_free(heap);
		DBUG_RETURN(DB_CORRUPTION);
	}

	b0 = &block[0];
	b1 = &block[srv_sort_buf_size];
	b2 = &block[2 * srv_sort_buf_size];

	b0 = row_merge_read_rec(
		&block[0], &buf[0], b0, dup->index,
		file->fd, foffs0, &mrec0, offsets0,
		crypt_block ? &crypt_block[0] : NULL,
		space);

	b1 = row_merge_read_rec(
		&block[srv_sort_buf_size],
		&buf[srv_sort_buf_size], b1, dup->index,
		file->fd, foffs1, &mrec1, offsets1,
		crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
		space);

	if (UNIV_UNLIKELY(!b0 && mrec0)
	    || UNIV_UNLIKELY(!b1 && mrec1)) {

		goto corrupt;
	}

	/* Two-way merge: repeatedly write out the smaller of the two
	current records and advance that input.  Records comparing as
	equal are duplicate keys in the index being created. */
	while (mrec0 && mrec1) {
		int cmp = cmp_rec_rec_simple(
			mrec0, mrec1, offsets0, offsets1,
			dup->index, dup->table);
		if (cmp < 0) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
		} else if (cmp) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
		} else {
			mem_heap_free(heap);
			DBUG_RETURN(DB_DUPLICATE_KEY);
		}
	}

	/* One input run is exhausted; copy the tail of the other. */
merged:
	if (mrec0) {
		/* append all mrec0 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
		}
	}
done0:
	if (mrec1) {
		/* append all mrec1 to output */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
		}
	}
done1:

	mem_heap_free(heap);

	/* Terminate the merged run with an end-of-file marker. */
	b2 = row_merge_write_eof(
		&block[2 * srv_sort_buf_size],
		b2, of->fd, &of->offset,
		crypt_block ? &crypt_block[2 * srv_sort_buf_size] : NULL,
		space);
	DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
}

/** Copy a block of index entries.
3050 @param[in] index index being created 3051 @param[in] file input file 3052 @param[in,out] block 3 buffers 3053 @param[in,out] foffs0 input file offset 3054 @param[in,out] of output file 3055 @param[in,out] stage performance schema accounting object, used by 3056 ALTER TABLE. If not NULL stage->inc() will be called for each record 3057 processed. 3058 @param[in,out] crypt_block encryption buffer 3059 @param[in] space tablespace ID for encryption 3060 @return TRUE on success, FALSE on failure */ 3061 static MY_ATTRIBUTE((warn_unused_result)) 3062 ibool 3063 row_merge_blocks_copy( 3064 const dict_index_t* index, 3065 const merge_file_t* file, 3066 row_merge_block_t* block, 3067 ulint* foffs0, 3068 merge_file_t* of, 3069 ut_stage_alter_t* stage MY_ATTRIBUTE((unused)), 3070 row_merge_block_t* crypt_block, 3071 ulint space) 3072 { 3073 mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */ 3074 3075 mrec_buf_t* buf; /*!< buffer for handling 3076 split mrec in block[] */ 3077 const byte* b0; /*!< pointer to block[0] */ 3078 byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */ 3079 const mrec_t* mrec0; /*!< merge rec, points to block[0] */ 3080 rec_offs* offsets0;/* offsets of mrec0 */ 3081 rec_offs* offsets1;/* dummy offsets */ 3082 3083 DBUG_ENTER("row_merge_blocks_copy"); 3084 DBUG_LOG("ib_merge_sort", 3085 "fd=" << file->fd << ',' << foffs0 3086 << " to fd=" << of->fd << ',' << of->offset); 3087 3088 heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1); 3089 3090 /* Write a record and read the next record. Split the output 3091 file in two halves, which can be merged on the following pass. */ 3092 3093 if (!row_merge_read(file->fd, *foffs0, &block[0], 3094 crypt_block ? 
&crypt_block[0] : NULL, 3095 space)) { 3096 corrupt: 3097 mem_heap_free(heap); 3098 DBUG_RETURN(FALSE); 3099 } 3100 3101 b0 = &block[0]; 3102 3103 b2 = &block[2 * srv_sort_buf_size]; 3104 3105 b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, 3106 file->fd, foffs0, &mrec0, offsets0, 3107 crypt_block ? &crypt_block[0] : NULL, 3108 space); 3109 3110 if (UNIV_UNLIKELY(!b0 && mrec0)) { 3111 3112 goto corrupt; 3113 } 3114 3115 if (mrec0) { 3116 /* append all mrec0 to output */ 3117 for (;;) { 3118 ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0); 3119 } 3120 } 3121 done0: 3122 3123 /* The file offset points to the beginning of the last page 3124 that has been read. Update it to point to the next block. */ 3125 (*foffs0)++; 3126 3127 mem_heap_free(heap); 3128 3129 DBUG_RETURN(row_merge_write_eof( 3130 &block[2 * srv_sort_buf_size], 3131 b2, of->fd, &of->offset, 3132 crypt_block 3133 ? &crypt_block[2 * srv_sort_buf_size] 3134 : NULL, space) 3135 != NULL); 3136 } 3137 3138 /** Merge disk files. 3139 @param[in] trx transaction 3140 @param[in] dup descriptor of index being created 3141 @param[in,out] file file containing index entries 3142 @param[in,out] block 3 buffers 3143 @param[in,out] tmpfd temporary file handle 3144 @param[in,out] num_run Number of runs that remain to be merged 3145 @param[in,out] run_offset Array that contains the first offset number 3146 for each merge run 3147 @param[in,out] stage performance schema accounting object, used by 3148 @param[in,out] crypt_block encryption buffer 3149 @param[in] space tablespace ID for encryption 3150 ALTER TABLE. If not NULL stage->inc() will be called for each record 3151 processed. 
@return DB_SUCCESS or error code */
static
dberr_t
row_merge(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	pfs_os_file_t*		tmpfd,
	ulint*			num_run,
	ulint*			run_offset,
	ut_stage_alter_t*	stage,
	row_merge_block_t*	crypt_block,
	ulint			space)
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	dberr_t		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= run_offset[*num_run / 2];
				/*!< half the input file */
	ulint		n_run	= 0;
				/*!< num of runs generated from this merge */

	MEM_CHECK_ADDRESSABLE(&block[0], 3 * srv_sort_buf_size);

	if (crypt_block) {
		MEM_CHECK_ADDRESSABLE(&crypt_block[0], 3 * srv_sort_buf_size);
	}

	ut_ad(ihalf < file->offset);

	/* The output of this pass goes to the scratch file; the file
	descriptors are swapped back at the end of the function. */
	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file.  Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file.  foffs0 walks the first
	half of the input runs, foffs1 the second half; each pair of
	runs is merged into one bigger run. */
	foffs0 = 0;
	foffs1 = ihalf;

	MEM_UNDEFINED(run_offset, *num_run * sizeof *run_offset);

	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		error = row_merge_blocks(dup, file, block,
					 &foffs0, &foffs1, &of, stage,
					 crypt_block, space);

		if (error != DB_SUCCESS) {
			return(error);
		}

	}

	/* Copy the last blocks, if there are any.  When the two halves
	contain an unequal number of runs, the leftover runs of either
	half are copied through unmerged. */

	while (foffs0 < ihalf) {

		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs0, &of, stage,
					   crypt_block, space)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	while (foffs1 < file->offset) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block,
					   &foffs1, &of, stage,
					   crypt_block, space)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	/* Every input record must have been written out exactly once;
	a record-count mismatch indicates a corrupted merge file. */
	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	ut_ad(n_run <= *num_run);

	*num_run = n_run;

	/* Each run can contain one or more offsets. As merge goes on,
	the number of runs (to merge) will reduce until we have one
	single run. So the number of runs will always be smaller than
	the number of offsets in file */
	ut_ad((*num_run) <= file->offset);

	/* The number of offsets in output file is always equal or
	smaller than input file */
	ut_ad(of.offset <= file->offset);

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;

	MEM_UNDEFINED(&block[0], 3 * srv_sort_buf_size);

	return(DB_SUCCESS);
}

/** Merge disk files.
@param[in]	trx	transaction
@param[in]	dup	descriptor of index being created
@param[in,out]	file	file containing index entries
@param[in,out]	block	3 buffers
@param[in,out]	tmpfd	temporary file handle
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE.
If not NULL, stage->begin_phase_sort() will be called initially
and then stage->inc() will be called for each record processed.
@return DB_SUCCESS or error code */
dberr_t
row_merge_sort(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	pfs_os_file_t*		tmpfd,
	const bool		update_progress,
					/*!< in: update progress
					status variable or not */
	const double 		pct_progress,
					/*!< in: total progress percent
					until now */
	const double		pct_cost, /*!< in: current progress percent */
	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
	ulint			space,	   /*!< in: space id */
	ut_stage_alter_t*	stage)
{
	const ulint	half	= file->offset / 2;
	ulint		num_runs;
	ulint*		run_offset;
	dberr_t		error	= DB_SUCCESS;
	ulint		merge_count = 0;
	ulint		total_merge_sort_count;
	double 		curr_progress = 0;

	DBUG_ENTER("row_merge_sort");

	/* Record the number of merge runs we need to perform.
	Initially, every block in the file is its own run. */
	num_runs = file->offset;

	if (stage != NULL) {
		stage->begin_phase_sort(log2(num_runs));
	}

	/* If num_runs are less than 1, nothing to merge */
	if (num_runs <= 1) {
		DBUG_RETURN(error);
	}

	/* Merging halves the number of runs per pass, so the
	number of passes is ceil(log2(num_runs)); used only for
	progress reporting. */
	total_merge_sort_count = ulint(ceil(log2(double(num_runs))));

	/* "run_offset" records each run's first offset number */
	run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));

	/* This tells row_merge() where to start for the first round
	of merge. */
	run_offset[half] = half;

	/* The file should always contain at least one byte (the end
	of file marker). Thus, it must be at least one block. */
	ut_ad(file->offset > 0);

	/* These thd_progress* calls will crash on sol10-64 when innodb_plugin
	is used. MDEV-9356: innodb.innodb_bug53290 fails (crashes) on
	sol10-64 in buildbot.
	*/
#ifndef UNIV_SOLARIS
	/* Progress report only for "normal" indexes. */
	if (!(dup->index->type & DICT_FTS)) {
		thd_progress_init(trx->mysql_thd, 1);
	}
#endif /* UNIV_SOLARIS */

	if (global_system_variables.log_warnings > 2) {
		sql_print_information("InnoDB: Online DDL : merge-sorting"
				      " has estimated " ULINTPF " runs",
				      num_runs);
	}

	/* Merge the runs until we have one big run */
	do {
		/* Report progress of merge sort to MySQL for
		show processlist progress field */
		/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
		if (!(dup->index->type & DICT_FTS)) {
			thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
		}
#endif /* UNIV_SOLARIS */

		/* One merge pass: pairs of adjacent runs are merged,
		num_runs is updated to the new (smaller) run count. */
		error = row_merge(trx, dup, file, block, tmpfd,
				  &num_runs, run_offset, stage,
				  crypt_block, space);

		if(update_progress) {
			merge_count++;
			/* Clamp the progress contribution to pct_cost
			once the estimated pass count is reached. */
			curr_progress = (merge_count >= total_merge_sort_count) ?
				pct_cost :
				((pct_cost * merge_count) / total_merge_sort_count);
			/* presenting 10.12% as 1012 integer */;
			onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100);
		}

		if (error != DB_SUCCESS) {
			break;
		}

		MEM_CHECK_DEFINED(run_offset, num_runs * sizeof *run_offset);
	} while (num_runs > 1);

	ut_free(run_offset);

	/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
	if (!(dup->index->type & DICT_FTS)) {
		thd_progress_end(trx->mysql_thd);
	}
#endif /* UNIV_SOLARIS */

	DBUG_RETURN(error);
}

/** Copy externally stored columns to the data tuple.
@param[in]	mrec		record containing BLOB pointers,
or NULL to use tuple instead
@param[in]	offsets		offsets of mrec
@param[in]	page_size	page size descriptor of the tablespace
@param[in,out]	tuple		data tuple
@param[in,out]	heap		memory heap */
static
void
row_merge_copy_blobs(
	const mrec_t*		mrec,
	const rec_offs*		offsets,
	const page_size_t&	page_size,
	dtuple_t*		tuple,
	mem_heap_t*		heap)
{
	ut_ad(mrec == NULL || rec_offs_any_extern(offsets));

	/* Replace every externally stored field in the tuple by a
	copy of its full value allocated from "heap". */
	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
		ulint		len;
		const void*	data;
		dfield_t*	field = dtuple_get_nth_field(tuple, i);
		ulint		field_len;
		const byte*	field_data;

		if (!dfield_is_ext(field)) {
			continue;
		}

		ut_ad(!dfield_is_null(field));

		/* During the creation of a PRIMARY KEY, the table is
		X-locked, and we skip copying records that have been
		marked for deletion. Therefore, externally stored
		columns cannot possibly be freed between the time the
		BLOB pointers are read (row_merge_read_clustered_index())
		and dereferenced (below). */
		if (mrec == NULL) {
			/* No merge record: the BLOB pointer is stored
			in the last BTR_EXTERN_FIELD_REF_SIZE bytes of
			the field data itself. */
			field_data
				= static_cast<byte*>(dfield_get_data(field));
			field_len = dfield_get_len(field);

			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);

			/* The field reference must not be all-zero
			(an unwritten BLOB pointer). */
			ut_a(memcmp(field_data + field_len
				    - BTR_EXTERN_FIELD_REF_SIZE,
				    field_ref_zero,
				    BTR_EXTERN_FIELD_REF_SIZE));

			data = btr_copy_externally_stored_field(
				&len, field_data, page_size, field_len, heap);
		} else {
			data = btr_rec_copy_externally_stored_field(
				mrec, offsets, page_size, i, &len, heap);
		}

		/* Because we have locked the table, any records
		written by incomplete transactions must have been
		rolled back already. There must not be any incomplete
		BLOB columns.
		*/
		ut_a(data);

		dfield_set_data(field, data, len);
	}
}

/** Convert a merge record to a typed data tuple. Note that externally
stored fields are not copied to heap.
@param[in,out]	index	index on the table
@param[in]	mtuple	merge record
@param[in]	heap	memory heap from which memory needed is allocated
@return	index entry built. */
static
void
row_merge_mtuple_to_dtuple(
	dict_index_t*	index,
	dtuple_t*	dtuple,
	const mtuple_t* mtuple)
{
	ut_ad(!dict_index_is_ibuf(index));

	/* The field array layouts are compatible; a flat copy of
	the field descriptors is sufficient (BLOB data stays where
	it is — see the function comment above). */
	memcpy(dtuple->fields, mtuple->fields,
	       dtuple->n_fields * sizeof *mtuple->fields);
}

/** Insert sorted data tuples to the index.
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in]	row_buf		row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
3505 @return DB_SUCCESS or error number */ 3506 static MY_ATTRIBUTE((warn_unused_result)) 3507 dberr_t 3508 row_merge_insert_index_tuples( 3509 dict_index_t* index, 3510 const dict_table_t* old_table, 3511 const pfs_os_file_t& fd, 3512 row_merge_block_t* block, 3513 const row_merge_buf_t* row_buf, 3514 BtrBulk* btr_bulk, 3515 const ib_uint64_t table_total_rows, /*!< in: total rows of old table */ 3516 const double pct_progress, /*!< in: total progress 3517 percent until now */ 3518 const double pct_cost, /*!< in: current progress percent 3519 */ 3520 row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ 3521 ulint space, /*!< in: space id */ 3522 ut_stage_alter_t* stage) 3523 { 3524 const byte* b; 3525 mem_heap_t* heap; 3526 mem_heap_t* tuple_heap; 3527 dberr_t error = DB_SUCCESS; 3528 ulint foffs = 0; 3529 rec_offs* offsets; 3530 mrec_buf_t* buf; 3531 ulint n_rows = 0; 3532 dtuple_t* dtuple; 3533 ib_uint64_t inserted_rows = 0; 3534 double curr_progress = 0; 3535 dict_index_t* old_index = NULL; 3536 const mrec_t* mrec = NULL; 3537 mtr_t mtr; 3538 3539 3540 DBUG_ENTER("row_merge_insert_index_tuples"); 3541 3542 ut_ad(!srv_read_only_mode); 3543 ut_ad(!(index->type & DICT_FTS)); 3544 ut_ad(!dict_index_is_spatial(index)); 3545 3546 if (stage != NULL) { 3547 stage->begin_phase_insert(); 3548 } 3549 3550 tuple_heap = mem_heap_create(1000); 3551 3552 { 3553 ulint i = 1 + REC_OFFS_HEADER_SIZE 3554 + dict_index_get_n_fields(index); 3555 heap = mem_heap_create(sizeof *buf + i * sizeof *offsets); 3556 offsets = static_cast<rec_offs*>( 3557 mem_heap_alloc(heap, i * sizeof *offsets)); 3558 rec_offs_set_n_alloc(offsets, i); 3559 rec_offs_set_n_fields(offsets, dict_index_get_n_fields(index)); 3560 } 3561 3562 if (row_buf != NULL) { 3563 ut_ad(fd == OS_FILE_CLOSED); 3564 ut_ad(block == NULL); 3565 DBUG_EXECUTE_IF("row_merge_read_failure", 3566 error = DB_CORRUPTION; 3567 goto err_exit;); 3568 buf = NULL; 3569 b = NULL; 3570 dtuple = dtuple_create( 3571 heap, 
dict_index_get_n_fields(index)); 3572 dtuple_set_n_fields_cmp( 3573 dtuple, dict_index_get_n_unique_in_tree(index)); 3574 } else { 3575 b = block; 3576 dtuple = NULL; 3577 3578 if (!row_merge_read(fd, foffs, block, crypt_block, space)) { 3579 error = DB_CORRUPTION; 3580 goto err_exit; 3581 } else { 3582 buf = static_cast<mrec_buf_t*>( 3583 mem_heap_alloc(heap, sizeof *buf)); 3584 } 3585 } 3586 3587 for (;;) { 3588 3589 if (stage != NULL) { 3590 stage->inc(); 3591 } 3592 3593 if (row_buf != NULL) { 3594 if (n_rows >= row_buf->n_tuples) { 3595 break; 3596 } 3597 3598 /* Convert merge tuple record from 3599 row buffer to data tuple record */ 3600 row_merge_mtuple_to_dtuple( 3601 index, dtuple, &row_buf->tuples[n_rows]); 3602 n_rows++; 3603 /* BLOB pointers must be copied from dtuple */ 3604 mrec = NULL; 3605 } else { 3606 b = row_merge_read_rec(block, buf, b, index, 3607 fd, &foffs, &mrec, offsets, 3608 crypt_block, 3609 space); 3610 3611 if (UNIV_UNLIKELY(!b)) { 3612 /* End of list, or I/O error */ 3613 if (mrec) { 3614 error = DB_CORRUPTION; 3615 } 3616 break; 3617 } 3618 3619 dtuple = row_rec_to_index_entry_low( 3620 mrec, index, offsets, tuple_heap); 3621 } 3622 3623 old_index = dict_table_get_first_index(old_table); 3624 3625 if (dict_index_is_clust(index) 3626 && dict_index_is_online_ddl(old_index)) { 3627 error = row_log_table_get_error(old_index); 3628 if (error != DB_SUCCESS) { 3629 break; 3630 } 3631 } 3632 3633 if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) { 3634 /* Off-page columns can be fetched safely 3635 when concurrent modifications to the table 3636 are disabled. (Purge can process delete-marked 3637 records, but row_merge_read_clustered_index() 3638 would have skipped them.) 3639 3640 When concurrent modifications are enabled, 3641 row_merge_read_clustered_index() will 3642 only see rows from transactions that were 3643 committed before the ALTER TABLE started 3644 (REPEATABLE READ). 
3645 3646 Any modifications after the 3647 row_merge_read_clustered_index() scan 3648 will go through row_log_table_apply(). 3649 Any modifications to off-page columns 3650 will be tracked by 3651 row_log_table_blob_alloc() and 3652 row_log_table_blob_free(). */ 3653 row_merge_copy_blobs( 3654 mrec, offsets, 3655 dict_table_page_size(old_table), 3656 dtuple, tuple_heap); 3657 } 3658 3659 #ifdef UNIV_DEBUG 3660 static const latch_level_t latches[] = { 3661 SYNC_INDEX_TREE, /* index->lock */ 3662 SYNC_LEVEL_VARYING /* btr_bulk->m_page_bulks */ 3663 }; 3664 #endif /* UNIV_DEBUG */ 3665 3666 ut_ad(dtuple_validate(dtuple)); 3667 ut_ad(!sync_check_iterate(sync_allowed_latches(latches, 3668 latches + 2))); 3669 error = btr_bulk->insert(dtuple); 3670 3671 if (error != DB_SUCCESS) { 3672 goto err_exit; 3673 } 3674 3675 mem_heap_empty(tuple_heap); 3676 3677 /* Increment innodb_onlineddl_pct_progress status variable */ 3678 inserted_rows++; 3679 if(inserted_rows % 1000 == 0) { 3680 /* Update progress for each 1000 rows */ 3681 curr_progress = (inserted_rows >= table_total_rows || 3682 table_total_rows <= 0) ? 3683 pct_cost : 3684 ((pct_cost * inserted_rows) / table_total_rows); 3685 3686 /* presenting 10.12% as 1012 integer */; 3687 onlineddl_pct_progress = (ulint) ((pct_progress + curr_progress) * 100); 3688 } 3689 } 3690 3691 err_exit: 3692 mem_heap_free(tuple_heap); 3693 mem_heap_free(heap); 3694 3695 DBUG_RETURN(error); 3696 } 3697 3698 /*********************************************************************//** 3699 Sets an exclusive lock on a table, for the duration of creating indexes. 
@return error code or DB_SUCCESS */
dberr_t
row_merge_lock_table(
/*=================*/
	trx_t*		trx,		/*!< in/out: transaction */
	dict_table_t*	table,		/*!< in: table to lock */
	enum lock_mode	mode)		/*!< in: LOCK_X or LOCK_S */
{
	ut_ad(!srv_read_only_mode);
	ut_ad(mode == LOCK_X || mode == LOCK_S);

	trx->op_info = "setting table lock for creating or dropping index";
	/* Flag the transaction as DDL so that lock waits are
	handled accordingly. */
	trx->ddl = true;

	return(lock_table_for_trx(table, trx, mode));
}

/*********************************************************************//**
Drop an index that was created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
static
void
row_merge_drop_index_dict(
/*======================*/
	trx_t*		trx,	/*!< in/out: dictionary transaction */
	index_id_t	index_id)/*!< in: index identifier */
{
	/* Remove the index and its fields from the data dictionary
	tables using the internal SQL parser. */
	static const char sql[] =
		"PROCEDURE DROP_INDEX_PROC () IS\n"
		"BEGIN\n"
		"DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
		"DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
		"END;\n";
	dberr_t		error;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));

	info = pars_info_create();
	pars_info_add_ull_literal(info, "indexid", index_id);
	trx->op_info = "dropping index from dictionary";
	error = que_eval_sql(info, sql, FALSE, trx);

	if (error != DB_SUCCESS) {
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS.
		*/
		trx->error_state = DB_SUCCESS;

		ib::error() << "row_merge_drop_index_dict failed with error "
			<< error;
	}

	trx->op_info = "";
}

/*********************************************************************//**
Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed. */
void
row_merge_drop_indexes_dict(
/*========================*/
	trx_t*		trx,	/*!< in/out: dictionary transaction */
	table_id_t	table_id)/*!< in: table identifier */
{
	/* Cursor over all uncommitted (TEMP_INDEX_PREFIX-named)
	indexes of the table; each one is deleted together with its
	SYS_FIELDS rows. */
	static const char sql[] =
		"PROCEDURE DROP_INDEXES_PROC () IS\n"
		"ixid CHAR;\n"
		"found INT;\n"

		"DECLARE CURSOR index_cur IS\n"
		" SELECT ID FROM SYS_INDEXES\n"
		" WHERE TABLE_ID=:tableid AND\n"
		" SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
		"FOR UPDATE;\n"

		"BEGIN\n"
		"found := 1;\n"
		"OPEN index_cur;\n"
		"WHILE found = 1 LOOP\n"
		"  FETCH index_cur INTO ixid;\n"
		"  IF (SQL % NOTFOUND) THEN\n"
		"    found := 0;\n"
		"  ELSE\n"
		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
		"  END IF;\n"
		"END LOOP;\n"
		"CLOSE index_cur;\n"

		"END;\n";
	dberr_t		error;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));

	/* It is possible that table->n_ref_count > 1 when
	locked=TRUE. In this case, all code that should have an open
	handle to the table be waiting for the next statement to execute,
	or waiting for a meta-data lock.

	A concurrent purge will be prevented by dict_operation_lock.
	*/

	info = pars_info_create();
	pars_info_add_ull_literal(info, "tableid", table_id);
	trx->op_info = "dropping indexes";
	error = que_eval_sql(info, sql, FALSE, trx);

	switch (error) {
	case DB_SUCCESS:
		break;
	default:
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS. */
		ib::error() << "row_merge_drop_indexes_dict failed with error "
			<< error;
		/* fall through */
	case DB_TOO_MANY_CONCURRENT_TRXS:
		trx->error_state = DB_SUCCESS;
	}

	trx->op_info = "";
}

/** Drop indexes that were created before an error occurred.
The data dictionary must have been locked exclusively by the caller,
because the transaction will not be committed.
@param trx              dictionary transaction
@param table            table containing the indexes
@param locked           True if table is locked,
                        false - may need to do lazy drop
@param alter_trx        Alter table transaction */
void
row_merge_drop_indexes(
	trx_t*		trx,
	dict_table_t*	table,
	bool		locked,
	const trx_t*	alter_trx)
{
	dict_index_t*	index;
	dict_index_t*	next_index;

	ut_ad(!srv_read_only_mode);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
	ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_X));

	index = dict_table_get_first_index(table);
	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);

	/* the caller should have an open handle to the table */
	ut_ad(table->get_ref_count() >= 1);

	/* It is possible that table->n_ref_count > 1 when
	locked=TRUE. In this case, all code that should have an open
	handle to the table be waiting for the next statement to execute,
	or waiting for a meta-data lock.

	A concurrent purge will be prevented by dict_operation_lock. */

	if (!locked && (table->get_ref_count() > 1
			|| table->has_lock_other_than(alter_trx))) {
		/* We will have to drop the indexes later, when the
		table is guaranteed to be no longer in use.  Mark the
		indexes as incomplete and corrupted, so that other
		threads will stop using them. Let dict_table_close()
		or crash recovery or the next invocation of
		prepare_inplace_alter_table() take care of dropping
		the indexes. */

		/* Skip the clustered index; only secondary indexes
		can be in a not-yet-committed state here. */
		while ((index = dict_table_get_next_index(index)) != NULL) {
			ut_ad(!dict_index_is_clust(index));

			switch (dict_index_get_online_status(index)) {
			case ONLINE_INDEX_ABORTED_DROPPED:
				continue;
			case ONLINE_INDEX_COMPLETE:
				if (index->is_committed()) {
					/* Do nothing to already
					published indexes. */
				} else if (index->type & DICT_FTS) {
					/* Drop a completed FULLTEXT
					index, due to a timeout during
					MDL upgrade for
					commit_inplace_alter_table().
					Because only concurrent reads
					are allowed (and they are not
					seeing this index yet) we
					are safe to drop the index. */
					dict_index_t* prev = UT_LIST_GET_PREV(
						indexes, index);
					/* At least there should be
					the clustered index before
					this one. */
					ut_ad(prev);
					ut_a(table->fts);
					fts_drop_index(table, index, trx);
					row_merge_drop_index_dict(
						trx, index->id);
					/* We can remove a DICT_FTS
					index from the cache, because
					we do not allow ADD FULLTEXT INDEX
					with LOCK=NONE. If we allowed that,
					we should exclude FTS entries from
					prebuilt->ins_node->entry_list
					in ins_node_create_entry_list().
					*/
#ifdef BTR_CUR_HASH_ADAPT
					ut_ad(!index->search_info->ref_count);
#endif /* BTR_CUR_HASH_ADAPT */
					dict_index_remove_from_cache(
						table, index);
					/* Continue iteration from the
					predecessor of the removed index. */
					index = prev;
				} else {
					rw_lock_x_lock(
						dict_index_get_lock(index));
					dict_index_set_online_status(
						index, ONLINE_INDEX_ABORTED);
					index->type |= DICT_CORRUPT;
					table->drop_aborted = TRUE;
					goto drop_aborted;
				}
				continue;
			case ONLINE_INDEX_CREATION:
				rw_lock_x_lock(dict_index_get_lock(index));
				ut_ad(!index->is_committed());
				row_log_abort_sec(index);
			drop_aborted:
				rw_lock_x_unlock(dict_index_get_lock(index));

				DEBUG_SYNC_C("merge_drop_index_after_abort");
				/* covered by dict_sys->mutex */
				MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
				/* fall through */
			case ONLINE_INDEX_ABORTED:
				/* Drop the index tree from the
				data dictionary and free it from
				the tablespace, but keep the object
				in the data dictionary cache. */
				row_merge_drop_index_dict(trx, index->id);
				rw_lock_x_lock(dict_index_get_lock(index));
				dict_index_set_online_status(
					index, ONLINE_INDEX_ABORTED_DROPPED);
				rw_lock_x_unlock(dict_index_get_lock(index));
				table->drop_aborted = TRUE;
				continue;
			}
			ut_error;
		}

		fts_clear_all(table, trx);
		return;
	}

	/* The table is not in use: drop the uncommitted indexes from
	the data dictionary and from the cache right away. */
	row_merge_drop_indexes_dict(trx, table->id);

	/* Invalidate all row_prebuilt_t::ins_graph that are referring
	to this table. That is, force row_get_prebuilt_insert_row() to
	rebuild prebuilt->ins_node->entry_list).
	*/
	ut_ad(table->def_trx_id <= trx->id);
	table->def_trx_id = trx->id;

	next_index = dict_table_get_next_index(index);

	while ((index = next_index) != NULL) {
		/* read the next pointer before freeing the index */
		next_index = dict_table_get_next_index(index);

		ut_ad(!dict_index_is_clust(index));

		if (!index->is_committed()) {
			/* If it is FTS index, drop from table->fts
			and also drop its auxiliary tables */
			if (index->type & DICT_FTS) {
				ut_a(table->fts);
				fts_drop_index(table, index, trx);
			}

			switch (dict_index_get_online_status(index)) {
			case ONLINE_INDEX_CREATION:
				/* This state should only be possible
				when prepare_inplace_alter_table() fails
				after invoking row_merge_create_index().
				In inplace_alter_table(),
				row_merge_build_indexes()
				should never leave the index in this state.
				It would invoke row_log_abort_sec() on
				failure. */
			case ONLINE_INDEX_COMPLETE:
				/* In these cases, we are able to drop
				the index straight. The DROP INDEX was
				never deferred. */
				break;
			case ONLINE_INDEX_ABORTED:
			case ONLINE_INDEX_ABORTED_DROPPED:
				/* covered by dict_sys->mutex */
				MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
			}

			dict_index_remove_from_cache(table, index);
		}
	}

	fts_clear_all(table, trx);
	table->drop_aborted = FALSE;
	ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
}

/*********************************************************************//**
Drop all partially created indexes during crash recovery.
*/ 4022 void 4023 row_merge_drop_temp_indexes(void) 4024 /*=============================*/ 4025 { 4026 static const char sql[] = 4027 "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n" 4028 "ixid CHAR;\n" 4029 "found INT;\n" 4030 4031 "DECLARE CURSOR index_cur IS\n" 4032 " SELECT ID FROM SYS_INDEXES\n" 4033 " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n" 4034 "FOR UPDATE;\n" 4035 4036 "BEGIN\n" 4037 "found := 1;\n" 4038 "OPEN index_cur;\n" 4039 "WHILE found = 1 LOOP\n" 4040 " FETCH index_cur INTO ixid;\n" 4041 " IF (SQL % NOTFOUND) THEN\n" 4042 " found := 0;\n" 4043 " ELSE\n" 4044 " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n" 4045 " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n" 4046 " END IF;\n" 4047 "END LOOP;\n" 4048 "CLOSE index_cur;\n" 4049 "END;\n"; 4050 trx_t* trx; 4051 dberr_t error; 4052 4053 /* Load the table definitions that contain partially defined 4054 indexes, so that the data dictionary information can be checked 4055 when accessing the tablename.ibd files. */ 4056 trx = trx_create(); 4057 trx->op_info = "dropping partially created indexes"; 4058 row_mysql_lock_data_dictionary(trx); 4059 /* Ensure that this transaction will be rolled back and locks 4060 will be released, if the server gets killed before the commit 4061 gets written to the redo log. */ 4062 trx_set_dict_operation(trx, TRX_DICT_OP_INDEX); 4063 4064 trx->op_info = "dropping indexes"; 4065 error = que_eval_sql(NULL, sql, FALSE, trx); 4066 4067 if (error != DB_SUCCESS) { 4068 /* Even though we ensure that DDL transactions are WAIT 4069 and DEADLOCK free, we could encounter other errors e.g., 4070 DB_TOO_MANY_CONCURRENT_TRXS. 
*/ 4071 trx->error_state = DB_SUCCESS; 4072 4073 ib::error() << "row_merge_drop_temp_indexes failed with error" 4074 << error; 4075 } 4076 4077 trx_commit_for_mysql(trx); 4078 row_mysql_unlock_data_dictionary(trx); 4079 trx->free(); 4080 } 4081 4082 4083 /** Create temporary merge files in the given paramater path, and if 4084 UNIV_PFS_IO defined, register the file descriptor with Performance Schema. 4085 @param[in] path location for creating temporary merge files, or NULL 4086 @return File descriptor */ 4087 pfs_os_file_t 4088 row_merge_file_create_low( 4089 const char* path) 4090 { 4091 #ifdef UNIV_PFS_IO 4092 /* This temp file open does not go through normal 4093 file APIs, add instrumentation to register with 4094 performance schema */ 4095 struct PSI_file_locker* locker; 4096 PSI_file_locker_state state; 4097 if (!path) { 4098 path = mysql_tmpdir; 4099 } 4100 static const char label[] = "/Innodb Merge Temp File"; 4101 char* name = static_cast<char*>( 4102 ut_malloc_nokey(strlen(path) + sizeof label)); 4103 strcpy(name, path); 4104 strcat(name, label); 4105 4106 register_pfs_file_open_begin( 4107 &state, locker, innodb_temp_file_key, 4108 PSI_FILE_CREATE, path ? name : label, __FILE__, __LINE__); 4109 4110 #endif 4111 pfs_os_file_t fd = innobase_mysql_tmpfile(path); 4112 #ifdef UNIV_PFS_IO 4113 register_pfs_file_open_end(locker, fd, 4114 (fd == OS_FILE_CLOSED)?NULL:&fd); 4115 ut_free(name); 4116 #endif 4117 4118 if (fd == OS_FILE_CLOSED) { 4119 ib::error() << "Cannot create temporary merge file"; 4120 } 4121 return(fd); 4122 } 4123 4124 4125 /** Create a merge file in the given location. 
4126 @param[out] merge_file merge file structure 4127 @param[in] path location for creating temporary file, or NULL 4128 @return file descriptor, or OS_FILE_CLOSED on error */ 4129 pfs_os_file_t 4130 row_merge_file_create( 4131 merge_file_t* merge_file, 4132 const char* path) 4133 { 4134 merge_file->fd = row_merge_file_create_low(path); 4135 merge_file->offset = 0; 4136 merge_file->n_rec = 0; 4137 4138 if (merge_file->fd != OS_FILE_CLOSED) { 4139 if (srv_disable_sort_file_cache) { 4140 os_file_set_nocache(merge_file->fd, 4141 "row0merge.cc", "sort"); 4142 } 4143 } 4144 return(merge_file->fd); 4145 } 4146 4147 /*********************************************************************//** 4148 Destroy a merge file. And de-register the file from Performance Schema 4149 if UNIV_PFS_IO is defined. */ 4150 void 4151 row_merge_file_destroy_low( 4152 /*=======================*/ 4153 const pfs_os_file_t& fd) /*!< in: merge file descriptor */ 4154 { 4155 if (fd != OS_FILE_CLOSED) { 4156 os_file_close(fd); 4157 } 4158 } 4159 /*********************************************************************//** 4160 Destroy a merge file. */ 4161 void 4162 row_merge_file_destroy( 4163 /*===================*/ 4164 merge_file_t* merge_file) /*!< in/out: merge file structure */ 4165 { 4166 ut_ad(!srv_read_only_mode); 4167 4168 if (merge_file->fd != OS_FILE_CLOSED) { 4169 row_merge_file_destroy_low(merge_file->fd); 4170 merge_file->fd = OS_FILE_CLOSED; 4171 } 4172 } 4173 4174 /*********************************************************************//** 4175 Rename an index in the dictionary that was created. The data 4176 dictionary must have been locked exclusively by the caller, because 4177 the transaction will not be committed. 
4178 @return DB_SUCCESS if all OK */ 4179 dberr_t 4180 row_merge_rename_index_to_add( 4181 /*==========================*/ 4182 trx_t* trx, /*!< in/out: transaction */ 4183 table_id_t table_id, /*!< in: table identifier */ 4184 index_id_t index_id) /*!< in: index identifier */ 4185 { 4186 dberr_t err = DB_SUCCESS; 4187 pars_info_t* info = pars_info_create(); 4188 4189 /* We use the private SQL parser of Innobase to generate the 4190 query graphs needed in renaming indexes. */ 4191 4192 static const char rename_index[] = 4193 "PROCEDURE RENAME_INDEX_PROC () IS\n" 4194 "BEGIN\n" 4195 "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n" 4196 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n" 4197 "END;\n"; 4198 4199 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH); 4200 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX); 4201 4202 trx->op_info = "renaming index to add"; 4203 4204 pars_info_add_ull_literal(info, "tableid", table_id); 4205 pars_info_add_ull_literal(info, "indexid", index_id); 4206 4207 err = que_eval_sql(info, rename_index, FALSE, trx); 4208 4209 if (err != DB_SUCCESS) { 4210 /* Even though we ensure that DDL transactions are WAIT 4211 and DEADLOCK free, we could encounter other errors e.g., 4212 DB_TOO_MANY_CONCURRENT_TRXS. */ 4213 trx->error_state = DB_SUCCESS; 4214 4215 ib::error() << "row_merge_rename_index_to_add failed with" 4216 " error " << err; 4217 } 4218 4219 trx->op_info = ""; 4220 4221 return(err); 4222 } 4223 4224 /*********************************************************************//** 4225 Rename an index in the dictionary that is to be dropped. The data 4226 dictionary must have been locked exclusively by the caller, because 4227 the transaction will not be committed. 
4228 @return DB_SUCCESS if all OK */ 4229 dberr_t 4230 row_merge_rename_index_to_drop( 4231 /*===========================*/ 4232 trx_t* trx, /*!< in/out: transaction */ 4233 table_id_t table_id, /*!< in: table identifier */ 4234 index_id_t index_id) /*!< in: index identifier */ 4235 { 4236 dberr_t err; 4237 pars_info_t* info = pars_info_create(); 4238 4239 ut_ad(!srv_read_only_mode); 4240 4241 /* We use the private SQL parser of Innobase to generate the 4242 query graphs needed in renaming indexes. */ 4243 4244 static const char rename_index[] = 4245 "PROCEDURE RENAME_INDEX_PROC () IS\n" 4246 "BEGIN\n" 4247 "UPDATE SYS_INDEXES SET NAME=CONCAT('" 4248 TEMP_INDEX_PREFIX_STR "',NAME)\n" 4249 "WHERE TABLE_ID = :tableid AND ID = :indexid;\n" 4250 "END;\n"; 4251 4252 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH); 4253 ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX); 4254 4255 trx->op_info = "renaming index to drop"; 4256 4257 pars_info_add_ull_literal(info, "tableid", table_id); 4258 pars_info_add_ull_literal(info, "indexid", index_id); 4259 4260 err = que_eval_sql(info, rename_index, FALSE, trx); 4261 4262 if (err != DB_SUCCESS) { 4263 /* Even though we ensure that DDL transactions are WAIT 4264 and DEADLOCK free, we could encounter other errors e.g., 4265 DB_TOO_MANY_CONCURRENT_TRXS. */ 4266 trx->error_state = DB_SUCCESS; 4267 4268 ib::error() << "row_merge_rename_index_to_drop failed with" 4269 " error " << err; 4270 } 4271 4272 trx->op_info = ""; 4273 4274 return(err); 4275 } 4276 4277 /*********************************************************************//** 4278 Provide a new pathname for a table that is being renamed if it belongs to 4279 a file-per-table tablespace. The caller is responsible for freeing the 4280 memory allocated for the return value. 
@return new pathname of tablespace file, or NULL if space = 0 */
static
char*
row_make_new_pathname(
/*==================*/
	dict_table_t*	table,		/*!< in: table to be renamed */
	const char*	new_name)	/*!< in: new name */
{
	ut_ad(!is_system_tablespace(table->space_id));
	/* Derive the new .ibd path from the first file of the
	table's tablespace. */
	return os_file_make_new_pathname(table->space->chain.start->name,
					 new_name);
}

/*********************************************************************//**
Rename the tables in the data dictionary. The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed.
@return error code or DB_SUCCESS */
dberr_t
row_merge_rename_tables_dict(
/*=========================*/
	dict_table_t*	old_table,	/*!< in/out: old table, renamed to
					tmp_name */
	dict_table_t*	new_table,	/*!< in/out: new table, renamed to
					old_table->name */
	const char*	tmp_name,	/*!< in: new name for old_table */
	trx_t*		trx)		/*!< in/out: dictionary transaction */
{
	dberr_t		err	= DB_ERROR;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(old_table != new_table);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
	      || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);

	trx->op_info = "renaming tables";

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data in system tables.
	*/

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
	pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
	pars_info_add_str_literal(info, "tmp_name", tmp_name);

	/* Swap the names: old_table -> tmp_name, new_table ->
	old_table's name. */
	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLES () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
			   " WHERE NAME = :old_name;\n"
			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
			   " WHERE NAME = :new_name;\n"
			   "END;\n", FALSE, trx);

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && old_table->space_id) {
		/* Make pathname to update SYS_DATAFILES. */
		char* tmp_path = row_make_new_pathname(old_table, tmp_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "tmp_name", tmp_name);
		pars_info_add_str_literal(info, "tmp_path", tmp_path);
		pars_info_add_int4_literal(info, "old_space",
					   old_table->space_id);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_OLD_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :tmp_name\n"
				   " WHERE SPACE = :old_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :tmp_path\n"
				   " WHERE SPACE = :old_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(tmp_path);
	}

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && dict_table_is_file_per_table(new_table)) {
		/* Make pathname to update SYS_DATAFILES.
		*/
		char* old_path = row_make_new_pathname(
			new_table, old_table->name.m_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "old_name",
					  old_table->name.m_name);
		pars_info_add_str_literal(info, "old_path", old_path);
		pars_info_add_int4_literal(info, "new_space",
					   new_table->space_id);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_NEW_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :old_name\n"
				   " WHERE SPACE = :new_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :old_path\n"
				   " WHERE SPACE = :new_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(old_path);
	}

	/* Preserve the DISCARD TABLESPACE state across the rename. */
	if (err == DB_SUCCESS && (new_table->flags2 & DICT_TF2_DISCARDED)) {
		err = row_import_update_discarded_flag(
			trx, new_table->id, true);
	}

	trx->op_info = "";

	return(err);
}

/** Create the index and load in to the dictionary.
@param[in,out]	table		the index is on this table
@param[in]	index_def	the index definition
@param[in]	add_v		new virtual columns added along with add
				index call
@return index, or NULL on error */
dict_index_t*
row_merge_create_index(
	dict_table_t*		table,
	const index_def_t*	index_def,
	const dict_add_v_col_t*	add_v)
{
	dict_index_t*	index;
	ulint		n_fields = index_def->n_fields;
	ulint		i;
	ulint		n_add_vcol = 0;

	DBUG_ENTER("row_merge_create_index");

	ut_ad(!srv_read_only_mode);

	/* Create the index prototype, using the passed in def, this is not
	a persistent operation. We pass 0 as the space id, and determine at
	a lower level the space id where to store the table.
	*/

	index = dict_mem_index_create(table, index_def->name,
				      index_def->ind_type, n_fields);
	index->set_committed(index_def->rebuild);

	for (i = 0; i < n_fields; i++) {
		const char*	name;
		index_field_t*	ifield = &index_def->fields[i];

		if (ifield->is_v_col) {
			if (ifield->col_no >= table->n_v_def) {
				/* Column number beyond the existing
				virtual columns: it must be one of the
				virtual columns being added in this
				same ALTER TABLE. */
				ut_ad(ifield->col_no < table->n_v_def
				      + add_v->n_v_col);
				ut_ad(ifield->col_no >= table->n_v_def);
				name = add_v->v_col_name[
					ifield->col_no - table->n_v_def];
				n_add_vcol++;
			} else {
				name = dict_table_get_v_col_name(
					table, ifield->col_no);
			}
		} else {
			name = dict_table_get_col_name(table, ifield->col_no);
		}

		dict_mem_index_add_field(index, name, ifield->prefix_len);
	}

	if (n_add_vcol) {
		index->assign_new_v_col(n_add_vcol);
	}

	DBUG_RETURN(index);
}

/*********************************************************************//**
Check if a transaction can use an index. */
bool
row_merge_is_index_usable(
/*======================*/
	const trx_t*	trx,	/*!< in: transaction */
	const dict_index_t*	index)	/*!< in: index to check */
{
	if (!index->is_primary()
	    && dict_index_is_online_ddl(index)) {
		/* Indexes that are being created are not useable. */
		return(false);
	}

	/* A usable index must not be corrupted, and it must have been
	created by a transaction whose changes are visible to this
	transaction's read view (unless rollback/visibility does not
	apply to the table). */
	return(!index->is_corrupted()
	       && (index->table->is_temporary() || index->table->no_rollback()
		   || index->trx_id == 0
		   || !trx->read_view.is_open()
		   || trx->read_view.changes_visible(
			   index->trx_id,
			   index->table->name)));
}

/*********************************************************************//**
Drop a table. The caller must have ensured that the background stats
thread is not processing the table. This can be done by calling
dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
before calling this function.
4496 @return DB_SUCCESS or error code */ 4497 dberr_t 4498 row_merge_drop_table( 4499 /*=================*/ 4500 trx_t* trx, /*!< in: transaction */ 4501 dict_table_t* table) /*!< in: table to drop */ 4502 { 4503 ut_ad(!srv_read_only_mode); 4504 4505 /* There must be no open transactions on the table. */ 4506 ut_a(table->get_ref_count() == 0); 4507 4508 return(row_drop_table_for_mysql(table->name.m_name, 4509 trx, SQLCOM_DROP_TABLE, false, false)); 4510 } 4511 4512 /** Write an MLOG_INDEX_LOAD record to indicate in the redo-log 4513 that redo-logging of individual index pages was disabled, and 4514 the flushing of such pages to the data files was completed. 4515 @param[in] index an index tree on which redo logging was disabled */ 4516 void row_merge_write_redo(const dict_index_t* index) 4517 { 4518 ut_ad(!index->table->is_temporary()); 4519 ut_ad(!(index->type & (DICT_SPATIAL | DICT_FTS))); 4520 4521 mtr_t mtr; 4522 mtr.start(); 4523 byte* log_ptr = mlog_open(&mtr, 11 + 8); 4524 log_ptr = mlog_write_initial_log_record_low( 4525 MLOG_INDEX_LOAD, 4526 index->table->space_id, index->page, log_ptr, &mtr); 4527 mach_write_to_8(log_ptr, index->id); 4528 mlog_close(&mtr, log_ptr + 8); 4529 mtr.commit(); 4530 } 4531 4532 /** Build indexes on a table by reading a clustered index, creating a temporary 4533 file containing index entries, merge sorting these index entries and inserting 4534 sorted index entries to indexes. 
4535 @param[in] trx transaction 4536 @param[in] old_table table where rows are read from 4537 @param[in] new_table table where indexes are created; identical to 4538 old_table unless creating a PRIMARY KEY 4539 @param[in] online true if creating indexes online 4540 @param[in] indexes indexes to be created 4541 @param[in] key_numbers MySQL key numbers 4542 @param[in] n_indexes size of indexes[] 4543 @param[in,out] table MySQL table, for reporting erroneous key value 4544 if applicable 4545 @param[in] defaults default values of added, changed columns, or NULL 4546 @param[in] col_map mapping of old column numbers to new ones, or 4547 NULL if old_table == new_table 4548 @param[in] add_autoinc number of added AUTO_INCREMENT columns, or 4549 ULINT_UNDEFINED if none is added 4550 @param[in,out] sequence autoinc sequence 4551 @param[in] skip_pk_sort whether the new PRIMARY KEY will follow 4552 existing order 4553 @param[in,out] stage performance schema accounting object, used by 4554 ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of 4555 this function and it will be passed to other functions for further accounting. 4556 @param[in] add_v new virtual columns added along with indexes 4557 @param[in] eval_table mysql table used to evaluate virtual column 4558 value, see innobase_get_computed_value(). 
4559 @param[in] allow_not_null allow the conversion from null to not-null 4560 @return DB_SUCCESS or error code */ 4561 dberr_t 4562 row_merge_build_indexes( 4563 trx_t* trx, 4564 dict_table_t* old_table, 4565 dict_table_t* new_table, 4566 bool online, 4567 dict_index_t** indexes, 4568 const ulint* key_numbers, 4569 ulint n_indexes, 4570 struct TABLE* table, 4571 const dtuple_t* defaults, 4572 const ulint* col_map, 4573 ulint add_autoinc, 4574 ib_sequence_t& sequence, 4575 bool skip_pk_sort, 4576 ut_stage_alter_t* stage, 4577 const dict_add_v_col_t* add_v, 4578 struct TABLE* eval_table, 4579 bool allow_not_null) 4580 { 4581 merge_file_t* merge_files; 4582 row_merge_block_t* block; 4583 ut_new_pfx_t block_pfx; 4584 size_t block_size; 4585 ut_new_pfx_t crypt_pfx; 4586 row_merge_block_t* crypt_block = NULL; 4587 ulint i; 4588 ulint j; 4589 dberr_t error; 4590 pfs_os_file_t tmpfd = OS_FILE_CLOSED; 4591 dict_index_t* fts_sort_idx = NULL; 4592 fts_psort_t* psort_info = NULL; 4593 fts_psort_t* merge_info = NULL; 4594 int64_t sig_count = 0; 4595 bool fts_psort_initiated = false; 4596 4597 double total_static_cost = 0; 4598 double total_dynamic_cost = 0; 4599 ulint total_index_blocks = 0; 4600 double pct_cost=0; 4601 double pct_progress=0; 4602 4603 DBUG_ENTER("row_merge_build_indexes"); 4604 4605 ut_ad(!srv_read_only_mode); 4606 ut_ad((old_table == new_table) == !col_map); 4607 ut_ad(!defaults || col_map); 4608 4609 stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table 4610 ? n_indexes - 1 4611 : n_indexes); 4612 4613 /* Allocate memory for merge file data structure and initialize 4614 fields */ 4615 4616 ut_allocator<row_merge_block_t> alloc(mem_key_row_merge_sort); 4617 4618 /* This will allocate "3 * srv_sort_buf_size" elements of type 4619 row_merge_block_t. The latter is defined as byte. 
*/ 4620 block_size = 3 * srv_sort_buf_size; 4621 block = alloc.allocate_large(block_size, &block_pfx); 4622 4623 if (block == NULL) { 4624 DBUG_RETURN(DB_OUT_OF_MEMORY); 4625 } 4626 4627 crypt_pfx.m_size = 0; /* silence bogus -Wmaybe-uninitialized */ 4628 TRASH_ALLOC(&crypt_pfx, sizeof crypt_pfx); 4629 4630 if (log_tmp_is_encrypted()) { 4631 crypt_block = static_cast<row_merge_block_t*>( 4632 alloc.allocate_large(block_size, 4633 &crypt_pfx)); 4634 4635 if (crypt_block == NULL) { 4636 DBUG_RETURN(DB_OUT_OF_MEMORY); 4637 } 4638 } 4639 4640 trx_start_if_not_started_xa(trx, true); 4641 ulint n_merge_files = 0; 4642 4643 for (ulint i = 0; i < n_indexes; i++) 4644 { 4645 if (!dict_index_is_spatial(indexes[i])) { 4646 n_merge_files++; 4647 } 4648 } 4649 4650 merge_files = static_cast<merge_file_t*>( 4651 ut_malloc_nokey(n_merge_files * sizeof *merge_files)); 4652 4653 /* Initialize all the merge file descriptors, so that we 4654 don't call row_merge_file_destroy() on uninitialized 4655 merge file descriptor */ 4656 4657 for (i = 0; i < n_merge_files; i++) { 4658 merge_files[i].fd = OS_FILE_CLOSED; 4659 merge_files[i].offset = 0; 4660 merge_files[i].n_rec = 0; 4661 } 4662 4663 total_static_cost = COST_BUILD_INDEX_STATIC * n_indexes + COST_READ_CLUSTERED_INDEX; 4664 total_dynamic_cost = COST_BUILD_INDEX_DYNAMIC * n_indexes; 4665 for (i = 0; i < n_indexes; i++) { 4666 if (indexes[i]->type & DICT_FTS) { 4667 ibool opt_doc_id_size = FALSE; 4668 4669 /* To build FTS index, we would need to extract 4670 doc's word, Doc ID, and word's position, so 4671 we need to build a "fts sort index" indexing 4672 on above three 'fields' */ 4673 fts_sort_idx = row_merge_create_fts_sort_index( 4674 indexes[i], old_table, &opt_doc_id_size); 4675 4676 row_merge_dup_t* dup 4677 = static_cast<row_merge_dup_t*>( 4678 ut_malloc_nokey(sizeof *dup)); 4679 dup->index = fts_sort_idx; 4680 dup->table = table; 4681 dup->col_map = col_map; 4682 dup->n_dup = 0; 4683 4684 /* This can fail e.g. 
if temporal files can't be 4685 created */ 4686 if (!row_fts_psort_info_init( 4687 trx, dup, new_table, opt_doc_id_size, 4688 dict_table_page_size(old_table), 4689 &psort_info, &merge_info)) { 4690 error = DB_CORRUPTION; 4691 goto func_exit; 4692 } 4693 4694 /* We need to ensure that we free the resources 4695 allocated */ 4696 fts_psort_initiated = true; 4697 } 4698 } 4699 4700 if (global_system_variables.log_warnings > 2) { 4701 sql_print_information("InnoDB: Online DDL : Start reading" 4702 " clustered index of the table" 4703 " and create temporary files"); 4704 } 4705 4706 pct_cost = COST_READ_CLUSTERED_INDEX * 100 / (total_static_cost + total_dynamic_cost); 4707 4708 /* Do not continue if we can't encrypt table pages */ 4709 if (!old_table->is_readable() || 4710 !new_table->is_readable()) { 4711 error = DB_DECRYPTION_FAILED; 4712 ib_push_warning(trx->mysql_thd, DB_DECRYPTION_FAILED, 4713 "Table %s is encrypted but encryption service or" 4714 " used key_id is not available. " 4715 " Can't continue reading table.", 4716 !old_table->is_readable() ? 
old_table->name.m_name : 4717 new_table->name.m_name); 4718 goto func_exit; 4719 } 4720 4721 /* Read clustered index of the table and create files for 4722 secondary index entries for merge sort */ 4723 error = row_merge_read_clustered_index( 4724 trx, table, old_table, new_table, online, indexes, 4725 fts_sort_idx, psort_info, merge_files, key_numbers, 4726 n_indexes, defaults, add_v, col_map, add_autoinc, 4727 sequence, block, skip_pk_sort, &tmpfd, stage, 4728 pct_cost, crypt_block, eval_table, allow_not_null); 4729 4730 stage->end_phase_read_pk(); 4731 4732 pct_progress += pct_cost; 4733 4734 if (global_system_variables.log_warnings > 2) { 4735 sql_print_information("InnoDB: Online DDL : End of reading " 4736 "clustered index of the table" 4737 " and create temporary files"); 4738 } 4739 4740 for (i = 0; i < n_merge_files; i++) { 4741 total_index_blocks += merge_files[i].offset; 4742 } 4743 4744 if (error != DB_SUCCESS) { 4745 goto func_exit; 4746 } 4747 4748 DEBUG_SYNC_C("row_merge_after_scan"); 4749 4750 /* Now we have files containing index entries ready for 4751 sorting and inserting. 
*/ 4752 4753 for (ulint k = 0, i = 0; i < n_indexes; i++) { 4754 dict_index_t* sort_idx = indexes[i]; 4755 4756 if (dict_index_is_spatial(sort_idx)) { 4757 continue; 4758 } 4759 4760 if (indexes[i]->type & DICT_FTS) { 4761 os_event_t fts_parallel_merge_event; 4762 4763 sort_idx = fts_sort_idx; 4764 4765 fts_parallel_merge_event 4766 = merge_info[0].psort_common->merge_event; 4767 4768 if (FTS_PLL_MERGE) { 4769 ulint trial_count = 0; 4770 bool all_exit = false; 4771 4772 os_event_reset(fts_parallel_merge_event); 4773 row_fts_start_parallel_merge(merge_info); 4774 wait_again: 4775 os_event_wait_time_low( 4776 fts_parallel_merge_event, 1000000, 4777 sig_count); 4778 4779 for (j = 0; j < FTS_NUM_AUX_INDEX; j++) { 4780 if (merge_info[j].child_status 4781 != FTS_CHILD_COMPLETE 4782 && merge_info[j].child_status 4783 != FTS_CHILD_EXITING) { 4784 sig_count = os_event_reset( 4785 fts_parallel_merge_event); 4786 4787 goto wait_again; 4788 } 4789 } 4790 4791 /* Now all children should complete, wait 4792 a bit until they all finish using event */ 4793 while (!all_exit && trial_count < 10000) { 4794 all_exit = true; 4795 4796 for (j = 0; j < FTS_NUM_AUX_INDEX; 4797 j++) { 4798 if (merge_info[j].child_status 4799 != FTS_CHILD_EXITING) { 4800 all_exit = false; 4801 os_thread_sleep(1000); 4802 break; 4803 } 4804 } 4805 trial_count++; 4806 } 4807 4808 if (!all_exit) { 4809 ib::error() << "Not all child merge" 4810 " threads exited when creating" 4811 " FTS index '" 4812 << indexes[i]->name << "'"; 4813 } else { 4814 for (j = 0; j < FTS_NUM_AUX_INDEX; 4815 j++) { 4816 4817 os_thread_join(merge_info[j] 4818 .thread_hdl); 4819 } 4820 } 4821 } else { 4822 /* This cannot report duplicates; an 4823 assertion would fail in that case. 
*/ 4824 error = row_fts_merge_insert( 4825 sort_idx, new_table, 4826 psort_info, 0); 4827 } 4828 4829 #ifdef FTS_INTERNAL_DIAG_PRINT 4830 DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n"); 4831 #endif 4832 } else if (merge_files[k].fd != OS_FILE_CLOSED) { 4833 char buf[NAME_LEN + 1]; 4834 row_merge_dup_t dup = { 4835 sort_idx, table, col_map, 0}; 4836 4837 pct_cost = (COST_BUILD_INDEX_STATIC + 4838 (total_dynamic_cost * merge_files[k].offset / 4839 total_index_blocks)) / 4840 (total_static_cost + total_dynamic_cost) 4841 * PCT_COST_MERGESORT_INDEX * 100; 4842 char* bufend = innobase_convert_name( 4843 buf, sizeof buf, 4844 indexes[i]->name, 4845 strlen(indexes[i]->name), 4846 trx->mysql_thd); 4847 buf[bufend - buf]='\0'; 4848 4849 if (global_system_variables.log_warnings > 2) { 4850 sql_print_information("InnoDB: Online DDL :" 4851 " Start merge-sorting" 4852 " index %s" 4853 " (" ULINTPF 4854 " / " ULINTPF ")," 4855 " estimated cost :" 4856 " %2.4f", 4857 buf, i + 1, n_indexes, 4858 pct_cost); 4859 } 4860 4861 error = row_merge_sort( 4862 trx, &dup, &merge_files[k], 4863 block, &tmpfd, true, 4864 pct_progress, pct_cost, 4865 crypt_block, new_table->space_id, 4866 stage); 4867 4868 pct_progress += pct_cost; 4869 4870 if (global_system_variables.log_warnings > 2) { 4871 sql_print_information("InnoDB: Online DDL :" 4872 " End of " 4873 " merge-sorting index %s" 4874 " (" ULINTPF 4875 " / " ULINTPF ")", 4876 buf, i + 1, n_indexes); 4877 } 4878 4879 if (error == DB_SUCCESS) { 4880 BtrBulk btr_bulk(sort_idx, trx, 4881 trx->get_flush_observer()); 4882 4883 pct_cost = (COST_BUILD_INDEX_STATIC + 4884 (total_dynamic_cost * merge_files[k].offset / 4885 total_index_blocks)) / 4886 (total_static_cost + total_dynamic_cost) * 4887 PCT_COST_INSERT_INDEX * 100; 4888 4889 if (global_system_variables.log_warnings > 2) { 4890 sql_print_information( 4891 "InnoDB: Online DDL : Start " 4892 "building index %s" 4893 " (" ULINTPF 4894 " / " ULINTPF "), estimated " 4895 "cost : %2.4f", 
buf, i + 1, 4896 n_indexes, pct_cost); 4897 } 4898 4899 error = row_merge_insert_index_tuples( 4900 sort_idx, old_table, 4901 merge_files[k].fd, block, NULL, 4902 &btr_bulk, 4903 merge_files[k].n_rec, pct_progress, pct_cost, 4904 crypt_block, new_table->space_id, 4905 stage); 4906 4907 error = btr_bulk.finish(error); 4908 4909 pct_progress += pct_cost; 4910 4911 if (global_system_variables.log_warnings > 2) { 4912 sql_print_information( 4913 "InnoDB: Online DDL : " 4914 "End of building index %s" 4915 " (" ULINTPF " / " ULINTPF ")", 4916 buf, i + 1, n_indexes); 4917 } 4918 } 4919 } 4920 4921 /* Close the temporary file to free up space. */ 4922 row_merge_file_destroy(&merge_files[k++]); 4923 4924 if (indexes[i]->type & DICT_FTS) { 4925 row_fts_psort_info_destroy(psort_info, merge_info); 4926 fts_psort_initiated = false; 4927 } else if (dict_index_is_spatial(indexes[i])) { 4928 /* We never disable redo logging for 4929 creating SPATIAL INDEX. Avoid writing any 4930 unnecessary MLOG_INDEX_LOAD record. */ 4931 } else if (old_table != new_table) { 4932 ut_ad(!sort_idx->online_log); 4933 ut_ad(sort_idx->online_status 4934 == ONLINE_INDEX_COMPLETE); 4935 } else if (FlushObserver* flush_observer = 4936 trx->get_flush_observer()) { 4937 if (error != DB_SUCCESS) { 4938 flush_observer->interrupted(); 4939 } 4940 flush_observer->flush(); 4941 row_merge_write_redo(indexes[i]); 4942 } 4943 4944 if (old_table != new_table 4945 || (indexes[i]->type & (DICT_FTS | DICT_SPATIAL)) 4946 || error != DB_SUCCESS || !online) { 4947 /* Do not apply any online log. 
*/ 4948 } else { 4949 if (global_system_variables.log_warnings > 2) { 4950 sql_print_information( 4951 "InnoDB: Online DDL : Applying" 4952 " log to index"); 4953 } 4954 4955 DEBUG_SYNC_C("row_log_apply_before"); 4956 error = row_log_apply(trx, sort_idx, table, stage); 4957 DEBUG_SYNC_C("row_log_apply_after"); 4958 } 4959 4960 if (error != DB_SUCCESS) { 4961 trx->error_key_num = key_numbers[i]; 4962 goto func_exit; 4963 } 4964 4965 if (indexes[i]->type & DICT_FTS 4966 && UNIV_UNLIKELY(fts_enable_diag_print)) { 4967 ib::info() << "Finished building full-text index " 4968 << indexes[i]->name; 4969 } 4970 } 4971 4972 func_exit: 4973 4974 DBUG_EXECUTE_IF( 4975 "ib_build_indexes_too_many_concurrent_trxs", 4976 error = DB_TOO_MANY_CONCURRENT_TRXS; 4977 trx->error_state = error;); 4978 4979 if (fts_psort_initiated) { 4980 /* Clean up FTS psort related resource */ 4981 row_fts_psort_info_destroy(psort_info, merge_info); 4982 fts_psort_initiated = false; 4983 } 4984 4985 row_merge_file_destroy_low(tmpfd); 4986 4987 for (i = 0; i < n_merge_files; i++) { 4988 row_merge_file_destroy(&merge_files[i]); 4989 } 4990 4991 if (fts_sort_idx) { 4992 dict_mem_index_free(fts_sort_idx); 4993 } 4994 4995 ut_free(merge_files); 4996 4997 alloc.deallocate_large(block, &block_pfx, block_size); 4998 4999 if (crypt_block) { 5000 alloc.deallocate_large(crypt_block, &crypt_pfx, block_size); 5001 } 5002 5003 DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID); 5004 5005 if (online && old_table == new_table && error != DB_SUCCESS) { 5006 /* On error, flag all online secondary index creation 5007 as aborted. */ 5008 for (i = 0; i < n_indexes; i++) { 5009 ut_ad(!(indexes[i]->type & DICT_FTS)); 5010 ut_ad(!indexes[i]->is_committed()); 5011 ut_ad(!dict_index_is_clust(indexes[i])); 5012 5013 /* Completed indexes should be dropped as 5014 well, and indexes whose creation was aborted 5015 should be dropped from the persistent 5016 storage. 
However, at this point we can only 5017 set some flags in the not-yet-published 5018 indexes. These indexes will be dropped later 5019 in row_merge_drop_indexes(), called by 5020 rollback_inplace_alter_table(). */ 5021 5022 switch (dict_index_get_online_status(indexes[i])) { 5023 case ONLINE_INDEX_COMPLETE: 5024 break; 5025 case ONLINE_INDEX_CREATION: 5026 rw_lock_x_lock( 5027 dict_index_get_lock(indexes[i])); 5028 row_log_abort_sec(indexes[i]); 5029 indexes[i]->type |= DICT_CORRUPT; 5030 rw_lock_x_unlock( 5031 dict_index_get_lock(indexes[i])); 5032 new_table->drop_aborted = TRUE; 5033 /* fall through */ 5034 case ONLINE_INDEX_ABORTED_DROPPED: 5035 case ONLINE_INDEX_ABORTED: 5036 MONITOR_ATOMIC_INC( 5037 MONITOR_BACKGROUND_DROP_INDEX); 5038 } 5039 } 5040 } 5041 5042 DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE();); 5043 5044 if (FlushObserver* flush_observer = trx->get_flush_observer()) { 5045 5046 DBUG_EXECUTE_IF("ib_index_build_fail_before_flush", 5047 error = DB_INTERRUPTED; 5048 ); 5049 5050 if (error != DB_SUCCESS) { 5051 flush_observer->interrupted(); 5052 } 5053 5054 flush_observer->flush(); 5055 5056 if (old_table != new_table) { 5057 for (const dict_index_t* index 5058 = dict_table_get_first_index(new_table); 5059 index != NULL; 5060 index = dict_table_get_next_index(index)) { 5061 if (!(index->type 5062 & (DICT_FTS | DICT_SPATIAL))) { 5063 row_merge_write_redo(index); 5064 } 5065 } 5066 } 5067 5068 trx->remove_flush_observer(); 5069 5070 if (trx_is_interrupted(trx)) { 5071 error = DB_INTERRUPTED; 5072 } 5073 } 5074 5075 DBUG_RETURN(error); 5076 } 5077