1 /*****************************************************************************
2 
3 Copyright (c) 2005, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0merge.cc
29 New index creation routines using a merge sort
30 
31 Created 12/4/2005 Jan Lindstrom
32 Completed by Sunny Bains and Marko Makela
33 *******************************************************/
34 
35 #include <math.h>
36 
37 #include "ha_prototypes.h"
38 
39 #include "row0merge.h"
40 #include "row0ext.h"
41 #include "row0log.h"
42 #include "row0ins.h"
43 #include "row0sel.h"
44 #include "dict0crea.h"
45 #include "trx0purge.h"
46 #include "lock0lock.h"
47 #include "pars0pars.h"
48 #include "ut0sort.h"
49 #include "row0ftsort.h"
50 #include "row0import.h"
51 #include "handler0alter.h"
52 #include "btr0bulk.h"
53 #include "fsp0sysspace.h"
54 #include "ut0new.h"
55 #include "ut0stage.h"
56 #include "os0file.h"
57 #include "my_aes.h"
58 
/* Ignore posix_fadvise() on those platforms where it does not exist */
#if defined _WIN32
# define posix_fadvise(fd, offset, len, advice) /* nothing */
#endif /* _WIN32 */

/* Whether to disable file system cache (for merge-sort temporary files);
presumably set from a server configuration variable — confirm in srv0srv */
char	srv_disable_sort_file_cache;
66 
/** Class that caches index row tuples made from a single cluster
index page scan, and then insert into corresponding index tree.
The cached tuples are built in m_heap and referenced (not copied)
by the vector, so they stay valid only as long as the heap does. */
class index_tuple_info_t {
public:
	/** constructor
	@param[in]	heap	memory heap
	@param[in]	index	index to be created */
	index_tuple_info_t(
		mem_heap_t*	heap,
		dict_index_t*	index) UNIV_NOTHROW
	{
		m_heap = heap;
		m_index = index;
		m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
	}

	/** destructor */
	~index_tuple_info_t()
	{
		UT_DELETE(m_dtuple_vec);
	}

	/** Get the index object
	@return the index object */
	dict_index_t*   get_index() UNIV_NOTHROW
	{
		return(m_index);
	}

	/** Caches an index row into index tuple vector
	@param[in]	row	table row
	@param[in]	ext	externally stored column
	prefixes, or NULL */
	void add(
		const dtuple_t*		row,
		const row_ext_t*	ext) UNIV_NOTHROW
	{
		dtuple_t*	dtuple;

		/* Build the index entry in m_heap; the vector stores only
		the pointer, not a copy. */
		dtuple = row_build_index_entry(row, ext, m_index, m_heap);

		ut_ad(dtuple);

		m_dtuple_vec->push_back(dtuple);
	}

	/** Insert spatial index rows cached in vector into spatial index
	@param[in]	trx_id		transaction id
	@param[in,out]	row_heap	memory heap
	@param[in]	pcur		cluster index scanning cursor
	@param[in,out]	scan_mtr	mini-transaction for pcur
	@param[out]	mtr_committed	whether scan_mtr got committed
	@return DB_SUCCESS if successful, else error number.
	NOTE: the loop does not stop on an insert error; the value
	returned reflects the status of the last tuple processed. */
	dberr_t insert(
		trx_id_t		trx_id,
		mem_heap_t*		row_heap,
		btr_pcur_t*		pcur,
		mtr_t*			scan_mtr,
		bool*			mtr_committed)
	{
		big_rec_t*      big_rec;
		rec_t*          rec;
		btr_cur_t       ins_cur;
		mtr_t           mtr;
		rtr_info_t      rtr_info;
		ulint*		ins_offsets = NULL;
		dberr_t		error = DB_SUCCESS;
		dtuple_t*	dtuple;
		ulint		count = 0;
		/* Inserts are done as part of index creation: no undo
		logging, no locking, keep system fields as supplied. */
		const ulint	flag = BTR_NO_UNDO_LOG_FLAG
				       | BTR_NO_LOCKING_FLAG
				       | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;

		ut_ad(dict_index_is_spatial(m_index));

		DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
			log_sys->check_flush_or_checkpoint = true;
		);

		for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
		     it != m_dtuple_vec->end();
		     ++it) {
			dtuple = *it;
			ut_ad(dtuple);

			if (log_sys->check_flush_or_checkpoint) {
				/* Release the scan mtr before waiting for
				the log flush, saving the cursor position
				so the caller can restore it later. */
				if (!(*mtr_committed)) {
					btr_pcur_move_to_prev_on_page(pcur);
					btr_pcur_store_position(pcur, scan_mtr);
					mtr_commit(scan_mtr);
					*mtr_committed = true;
				}

				log_free_check();
			}

			mtr.start();
			mtr.set_named_space(m_index->space);

			ins_cur.index = m_index;
			rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
					  false);
			rtr_info_update_btr(&ins_cur, &rtr_info);

			btr_cur_search_to_nth_level(m_index, 0, dtuple,
						    PAGE_CUR_RTREE_INSERT,
						    BTR_MODIFY_LEAF, &ins_cur,
						    0, __FILE__, __LINE__,
						    &mtr);

			/* It need to update MBR in parent entry,
			so change search mode to BTR_MODIFY_TREE */
			if (rtr_info.mbr_adj) {
				mtr_commit(&mtr);
				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false, &ins_cur,
						  m_index, false);
				rtr_info_update_btr(&ins_cur, &rtr_info);
				mtr_start(&mtr);
				mtr.set_named_space(m_index->space);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE, &ins_cur, 0,
					__FILE__, __LINE__, &mtr);
			}

			error = btr_cur_optimistic_insert(
				flag, &ins_cur, &ins_offsets, &row_heap,
				dtuple, &rec, &big_rec, 0, NULL, &mtr);

			if (error == DB_FAIL) {
				/* Optimistic insert did not fit on the leaf
				page: restart with a pessimistic (tree-
				modifying) insert after re-positioning. */
				ut_ad(!big_rec);
				mtr.commit();
				mtr.start();
				mtr.set_named_space(m_index->space);

				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false,
						  &ins_cur, m_index, false);

				rtr_info_update_btr(&ins_cur, &rtr_info);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE,
					&ins_cur, 0,
					__FILE__, __LINE__, &mtr);


				error = btr_cur_pessimistic_insert(
						flag, &ins_cur, &ins_offsets,
						&row_heap, dtuple, &rec,
						&big_rec, 0, NULL, &mtr);
			}

			DBUG_EXECUTE_IF(
				"row_merge_ins_spatial_fail",
				error = DB_FAIL;
			);

			if (error == DB_SUCCESS) {
				if (rtr_info.mbr_adj) {
					error = rtr_ins_enlarge_mbr(
							&ins_cur, NULL, &mtr);
				}

				if (error == DB_SUCCESS) {
					page_update_max_trx_id(
						btr_cur_get_block(&ins_cur),
						btr_cur_get_page_zip(&ins_cur),
						trx_id, &mtr);
				}
			}

			mtr_commit(&mtr);

			rtr_clean_rtr_info(&rtr_info, true);
			count++;
		}

		m_dtuple_vec->clear();

		return(error);
	}

private:
	/** Cache index rows made from a cluster index scan. Usually
	for rows on single cluster index page */
	typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
		idx_tuple_vec;

	/** vector used to cache index rows made from cluster index scan */
	idx_tuple_vec*		m_dtuple_vec;

	/** the index being built */
	dict_index_t*		m_index;

	/** memory heap for creating index tuples */
	mem_heap_t*		m_heap;
};
268 
269 /* Maximum pending doc memory limit in bytes for a fts tokenization thread */
270 #define FTS_PENDING_DOC_MEMORY_LIMIT	1000000
271 
/** Insert sorted data tuples to the index.
(Forward declaration; the definition follows later in this file.)
@param[in]	trx_id		transaction identifier
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in,out]	crypt_block	encrypted file buffer
@param[in]	space_id	tablespace id
@param[in]	row_buf		row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	trx_id_t		trx_id,
	dict_index_t*		index,
	const dict_table_t*	old_table,
	int			fd,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	ut_stage_alter_t*	stage = NULL);
300 
301 /******************************************************//**
302 Encode an index record. */
303 static
304 void
row_merge_buf_encode(byte ** b,const dict_index_t * index,const mtuple_t * entry,ulint n_fields)305 row_merge_buf_encode(
306 /*=================*/
307 	byte**			b,		/*!< in/out: pointer to
308 						current end of output buffer */
309 	const dict_index_t*	index,		/*!< in: index */
310 	const mtuple_t*		entry,		/*!< in: index fields
311 						of the record to encode */
312 	ulint			n_fields)	/*!< in: number of fields
313 						in the entry */
314 {
315 	ulint	size;
316 	ulint	extra_size;
317 
318 	size = rec_get_converted_size_temp(
319 		index, entry->fields, n_fields, NULL, &extra_size);
320 	ut_ad(size >= extra_size);
321 
322 	/* Encode extra_size + 1 */
323 	if (extra_size + 1 < 0x80) {
324 		*(*b)++ = (byte) (extra_size + 1);
325 	} else {
326 		ut_ad((extra_size + 1) < 0x8000);
327 		*(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
328 		*(*b)++ = (byte) (extra_size + 1);
329 	}
330 
331 	rec_convert_dtuple_to_temp(*b + extra_size, index,
332 				   entry->fields, n_fields, NULL);
333 
334 	*b += size;
335 }
336 
337 /******************************************************//**
338 Allocate a sort buffer.
339 @return own: sort buffer */
340 static MY_ATTRIBUTE((malloc))
341 row_merge_buf_t*
row_merge_buf_create_low(mem_heap_t * heap,dict_index_t * index,ulint max_tuples,ulint buf_size)342 row_merge_buf_create_low(
343 /*=====================*/
344 	mem_heap_t*	heap,		/*!< in: heap where allocated */
345 	dict_index_t*	index,		/*!< in: secondary index */
346 	ulint		max_tuples,	/*!< in: maximum number of
347 					data tuples */
348 	ulint		buf_size)	/*!< in: size of the buffer,
349 					in bytes */
350 {
351 	row_merge_buf_t*	buf;
352 
353 	ut_ad(max_tuples > 0);
354 
355 	ut_ad(max_tuples <= srv_sort_buf_size);
356 
357 	buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
358 	buf->heap = heap;
359 	buf->index = index;
360 	buf->max_tuples = max_tuples;
361 	buf->tuples = static_cast<mtuple_t*>(
362 		ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
363 	buf->tmp_tuples = buf->tuples + max_tuples;
364 
365 	return(buf);
366 }
367 
368 /******************************************************//**
369 Allocate a sort buffer.
370 @return own: sort buffer */
371 row_merge_buf_t*
row_merge_buf_create(dict_index_t * index)372 row_merge_buf_create(
373 /*=================*/
374 	dict_index_t*	index)	/*!< in: secondary index */
375 {
376 	row_merge_buf_t*	buf;
377 	ulint			max_tuples;
378 	ulint			buf_size;
379 	mem_heap_t*		heap;
380 
381 	max_tuples = static_cast<ulint>(srv_sort_buf_size)
382 		/ ut_max(static_cast<ulint>(1),
383 			 dict_index_get_min_size(index));
384 
385 	buf_size = (sizeof *buf);
386 
387 	heap = mem_heap_create(buf_size);
388 
389 	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
390 
391 	return(buf);
392 }
393 
394 /******************************************************//**
395 Empty a sort buffer.
396 @return sort buffer */
397 row_merge_buf_t*
row_merge_buf_empty(row_merge_buf_t * buf)398 row_merge_buf_empty(
399 /*================*/
400 	row_merge_buf_t*	buf)	/*!< in,own: sort buffer */
401 {
402 	ulint		buf_size	= sizeof *buf;
403 	ulint		max_tuples	= buf->max_tuples;
404 	mem_heap_t*	heap		= buf->heap;
405 	dict_index_t*	index		= buf->index;
406 	mtuple_t*	tuples		= buf->tuples;
407 
408 	mem_heap_empty(heap);
409 
410 	buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
411 	buf->heap = heap;
412 	buf->index = index;
413 	buf->max_tuples = max_tuples;
414 	buf->tuples = tuples;
415 	buf->tmp_tuples = buf->tuples + max_tuples;
416 
417 	return(buf);
418 }
419 
/******************************************************//**
Deallocate a sort buffer. */
void
row_merge_buf_free(
/*===============*/
	row_merge_buf_t*	buf)	/*!< in,own: sort buffer to be freed */
{
	/* The tuple array was allocated with ut_malloc_nokey() and must
	be freed first: buf itself resides inside buf->heap and becomes
	invalid as soon as the heap is freed. */
	ut_free(buf->tuples);
	mem_heap_free(buf->heap);
}
430 
/** Convert the field data from compact to redundant format by copying
(fetching externally stored data if needed) and space-padding to the
full fixed length.
@param[in]	row_field	field to copy from
@param[out]	field		field to copy to
@param[in]	len		length of the field data
@param[in]	page_size	compressed BLOB page size,
				zero for uncompressed BLOBs
@param[in,out]	heap		memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert(). */
static
void
row_merge_buf_redundant_convert(
	const dfield_t*		row_field,
	dfield_t*		field,
	ulint			len,
	const page_size_t&	page_size,
	mem_heap_t*		heap)
{
	/* Only applies to variable-width multi-byte character sets with
	a single-byte minimum. */
	ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
	ut_ad(DATA_MBMAXLEN(field->type.mbminmaxlen) > 1);

	byte*		buf = (byte*) mem_heap_alloc(heap, len);
	ulint		field_len = row_field->len;
	ut_ad(field_len <= len);

	if (row_field->ext) {
		/* Externally stored column: fetch the full value from the
		BLOB pages before padding. */
		const byte*	field_data = static_cast<byte*>(
			dfield_get_data(row_field));
		ulint		ext_len;

		ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
		ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
			    field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

		byte*	data = btr_copy_externally_stored_field(
			&ext_len, field_data, page_size, field_len, heap);

		ut_ad(ext_len < len);

		memcpy(buf, data, ext_len);
		field_len = ext_len;
	} else {
		memcpy(buf, row_field->data, field_len);
	}

	/* Pad with spaces (0x20) up to the fixed length. */
	memset(buf + field_len, 0x20, len - field_len);

	dfield_set_data(field, buf, len);
}
481 
/** Insert a data tuple into a sort buffer.
@param[in,out]	buf		sort buffer
@param[in]	fts_index	fts index to be created
@param[in]	old_table	original table
@param[in]	new_table	new table
@param[in,out]	psort_info	parallel sort info
@param[in]	row		table row
@param[in]	ext		cache of externally stored
				column prefixes, or NULL
@param[in,out]	doc_id		Doc ID if we are creating
				FTS index
@param[in,out]	conv_heap	memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert()
@param[in,out]	err		set if error occurs
@param[in,out]	v_heap		heap memory to process data for virtual column
@param[in,out]	my_table	mysql table object
@param[in]	trx		transaction object
@param[in]	prebuilt	compress_heap must be taken from here
@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
	row_merge_buf_t*	buf,
	dict_index_t*		fts_index,
	const dict_table_t*	old_table,
	const dict_table_t*	new_table,
	fts_psort_t*		psort_info,
	const dtuple_t*		row,
	const row_ext_t*	ext,
	doc_id_t*		doc_id,
	mem_heap_t*		conv_heap,
	dberr_t*		err,
	mem_heap_t**		v_heap,
	TABLE*			my_table,
	trx_t*			trx,
	row_prebuilt_t*		prebuilt)
{
	ulint			i;
	const dict_index_t*	index;
	mtuple_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	ulint			bucket = 0;
	doc_id_t		write_doc_id;
	ulint			n_row_added = 0;
	DBUG_ENTER("row_merge_buf_add");

	/* Reject the row if the tuple array is already full. */
	if (buf->n_tuples >= buf->max_tuples) {
		DBUG_RETURN(0);
	}

	DBUG_EXECUTE_IF(
		"ib_row_merge_buf_add_two",
		if (buf->n_tuples >= 2) DBUG_RETURN(0););

	UNIV_PREFETCH_R(row->fields);

	/* If we are building FTS index, buf->index points to
	the 'fts_sort_idx', and real FTS index is stored in
	fts_index */
	index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;

	/* create spatial index should not come here */
	ut_ad(!dict_index_is_spatial(index));

	n_fields = dict_index_get_n_fields(index);

	entry = &buf->tuples[buf->n_tuples];
	field = entry->fields = static_cast<dfield_t*>(
		mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));

	data_size = 0;
	/* Start with the NULL-flag bitmap bytes of the record header. */
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);

	ifield = dict_index_get_nth_field(index, 0);

	for (i = 0; i < n_fields; i++, field++, ifield++) {
		ulint			len;
		const dict_col_t*	col;
		const dict_v_col_t*	v_col = NULL;
		ulint			col_no;
		ulint			fixed_len;
		const dfield_t*		row_field;

		col = ifield->col;
		if (dict_col_is_virtual(col)) {
			v_col = reinterpret_cast<const dict_v_col_t*>(col);
		}

		col_no = dict_col_get_no(col);

		/* Process the Doc ID column */
		if (*doc_id > 0
		    && col_no == index->table->fts->doc_col
		    && !dict_col_is_virtual(col)) {
			fts_write_doc_id((byte*) &write_doc_id, *doc_id);

			/* Note: field->data now points to a value on the
			stack: &write_doc_id after dfield_set_data(). Because
			there is only one doc_id per row, it shouldn't matter.
			We allocate a new buffer before we leave the function
			later below. */

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));

			field->type.mtype = ifield->col->mtype;
			field->type.prtype = ifield->col->prtype;
			field->type.mbminmaxlen = DATA_MBMINMAXLEN(0, 0);
			field->type.len = ifield->col->len;
		} else {
			/* Use callback to get the virtual column value */
			if (dict_col_is_virtual(col)) {
				dict_index_t*	clust_index
					= dict_table_get_first_index(new_table);

				row_field = innobase_get_computed_value(
					row, v_col, clust_index,
					v_heap, NULL, ifield, trx->mysql_thd,
					my_table, old_table, NULL, NULL,
					prebuilt);

				if (row_field == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					DBUG_RETURN(0);
				}
				dfield_copy(field, row_field);
			} else {
				row_field = dtuple_get_nth_field(row, col_no);
				dfield_copy(field, row_field);
			}


			/* Tokenize and process data for FTS */
			if (index->type & DICT_FTS) {
				fts_doc_item_t*	doc_item;
				byte*		value;
				void*		ptr;
				const ulint	max_trial_count = 10000;
				ulint		trial_count = 0;

				/* fetch Doc ID if it already exists
				in the row, and not supplied by the
				caller. Even if the value column is
				NULL, we still need to get the Doc
				ID so to maintain the correct max
				Doc ID */
				if (*doc_id == 0) {
					const dfield_t*	doc_field;
					doc_field = dtuple_get_nth_field(
						row,
						index->table->fts->doc_col);
					*doc_id = (doc_id_t) mach_read_from_8(
						static_cast<byte*>(
						dfield_get_data(doc_field)));

					if (*doc_id == 0) {
						ib::warn() << "FTS Doc ID is"
							" zero. Record"
							" skipped";
						DBUG_RETURN(0);
					}
				}

				if (dfield_is_null(field)) {
					n_row_added = 1;
					continue;
				}

				/* Copy the field value into a malloc'ed
				document item, handed over to a parallel
				tokenizer thread below. */
				ptr = ut_malloc_nokey(sizeof(*doc_item)
						      + field->len);

				doc_item = static_cast<fts_doc_item_t*>(ptr);
				value = static_cast<byte*>(ptr)
					+ sizeof(*doc_item);
				memcpy(value, field->data, field->len);
				field->data = value;

				doc_item->field = field;
				doc_item->doc_id = *doc_id;

				/* Distribute documents over the parallel
				sort threads by doc_id. */
				bucket = *doc_id % fts_sort_pll_degree;

				/* Add doc item to fts_doc_list */
				mutex_enter(&psort_info[bucket].mutex);

				if (psort_info[bucket].error == DB_SUCCESS) {
					UT_LIST_ADD_LAST(
						psort_info[bucket].fts_doc_list,
						doc_item);
					psort_info[bucket].memory_used +=
						sizeof(*doc_item) + field->len;
				} else {
					ut_free(doc_item);
				}

				mutex_exit(&psort_info[bucket].mutex);

				/* Sleep when memory used exceeds limit*/
				while (psort_info[bucket].memory_used
				       > FTS_PENDING_DOC_MEMORY_LIMIT
				       && trial_count++ < max_trial_count) {
					os_thread_sleep(1000);
				}

				n_row_added = 1;
				continue;
			}

			if (field->len != UNIV_SQL_NULL
			    && col->mtype == DATA_MYSQL
			    && col->len != field->len) {
				if (conv_heap != NULL) {
					row_merge_buf_redundant_convert(
						row_field, field, col->len,
						dict_table_page_size(old_table),
						conv_heap);
				} else {
					/* Field length mismatch should not
					happen when rebuilding redundant row
					format table. */
					ut_ad(dict_table_is_comp(index->table));
				}
			}
		}

		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (!ext) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else if (!dict_col_is_virtual(col)) {
			/* Only non-virtual column are stored externally */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminmaxlen,
				ifield->prefix_len,
				len,
				static_cast<char*>(dfield_get_data(field)));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len
		  || DATA_LARGE_MTYPE(col->mtype)
		  || (col->mtype == DATA_POINT
		      && len == DATA_MBR_LEN)
		  || ((col->mtype == DATA_VARCHAR
		       || col->mtype == DATA_BINARY
		       || col->mtype == DATA_VARMYSQL)
		      && (col->len == 0
		          || len <= col->len +
			     prtype_get_compression_extra(col->prtype))));

		fixed_len = ifield->fixed_len;
		if (fixed_len && !dict_table_is_comp(index->table)
		    && DATA_MBMINLEN(col->mbminmaxlen)
		    != DATA_MBMAXLEN(col->mbminmaxlen)) {
			/* CHAR in ROW_FORMAT=REDUNDANT is always
			fixed-length, but in the temporary file it is
			variable-length for variable-length character
			sets. */
			fixed_len = 0;
		}

		if (fixed_len) {
#ifdef UNIV_DEBUG
			ulint	mbminlen = DATA_MBMINLEN(col->mbminmaxlen);
			ulint	mbmaxlen = DATA_MBMAXLEN(col->mbminmaxlen);

			/* len should be between size calcualted base on
			mbmaxlen and mbminlen */
			ut_ad(len <= fixed_len);
			ut_ad(!mbmaxlen || len >= mbminlen
			      * (fixed_len / mbmaxlen));

			ut_ad(!dfield_is_ext(field));
#endif /* UNIV_DEBUG */
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (!DATA_BIG_COL(col))) {
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself.  If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

	/* If this is FTS index, we already populated the sort buffer, return
	here */
	if (index->type & DICT_FTS) {
		DBUG_RETURN(n_row_added);
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_temp(
			index, entry->fields, n_fields, NULL, &extra);

		ut_ad(data_size + extra_size == size);
		ut_ad(extra_size == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Record size can exceed page size while converting to
	redundant row format. But there is assert
	ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size().
	It may hit the assert before attempting to insert the row. */
	if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
		*err = DB_TOO_BIG_RECORD;
	}

	ut_ad(data_size < srv_sort_buf_size);

	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= srv_sort_buf_size - 1) {
		DBUG_RETURN(0);
	}

	buf->total_size += data_size;
	buf->n_tuples++;
	n_row_added++;

	field = entry->fields;

	/* Copy the data fields. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	if (conv_heap != NULL) {
		mem_heap_empty(conv_heap);
	}

	DBUG_RETURN(n_row_added);
}
862 
863 /*************************************************************//**
864 Report a duplicate key. */
865 void
row_merge_dup_report(row_merge_dup_t * dup,const dfield_t * entry)866 row_merge_dup_report(
867 /*=================*/
868 	row_merge_dup_t*	dup,	/*!< in/out: for reporting duplicates */
869 	const dfield_t*		entry)	/*!< in: duplicate index entry */
870 {
871 	if (!dup->n_dup++) {
872 		/* Only report the first duplicate record,
873 		but count all duplicate records. */
874 		innobase_fields_to_mysql(dup->table, dup->index, entry);
875 	}
876 }
877 
878 /*************************************************************//**
879 Compare two tuples.
880 @return positive, 0, negative if a is greater, equal, less, than b,
881 respectively */
882 static MY_ATTRIBUTE((warn_unused_result))
883 int
row_merge_tuple_cmp(ulint n_uniq,ulint n_field,const mtuple_t & a,const mtuple_t & b,row_merge_dup_t * dup)884 row_merge_tuple_cmp(
885 /*================*/
886 	ulint			n_uniq,	/*!< in: number of unique fields */
887 	ulint			n_field,/*!< in: number of fields */
888 	const mtuple_t&		a,	/*!< in: first tuple to be compared */
889 	const mtuple_t&		b,	/*!< in: second tuple to be compared */
890 	row_merge_dup_t*	dup)	/*!< in/out: for reporting duplicates,
891 					NULL if non-unique index */
892 {
893 	int		cmp;
894 	const dfield_t*	af	= a.fields;
895 	const dfield_t*	bf	= b.fields;
896 	ulint		n	= n_uniq;
897 
898 	ut_ad(n_uniq > 0);
899 	ut_ad(n_uniq <= n_field);
900 
901 	/* Compare the fields of the tuples until a difference is
902 	found or we run out of fields to compare.  If !cmp at the
903 	end, the tuples are equal. */
904 	do {
905 		cmp = cmp_dfield_dfield(af++, bf++);
906 	} while (!cmp && --n);
907 
908 	if (cmp) {
909 		return(cmp);
910 	}
911 
912 	if (dup) {
913 		/* Report a duplicate value error if the tuples are
914 		logically equal.  NULL columns are logically inequal,
915 		although they are equal in the sorting order.  Find
916 		out if any of the fields are NULL. */
917 		for (const dfield_t* df = a.fields; df != af; df++) {
918 			if (dfield_is_null(df)) {
919 				goto no_report;
920 			}
921 		}
922 
923 		row_merge_dup_report(dup, a.fields);
924 	}
925 
926 no_report:
927 	/* The n_uniq fields were equal, but we compare all fields so
928 	that we will get the same (internal) order as in the B-tree. */
929 	for (n = n_field - n_uniq + 1; --n; ) {
930 		cmp = cmp_dfield_dfield(af++, bf++);
931 		if (cmp) {
932 			return(cmp);
933 		}
934 	}
935 
936 	/* This should never be reached, except in a secondary index
937 	when creating a secondary index and a PRIMARY KEY, and there
938 	is a duplicate in the PRIMARY KEY that has not been detected
939 	yet. Internally, an index must never contain duplicates. */
940 	return(cmp);
941 }
942 
/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY() (the recursive calls inside the macro take only
these four arguments, so n_uniq, n_field and dup are captured here).
@param tuples array of tuples that being sorted
@param aux work area, same size as tuples[]
@param low lower bound of the sorting area, inclusive
@param high upper bound of the sorting area, inclusive */
#define row_merge_tuple_sort_ctx(tuples, aux, low, high)		\
	row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a first tuple to be compared
@param b second tuple to be compared
@return positive, 0, negative, if a is greater, equal, less, than b,
respectively */
#define row_merge_tuple_cmp_ctx(a,b)			\
	row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)
959 
/**********************************************************************//**
Merge sort the tuple buffer in main memory.  The actual algorithm is
generated by the UT_SORT_FUNCTION_BODY() macro, which recurses through
the _ctx wrapper macros defined above. */
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	row_merge_dup_t*	dup,	/*!< in/out: reporter of duplicates
					(NULL if non-unique index) */
	mtuple_t*		tuples,	/*!< in/out: tuples */
	mtuple_t*		aux,	/*!< in/out: work area */
	ulint			low,	/*!< in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/*!< in: upper bound of the
					sorting area, exclusive */
{
	ut_ad(n_field > 0);
	ut_ad(n_uniq <= n_field);

	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}
983 
984 /******************************************************//**
985 Sort a buffer. */
986 void
row_merge_buf_sort(row_merge_buf_t * buf,row_merge_dup_t * dup)987 row_merge_buf_sort(
988 /*===============*/
989 	row_merge_buf_t*	buf,	/*!< in/out: sort buffer */
990 	row_merge_dup_t*	dup)	/*!< in/out: reporter of duplicates
991 					(NULL if non-unique index) */
992 {
993 	ut_ad(!dict_index_is_spatial(buf->index));
994 
995 	row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
996 			     dict_index_get_n_fields(buf->index),
997 			     dup,
998 			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
999 }
1000 
/******************************************************//**
Write a buffer to a block.
The tuples in buf must already be sorted.  Each tuple is encoded with
row_merge_buf_encode(), and a single 0 byte terminates the chunk.
Note: the 'of' parameter is only dereferenced inside DBUG_PRINT(),
hence the UNIV_UNUSED annotation for release builds. */
void
row_merge_buf_write(
/*================*/
	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
	const merge_file_t*	of UNIV_UNUSED,
					/*!< in: output file */
	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
{
	const dict_index_t*	index	= buf->index;
	ulint			n_fields= dict_index_get_n_fields(index);
	byte*			b	= &block[0];

	DBUG_ENTER("row_merge_buf_write");

	for (ulint i = 0; i < buf->n_tuples; i++) {
		const mtuple_t*	entry	= &buf->tuples[i];

		/* Advances b past the encoded record. */
		row_merge_buf_encode(&b, index, entry, n_fields);
		ut_ad(b < &block[srv_sort_buf_size]);

		DBUG_PRINT("ib_merge_sort",
			   ("%p,fd=%d,%lu %lu: %s",
			    reinterpret_cast<const void*>(b), of->fd,
			    ulong(of->offset), ulong(i),
			    rec_printer(entry->fields,
					n_fields).str().c_str()));
	}

	/* Write an "end-of-chunk" marker. */
	ut_a(b < &block[srv_sort_buf_size]);
	ut_a(b == &block[0] + buf->total_size);
	*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* UNIV_DEBUG_VALGRIND */
	DBUG_PRINT("ib_merge_sort",
		   ("write %p,%d,%lu EOF",
		    reinterpret_cast<const void*>(b), of->fd,
		    ulong(of->offset)));
	DBUG_VOID_RETURN;
}
1046 
1047 /******************************************************//**
1048 Create a memory heap and allocate space for row_merge_rec_offsets()
1049 and mrec_buf_t[3].
1050 @return memory heap */
1051 static
1052 mem_heap_t*
row_merge_heap_create(const dict_index_t * index,mrec_buf_t ** buf,ulint ** offsets1,ulint ** offsets2)1053 row_merge_heap_create(
1054 /*==================*/
1055 	const dict_index_t*	index,		/*!< in: record descriptor */
1056 	mrec_buf_t**		buf,		/*!< out: 3 buffers */
1057 	ulint**			offsets1,	/*!< out: offsets */
1058 	ulint**			offsets2)	/*!< out: offsets */
1059 {
1060 	ulint		i	= 1 + REC_OFFS_HEADER_SIZE
1061 		+ dict_index_get_n_fields(index);
1062 	mem_heap_t*	heap	= mem_heap_create(2 * i * sizeof **offsets1
1063 						  + 3 * sizeof **buf);
1064 
1065 	*buf = static_cast<mrec_buf_t*>(
1066 		mem_heap_alloc(heap, 3 * sizeof **buf));
1067 	*offsets1 = static_cast<ulint*>(
1068 		mem_heap_alloc(heap, i * sizeof **offsets1));
1069 	*offsets2 = static_cast<ulint*>(
1070 		mem_heap_alloc(heap, i * sizeof **offsets2));
1071 
1072 	(*offsets1)[0] = (*offsets2)[0] = i;
1073 	(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
1074 
1075 	return(heap);
1076 }
1077 
/********************************************************************//**
Read a merge block from the file system.
The read is synchronous.  When temporary-file encryption is enabled,
the block is decrypted into crypt_buf and then copied back into buf.
@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_read(
/*===========*/
	int			fd,		/*!< in: file descriptor */
	ulint			offset,		/*!< in: offset where to read
						in number of row_merge_block_t
						elements */
	row_merge_block_t*	buf,		/*!< out: data */
	row_merge_block_t*	crypt_buf,	/*!< in: crypt buf or NULL */
	ulint			space_id)	/*!< in: tablespace id */
{
	/* Byte offset: blocks are srv_sort_buf_size bytes each. */
	os_offset_t	ofs = ((os_offset_t) offset) * srv_sort_buf_size;
	dberr_t		err;

	DBUG_ENTER("row_merge_read");
	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
	DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););

	IORequest	request;

	/* Merge sort pages are never compressed. */
	request.disable_compression();

	err = os_file_read_no_error_handling_int_fd(
		request,
		fd, buf, ofs, srv_sort_buf_size, NULL);

	/* For encrypted tables, decrypt data after reading and copy data */
	if (err == DB_SUCCESS && log_tmp_is_encrypted()) {

		if (log_tmp_block_decrypt(static_cast<const byte*>(buf),
			srv_sort_buf_size, static_cast<byte*>(crypt_buf),
			offset, space_id)) {
			srv_stats.n_merge_blocks_decrypted.inc();
			memcpy(buf, crypt_buf, srv_sort_buf_size);
		} else {
			err = DB_IO_DECRYPT_FAIL;
		}
	}

#ifdef POSIX_FADV_DONTNEED
	/* Each block is read exactly once.  Free up the file cache. */
	posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	if (err != DB_SUCCESS) {
		ib::error() << "Failed to read merge block at " << ofs;
	}

	DBUG_RETURN(err == DB_SUCCESS);
}
1132 
/********************************************************************//**
Write a merge block to the file system.
When temporary-file encryption is enabled, the block is first
encrypted into crypt_buf, and crypt_buf is written instead of buf.
@return TRUE if request was successful, FALSE if fail */
ibool
row_merge_write(
/*============*/
	int		fd,		/*!< in: file descriptor */
	ulint		offset,		/*!< in: offset where to write,
					in number of row_merge_block_t
					elements */
	void*		buf,		/*!< in/out: data */
	void*		crypt_buf,	/*!< in: crypt buf or NULL */
	ulint		space_id)	/*!< in: tablespace id */
{
	size_t		buf_len = srv_sort_buf_size;
	os_offset_t	ofs = buf_len * (os_offset_t) offset;
	dberr_t		err;
	/* Points at the buffer that actually goes to disk:
	buf, or crypt_buf when encryption is on. */
	void*		out_buf = buf;

	DBUG_ENTER("row_merge_write");
	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
	DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););

	IORequest	request(IORequest::WRITE);

	/* Merge sort pages are never compressed. */
	request.disable_compression();

	/* For encrypted tables, encrypt data before writing */
	if (log_tmp_is_encrypted()) {
		if (!log_tmp_block_encrypt(static_cast<const byte*>(buf),
					   srv_sort_buf_size,
					   static_cast<byte*>(crypt_buf),
					   offset, space_id)) {
			ib::error() << "Failed encrypt block at " << offset;
			DBUG_RETURN(FALSE);
		}
		srv_stats.n_merge_blocks_encrypted.inc();
		out_buf = crypt_buf;
	}

	err = os_file_write_int_fd(
		request,
		"(merge)", fd, out_buf, ofs, buf_len);

#ifdef POSIX_FADV_DONTNEED
	/* The block will be needed on the next merge pass,
	but it can be evicted from the file cache meanwhile. */
	posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

	DBUG_RETURN(err == DB_SUCCESS);
}
1185 
/********************************************************************//**
Read a merge record.
Decodes the 1- or 2-byte extra_size prefix, then locates the record
bytes.  A record that spans two blocks is assembled into the secondary
buffer *buf, reading the next block from the file as needed.
@return pointer to next record, or NULL on I/O error or end of list */
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,		/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	mrec_buf_t*		buf,		/*!< in/out: secondary buffer */
	const byte*		b,		/*!< in: pointer to record */
	const dict_index_t*	index,		/*!< in: index of the record */
	int			fd,		/*!< in: file descriptor */
	ulint*			foffs,		/*!< in/out: file offset */
	const mrec_t**		mrec,		/*!< out: pointer to merge
						record, or NULL on end of list
						(non-NULL on I/O error) */
	ulint*			offsets)	/*!< out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	/* offsets[] must have been sized by row_merge_heap_create(). */
	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	DBUG_ENTER("row_merge_read_rec");

	/* First byte of the encoded extra_size; 0 marks end of list,
	high bit set marks the two-byte encoding. */
	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		DBUG_PRINT("ib_merge_sort",
			   ("read %p,%p,%d,%lu EOF\n",
			    reinterpret_cast<const void*>(b),
			    reinterpret_cast<const void*>(block),
			    fd, ulong(*foffs)));
		DBUG_RETURN(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
			/* The second length byte is in the next block. */
			if (!row_merge_read(fd, ++(*foffs), block,
					    crypt_block, space_id)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				DBUG_RETURN(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = &block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size.  Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = &block[srv_sort_buf_size] - b;
		ut_ad(avail_size < sizeof *buf);
		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block, crypt_block,
				    space_id)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = &block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		/* The record proper starts after its extra bytes. */
		*mrec = *buf + extra_size;

		rec_init_offsets_temp(*mrec, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < &block[srv_sort_buf_size]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	*mrec = b + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record spans two blocks.  Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = &block[srv_sort_buf_size] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;

	/* We cannot invoke rec_offs_make_valid() here, because there
	are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
	Similarly, rec_offs_validate() would fail, because it invokes
	rec_get_status(). */
	ut_d(offsets[2] = (ulint) *mrec);
	ut_d(offsets[3] = (ulint) index);

	if (!row_merge_read(fd, ++(*foffs), block, crypt_block, space_id)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = &block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs),
		    rec_printer(*mrec, 0, offsets).str().c_str()));
	DBUG_RETURN(b);
}
1352 
/********************************************************************//**
Write a merge record.
Encodes the extra_size prefix (1 byte, or 2 bytes with the high bit of
the first byte set) followed by the record bytes.  In release builds
(NDEBUG) the size/fd/foffs parameters are compiled out; the macro below
redirects debug-signature calls to the shorter release signature. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifndef NDEBUG
	ulint		size,	/*!< in: total size to write */
	int		fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* !NDEBUG */
	const mrec_t*	mrec,	/*!< in: record to write */
	const ulint*	offsets)/*!< in: offsets of mrec */
#ifdef NDEBUG
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* NDEBUG */
{
	DBUG_ENTER("row_merge_write_rec_low");

#ifndef NDEBUG
	const byte* const end = b + size;
#endif /* NDEBUG */
	assert(e == rec_offs_extra_size(offsets) + 1);
	DBUG_PRINT("ib_merge_sort",
		   ("%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b), fd, ulong(foffs),
		    rec_printer(mrec, 0, offsets).str().c_str()));

	if (e < 0x80) {
		*b++ = (byte) e;
	} else {
		/* Two-byte encoding: high bit marks the long form. */
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	/* Copy the extra bytes and the data bytes in one go. */
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	assert(b + rec_offs_size(offsets) == end);
	DBUG_VOID_RETURN;
}
1395 
/********************************************************************//**
Write a merge record.
If the encoded record does not fit in the remainder of the current
block, it is staged in buf[0], the full block is flushed to disk, and
the tail of the record is copied to the start of the next block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,		/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	mrec_buf_t*		buf,		/*!< in/out: secondary buffer */
	byte*			b,		/*!< in: pointer to end of
						block */
	int			fd,		/*!< in: file descriptor */
	ulint*			foffs,		/*!< in/out: file offset */
	const mrec_t*		mrec,		/*!< in: record to write */
	const ulint*		offsets)	/*!< in: offsets of mrec */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(mrec);
	ut_ad(foffs);
	/* The source record must not live inside the destination
	block or the staging buffer that we may overwrite. */
	ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	/* Total on-disk size: 1- or 2-byte length prefix plus record. */
	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = &block[srv_sort_buf_size] - b;

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block, crypt_block,
				     space_id)) {
			return(NULL);
		}

		UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);

		if (crypt_block) {
			UNIV_MEM_INVALID(&crypt_block[0], srv_sort_buf_size);
		}

		/* Copy the rest. */
		b = &block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}
1470 
/********************************************************************//**
Write an end-of-list marker.
Appends the single 0 marker byte, flushes the block to disk, and
resets the write position to the start of the block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_eof(
/*================*/
	row_merge_block_t*	block,		/*!< in/out: file buffer */
	row_merge_block_t*	crypt_block, 	/*!< in: crypt buf or NULL */
	ulint			space_id,	/*!< in: tablespace id */
	byte*			b,		/*!< in: pointer to end of
						block */
	int			fd,		/*!< in: file descriptor */
	ulint*			foffs)		/*!< in/out: file offset */
{
	ut_ad(block);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(foffs);

	DBUG_ENTER("row_merge_write_eof");
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs)));

	/* A zero length prefix signals "end of list" to the reader. */
	*b++ = 0;
	UNIV_MEM_ASSERT_RW(&block[0], b - &block[0]);
	UNIV_MEM_ASSERT_W(&block[0], srv_sort_buf_size);
#ifdef UNIV_DEBUG_VALGRIND
	/* The rest of the block is uninitialized.  Initialize it
	to avoid bogus warnings. */
	memset(b, 0xff, &block[srv_sort_buf_size] - b);
#endif /* UNIV_DEBUG_VALGRIND */

	if (!row_merge_write(fd, (*foffs)++, block, crypt_block, space_id)) {
		DBUG_RETURN(NULL);
	}

	UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
	DBUG_RETURN(&block[0]);
}
1514 
1515 /** Create a temporary file if it has not been created already.
1516 @param[in,out]	tmpfd	temporary file handle
1517 @param[in]	path	location for creating temporary file
1518 @return file descriptor, or -1 on failure */
1519 static MY_ATTRIBUTE((warn_unused_result))
1520 int
row_merge_tmpfile_if_needed(int * tmpfd,const char * path)1521 row_merge_tmpfile_if_needed(
1522 	int*		tmpfd,
1523 	const char*	path)
1524 {
1525 	if (*tmpfd < 0) {
1526 		*tmpfd = row_merge_file_create_low(path);
1527 		if (*tmpfd >= 0) {
1528 			MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1529 		}
1530 	}
1531 
1532 	return(*tmpfd);
1533 }
1534 
1535 /** Create a temporary file for merge sort if it was not created already.
1536 @param[in,out]	file	merge file structure
1537 @param[in]	nrec	number of records in the file
1538 @param[in]	path	location for creating temporary file
1539 @return file descriptor, or -1 on failure */
1540 static MY_ATTRIBUTE((warn_unused_result))
1541 int
row_merge_file_create_if_needed(merge_file_t * file,int * tmpfd,ulint nrec,const char * path)1542 row_merge_file_create_if_needed(
1543 	merge_file_t*	file,
1544 	int*		tmpfd,
1545 	ulint		nrec,
1546 	const char*	path)
1547 {
1548 	ut_ad(file->fd < 0 || *tmpfd >=0);
1549 	if (file->fd < 0 && row_merge_file_create(file, path) >= 0) {
1550 		MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1551 		if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) {
1552 			return(-1);
1553 		}
1554 
1555 		file->n_rec = nrec;
1556 	}
1557 
1558 	ut_ad(file->fd < 0 || *tmpfd >=0);
1559 	return(file->fd);
1560 }
1561 
1562 /** Copy the merge data tuple from another merge data tuple.
1563 @param[in]	mtuple		source merge data tuple
1564 @param[in,out]	prev_mtuple	destination merge data tuple
1565 @param[in]	n_unique	number of unique fields exist in the mtuple
1566 @param[in,out]	heap		memory heap where last_mtuple allocated */
1567 static
1568 void
row_mtuple_create(const mtuple_t * mtuple,mtuple_t * prev_mtuple,ulint n_unique,mem_heap_t * heap)1569 row_mtuple_create(
1570 	const mtuple_t*	mtuple,
1571 	mtuple_t*	prev_mtuple,
1572 	ulint		n_unique,
1573 	mem_heap_t*	heap)
1574 {
1575 	memcpy(prev_mtuple->fields, mtuple->fields,
1576 	       n_unique * sizeof *mtuple->fields);
1577 
1578 	dfield_t*	field = prev_mtuple->fields;
1579 
1580 	for (ulint i = 0; i < n_unique; i++) {
1581 		dfield_dup(field++, heap);
1582 	}
1583 }
1584 
1585 /** Compare two merge data tuples.
1586 @param[in]	prev_mtuple	merge data tuple
1587 @param[in]	current_mtuple	merge data tuple
1588 @param[in,out]	dup		reporter of duplicates
1589 @retval positive, 0, negative if current_mtuple is greater, equal, less, than
1590 last_mtuple. */
1591 static
1592 int
row_mtuple_cmp(const mtuple_t * prev_mtuple,const mtuple_t * current_mtuple,row_merge_dup_t * dup)1593 row_mtuple_cmp(
1594 	const mtuple_t*		prev_mtuple,
1595 	const mtuple_t*		current_mtuple,
1596 	row_merge_dup_t*	dup)
1597 {
1598 	ut_ad(dict_index_is_clust(dup->index));
1599 	const ulint	n_unique = dict_index_get_n_unique(dup->index);
1600 
1601 	return(row_merge_tuple_cmp(
1602 		       n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
1603 }
1604 
1605 /** Insert cached spatial index rows.
1606 @param[in]	trx_id		transaction id
1607 @param[in]	sp_tuples	cached spatial rows
1608 @param[in]	num_spatial	number of spatial indexes
1609 @param[in,out]	row_heap	heap for insert
1610 @param[in,out]	sp_heap		heap for tuples
1611 @param[in,out]	pcur		cluster index cursor
1612 @param[in,out]	mtr		mini transaction
1613 @param[in,out]	mtr_committed	whether scan_mtr got committed
1614 @return DB_SUCCESS or error number */
1615 static
1616 dberr_t
row_merge_spatial_rows(trx_id_t trx_id,index_tuple_info_t ** sp_tuples,ulint num_spatial,mem_heap_t * row_heap,mem_heap_t * sp_heap,btr_pcur_t * pcur,mtr_t * mtr,bool * mtr_committed)1617 row_merge_spatial_rows(
1618 	trx_id_t		trx_id,
1619 	index_tuple_info_t**	sp_tuples,
1620 	ulint			num_spatial,
1621 	mem_heap_t*		row_heap,
1622 	mem_heap_t*		sp_heap,
1623 	btr_pcur_t*		pcur,
1624 	mtr_t*			mtr,
1625 	bool*			mtr_committed)
1626 {
1627 	dberr_t			err = DB_SUCCESS;
1628 
1629 	if (sp_tuples == NULL) {
1630 		return(DB_SUCCESS);
1631 	}
1632 
1633 	ut_ad(sp_heap != NULL);
1634 
1635 	for (ulint j = 0; j < num_spatial; j++) {
1636 		err = sp_tuples[j]->insert(
1637 			trx_id, row_heap,
1638 			pcur, mtr, mtr_committed);
1639 
1640 		if (err != DB_SUCCESS) {
1641 			return(err);
1642 		}
1643 	}
1644 
1645 	mem_heap_empty(sp_heap);
1646 
1647 	return(err);
1648 }
1649 
1650 /** Check if the geometry field is valid.
1651 @param[in]	row		the row
1652 @param[in]	index		spatial index
1653 @return true if it's valid, false if it's invalid. */
1654 static
1655 bool
row_geo_field_is_valid(const dtuple_t * row,dict_index_t * index)1656 row_geo_field_is_valid(
1657 	const dtuple_t*		row,
1658 	dict_index_t*		index)
1659 {
1660 	const dict_field_t*	ind_field
1661 		= dict_index_get_nth_field(index, 0);
1662 	const dict_col_t*	col
1663 		= ind_field->col;
1664 	ulint			col_no
1665 		= dict_col_get_no(col);
1666 	const dfield_t*		dfield
1667 		= dtuple_get_nth_field(row, col_no);
1668 
1669 	if (dfield_is_null(dfield)
1670 	    || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
1671 		return(false);
1672 	}
1673 
1674 	return(true);
1675 }
1676 
1677 /** Reads clustered index of the table and create temporary files
1678 containing the index entries for the indexes to be built.
1679 @param[in]	trx		transaction
1680 @param[in,out]	table		MySQL table object, for reporting erroneous
1681 records
1682 @param[in]	old_table	table where rows are read from
1683 @param[in]	new_table	table where indexes are created; identical to
1684 old_table unless creating a PRIMARY KEY
1685 @param[in]	online		true if creating indexes online
1686 @param[in]	index		indexes to be created
1687 @param[in]	fts_sort_idx	full-text index to be created, or NULL
1688 @param[in]	psort_info	parallel sort info for fts_sort_idx creation,
1689 or NULL
1690 @param[in]	files		temporary files
1691 @param[in]	key_numbers	MySQL key numbers to create
1692 @param[in]	n_index		number of indexes to create
1693 @param[in]	add_cols	default values of added columns, or NULL
1694 @param[in]	add_v		newly added virtual columns along with indexes
1695 @param[in]	col_map		mapping of old column numbers to new ones, or
1696 NULL if old_table == new_table
1697 @param[in]	add_autoinc	number of added AUTO_INCREMENT columns, or
1698 ULINT_UNDEFINED if none is added
1699 @param[in,out]	sequence	autoinc sequence
1700 @param[in,out]	block		file buffer
1701 @param[in,out]	crypt_block	encrypted file buffer
1702 @param[in]	skip_pk_sort	whether the new PRIMARY KEY will follow
1703 existing order
1704 @param[in,out]	tmpfd		temporary file handle
1705 @param[in,out]	stage		performance schema accounting object, used by
1706 ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
1707 stage->inc() will be called for each page read.
1708 @param[in]	eval_table	mysql table used to evaluate virtual column
1709 				value, see innobase_get_computed_value().
1710 @param[in]	prebuilt	compress_heap must be taken from here
1711 @return DB_SUCCESS or error */
1712 static MY_ATTRIBUTE((warn_unused_result))
1713 dberr_t
row_merge_read_clustered_index(trx_t * trx,struct TABLE * table,const dict_table_t * old_table,const dict_table_t * new_table,bool online,dict_index_t ** index,dict_index_t * fts_sort_idx,fts_psort_t * psort_info,merge_file_t * files,const ulint * key_numbers,ulint n_index,const dtuple_t * add_cols,const dict_add_v_col_t * add_v,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,row_merge_block_t * block,row_merge_block_t * crypt_block,bool skip_pk_sort,int * tmpfd,ut_stage_alter_t * stage,struct TABLE * eval_table,row_prebuilt_t * prebuilt)1714 row_merge_read_clustered_index(
1715 	trx_t*			trx,
1716 	struct TABLE*		table,
1717 	const dict_table_t*	old_table,
1718 	const dict_table_t*	new_table,
1719 	bool			online,
1720 	dict_index_t**		index,
1721 	dict_index_t*		fts_sort_idx,
1722 	fts_psort_t*		psort_info,
1723 	merge_file_t*		files,
1724 	const ulint*		key_numbers,
1725 	ulint			n_index,
1726 	const dtuple_t*		add_cols,
1727 	const dict_add_v_col_t*	add_v,
1728 	const ulint*		col_map,
1729 	ulint			add_autoinc,
1730 	ib_sequence_t&		sequence,
1731 	row_merge_block_t*	block,
1732 	row_merge_block_t*	crypt_block,
1733 	bool			skip_pk_sort,
1734 	int*			tmpfd,
1735 	ut_stage_alter_t*	stage,
1736 	struct TABLE*		eval_table,
1737 	row_prebuilt_t*		prebuilt)
1738 {
1739 	dict_index_t*		clust_index;	/* Clustered index */
1740 	mem_heap_t*		row_heap;	/* Heap memory to create
1741 						clustered index tuples */
1742 	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
1743 	mem_heap_t*		v_heap = NULL;	/* Heap memory to process large
1744 						data for virtual column */
1745 	btr_pcur_t		pcur;		/* Cursor on the clustered
1746 						index */
1747 	mtr_t			mtr;		/* Mini transaction */
1748 	dberr_t			err = DB_SUCCESS;/* Return code */
1749 	ulint			n_nonnull = 0;	/* number of columns
1750 						changed to NOT NULL */
1751 	ulint*			nonnull = NULL;	/* NOT NULL columns */
1752 	dict_index_t*		fts_index = NULL;/* FTS index */
1753 	doc_id_t		doc_id = 0;
1754 	doc_id_t		max_doc_id = 0;
1755 	ibool			add_doc_id = FALSE;
1756 	os_event_t		fts_parallel_sort_event = NULL;
1757 	ibool			fts_pll_sort = FALSE;
1758 	int64_t			sig_count = 0;
1759 	index_tuple_info_t**	sp_tuples = NULL;
1760 	mem_heap_t*		sp_heap = NULL;
1761 	ulint			num_spatial = 0;
1762 	BtrBulk*		clust_btr_bulk = NULL;
1763 	bool			clust_temp_file = false;
1764 	mem_heap_t*		mtuple_heap = NULL;
1765 	mtuple_t		prev_mtuple;
1766 	mem_heap_t*		conv_heap = NULL;
1767 	FlushObserver*		observer = trx->flush_observer;
1768 	DBUG_ENTER("row_merge_read_clustered_index");
1769 
1770 	ut_ad((old_table == new_table) == !col_map);
1771 	ut_ad(!add_cols || col_map);
1772 
1773 	trx->op_info = "reading clustered index";
1774 
1775 #ifdef FTS_INTERNAL_DIAG_PRINT
1776 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
1777 #endif
1778 
1779 	/* Create and initialize memory for record buffers */
1780 
1781 	merge_buf = static_cast<row_merge_buf_t**>(
1782 		ut_malloc_nokey(n_index * sizeof *merge_buf));
1783 
1784 	row_merge_dup_t	clust_dup = {index[0], table, col_map, 0};
1785 	dfield_t*	prev_fields;
1786 	const ulint	n_uniq = dict_index_get_n_unique(index[0]);
1787 
1788 	ut_ad(trx->mysql_thd != NULL);
1789 
1790 	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
1791 
1792 	ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
1793 	/* There is no previous tuple yet. */
1794 	prev_mtuple.fields = NULL;
1795 
1796 	for (ulint i = 0; i < n_index; i++) {
1797 		if (index[i]->type & DICT_FTS) {
1798 
1799 			/* We are building a FT index, make sure
1800 			we have the temporary 'fts_sort_idx' */
1801 			ut_a(fts_sort_idx);
1802 
1803 			fts_index = index[i];
1804 
1805 			merge_buf[i] = row_merge_buf_create(fts_sort_idx);
1806 
1807 			add_doc_id = DICT_TF2_FLAG_IS_SET(
1808 				new_table, DICT_TF2_FTS_ADD_DOC_ID);
1809 
1810 			/* If Doc ID does not exist in the table itself,
1811 			fetch the first FTS Doc ID */
1812 			if (add_doc_id) {
1813 				fts_get_next_doc_id(
1814 					(dict_table_t*) new_table,
1815 					&doc_id);
1816 				ut_ad(doc_id > 0);
1817 			}
1818 
1819 			fts_pll_sort = TRUE;
1820 			row_fts_start_psort(psort_info);
1821 			fts_parallel_sort_event =
1822 				 psort_info[0].psort_common->sort_event;
1823 		} else {
1824 			if (dict_index_is_spatial(index[i])) {
1825 				num_spatial++;
1826 			}
1827 
1828 			merge_buf[i] = row_merge_buf_create(index[i]);
1829 		}
1830 	}
1831 
1832 	if (num_spatial > 0) {
1833 		ulint	count = 0;
1834 
1835 		sp_heap = mem_heap_create(512);
1836 
1837 		sp_tuples = static_cast<index_tuple_info_t**>(
1838 			ut_malloc_nokey(num_spatial
1839 					* sizeof(*sp_tuples)));
1840 
1841 		for (ulint i = 0; i < n_index; i++) {
1842 			if (dict_index_is_spatial(index[i])) {
1843 				sp_tuples[count]
1844 					= UT_NEW_NOKEY(
1845 						index_tuple_info_t(
1846 							sp_heap,
1847 							index[i]));
1848 				count++;
1849 			}
1850 		}
1851 
1852 		ut_ad(count == num_spatial);
1853 	}
1854 
1855 	mtr_start(&mtr);
1856 
1857 	/* Find the clustered index and create a persistent cursor
1858 	based on that. */
1859 
1860 	clust_index = dict_table_get_first_index(old_table);
1861 
1862 	btr_pcur_open_at_index_side(
1863 		true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
1864 
1865 	if (old_table != new_table) {
1866 		/* The table is being rebuilt.  Identify the columns
1867 		that were flagged NOT NULL in the new table, so that
1868 		we can quickly check that the records in the old table
1869 		do not violate the added NOT NULL constraints. */
1870 
1871 		nonnull = static_cast<ulint*>(
1872 			ut_malloc_nokey(dict_table_get_n_cols(new_table)
1873 				  * sizeof *nonnull));
1874 
1875 		for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
1876 			if (dict_table_get_nth_col(old_table, i)->prtype
1877 			    & DATA_NOT_NULL) {
1878 				continue;
1879 			}
1880 
1881 			const ulint j = col_map[i];
1882 
1883 			if (j == ULINT_UNDEFINED) {
1884 				/* The column was dropped. */
1885 				continue;
1886 			}
1887 
1888 			if (dict_table_get_nth_col(new_table, j)->prtype
1889 			    & DATA_NOT_NULL) {
1890 				nonnull[n_nonnull++] = j;
1891 			}
1892 		}
1893 
1894 		if (!n_nonnull) {
1895 			ut_free(nonnull);
1896 			nonnull = NULL;
1897 		}
1898 	}
1899 
1900 	row_heap = mem_heap_create(sizeof(mrec_buf_t));
1901 
1902 	if (dict_table_is_comp(old_table)
1903 	    && !dict_table_is_comp(new_table)) {
1904 		conv_heap = mem_heap_create(sizeof(mrec_buf_t));
1905 	}
1906 
1907 	if (skip_pk_sort) {
1908 		prev_fields = static_cast<dfield_t*>(
1909 			ut_malloc_nokey(n_uniq * sizeof *prev_fields));
1910 		mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
1911 	} else {
1912 		prev_fields = NULL;
1913 	}
1914 
1915 	/* Scan the clustered index. */
1916 	for (;;) {
1917 		const rec_t*	rec;
1918 		ulint*		offsets;
1919 		const dtuple_t*	row;
1920 		row_ext_t*	ext;
1921 		page_cur_t*	cur	= btr_pcur_get_page_cur(&pcur);
1922 
1923 		mem_heap_empty(row_heap);
1924 
1925 		/* Do not continue if table pages are still encrypted */
1926 		if (!old_table->is_readable() ||
1927 		    !new_table->is_readable()) {
1928 			err = DB_DECRYPTION_FAILED;
1929 			trx->error_key_num = 0;
1930 			goto func_exit;
1931 		}
1932 
1933 		page_cur_move_to_next(cur);
1934 
1935 		stage->n_pk_recs_inc();
1936 
1937 		if (page_cur_is_after_last(cur)) {
1938 
1939 			stage->inc();
1940 
1941 			if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1942 				err = DB_INTERRUPTED;
1943 				trx->error_key_num = 0;
1944 				goto func_exit;
1945 			}
1946 
1947 			if (online && old_table != new_table) {
1948 				err = row_log_table_get_error(clust_index);
1949 				if (err != DB_SUCCESS) {
1950 					trx->error_key_num = 0;
1951 					goto func_exit;
1952 				}
1953 			}
1954 
1955 #ifdef NDEBUG
1956 # define dbug_run_purge	false
1957 #else /* NDEBUG */
1958 			bool	dbug_run_purge = false;
1959 #endif /* NDEBUG */
1960 			DBUG_EXECUTE_IF(
1961 				"ib_purge_on_create_index_page_switch",
1962 				dbug_run_purge = true;);
1963 
1964 			/* Insert the cached spatial index rows. */
1965 			bool	mtr_committed = false;
1966 
1967 			err = row_merge_spatial_rows(
1968 				trx->id, sp_tuples, num_spatial,
1969 				row_heap, sp_heap, &pcur,
1970 				&mtr, &mtr_committed);
1971 
1972 			if (err != DB_SUCCESS) {
1973 				goto func_exit;
1974 			}
1975 
1976 			if (mtr_committed) {
1977 				goto scan_next;
1978 			}
1979 
1980 			if (dbug_run_purge
1981 			    || rw_lock_get_waiters(
1982 				    dict_index_get_lock(clust_index))) {
1983 				/* There are waiters on the clustered
1984 				index tree lock, likely the purge
1985 				thread. Store and restore the cursor
1986 				position, and yield so that scanning a
1987 				large table will not starve other
1988 				threads. */
1989 
1990 				/* Store the cursor position on the last user
1991 				record on the page. */
1992 				btr_pcur_move_to_prev_on_page(&pcur);
1993 				/* Leaf pages must never be empty, unless
1994 				this is the only page in the index tree. */
1995 				ut_ad(btr_pcur_is_on_user_rec(&pcur)
1996 				      || btr_pcur_get_block(
1997 					      &pcur)->page.id.page_no()
1998 				      == clust_index->page);
1999 
2000 				btr_pcur_store_position(&pcur, &mtr);
2001 				mtr_commit(&mtr);
2002 
2003 				if (dbug_run_purge) {
2004 					/* This is for testing
2005 					purposes only (see
2006 					DBUG_EXECUTE_IF above).  We
2007 					signal the purge thread and
2008 					hope that the purge batch will
2009 					complete before we execute
2010 					btr_pcur_restore_position(). */
2011 					trx_purge_run();
2012 					os_thread_sleep(1000000);
2013 				}
2014 
2015 				/* Give the waiters a chance to proceed. */
2016 				os_thread_yield();
2017 scan_next:
2018 				mtr_start(&mtr);
2019 				/* Restore position on the record, or its
2020 				predecessor if the record was purged
2021 				meanwhile. */
2022 				btr_pcur_restore_position(
2023 					BTR_SEARCH_LEAF, &pcur, &mtr);
2024 				/* Move to the successor of the
2025 				original record. */
2026 				if (!btr_pcur_move_to_next_user_rec(
2027 					    &pcur, &mtr)) {
2028 end_of_index:
2029 					row = NULL;
2030 					mtr_commit(&mtr);
2031 					mem_heap_free(row_heap);
2032 					ut_free(nonnull);
2033 					goto write_buffers;
2034 				}
2035 			} else {
2036 				ulint		next_page_no;
2037 				buf_block_t*	block;
2038 
2039 				next_page_no = btr_page_get_next(
2040 					page_cur_get_page(cur), &mtr);
2041 
2042 				if (next_page_no == FIL_NULL) {
2043 					goto end_of_index;
2044 				}
2045 
2046 				block = page_cur_get_block(cur);
2047 				block = btr_block_get(
2048 					page_id_t(block->page.id.space(),
2049 						  next_page_no),
2050 					block->page.size,
2051 					BTR_SEARCH_LEAF,
2052 					clust_index, &mtr);
2053 
2054 				btr_leaf_page_release(page_cur_get_block(cur),
2055 						      BTR_SEARCH_LEAF, &mtr);
2056 				page_cur_set_before_first(block, cur);
2057 				page_cur_move_to_next(cur);
2058 
2059 				ut_ad(!page_cur_is_after_last(cur));
2060 			}
2061 		}
2062 
2063 		rec = page_cur_get_rec(cur);
2064 
2065 		SRV_CORRUPT_TABLE_CHECK(rec,
2066 		{
2067 			err = DB_CORRUPTION;
2068 			goto func_exit;
2069 		});
2070 
2071 		offsets = rec_get_offsets(rec, clust_index, NULL,
2072 					  ULINT_UNDEFINED, &row_heap);
2073 
2074 		if (online) {
2075 			/* Perform a REPEATABLE READ.
2076 
2077 			When rebuilding the table online,
2078 			row_log_table_apply() must not see a newer
2079 			state of the table when applying the log.
2080 			This is mainly to prevent false duplicate key
2081 			errors, because the log will identify records
2082 			by the PRIMARY KEY, and also to prevent unsafe
2083 			BLOB access.
2084 
2085 			When creating a secondary index online, this
2086 			table scan must not see records that have only
2087 			been inserted to the clustered index, but have
2088 			not been written to the online_log of
2089 			index[]. If we performed READ UNCOMMITTED, it
2090 			could happen that the ADD INDEX reaches
2091 			ONLINE_INDEX_COMPLETE state between the time
2092 			the DML thread has updated the clustered index
2093 			but has not yet accessed secondary index. */
2094 			ut_ad(MVCC::is_view_active(trx->read_view));
2095 
2096 			if (!trx->read_view->changes_visible(
2097 				    row_get_rec_trx_id(
2098 					    rec, clust_index, offsets),
2099 				    old_table->name)) {
2100 				rec_t*	old_vers;
2101 
2102 				row_vers_build_for_consistent_read(
2103 					rec, &mtr, clust_index, &offsets,
2104 					trx->read_view, &row_heap,
2105 					row_heap, &old_vers, NULL);
2106 
2107 				rec = old_vers;
2108 
2109 				if (!rec) {
2110 					continue;
2111 				}
2112 			}
2113 
2114 			if (rec_get_deleted_flag(
2115 				    rec,
2116 				    dict_table_is_comp(old_table))) {
2117 				/* This record was deleted in the latest
2118 				committed version, or it was deleted and
2119 				then reinserted-by-update before purge
2120 				kicked in. Skip it. */
2121 				continue;
2122 			}
2123 
2124 			ut_ad(!rec_offs_any_null_extern(rec, offsets));
2125 		} else if (rec_get_deleted_flag(
2126 				   rec, dict_table_is_comp(old_table))) {
2127 			/* Skip delete-marked records.
2128 
2129 			Skipping delete-marked records will make the
			created indexes unusable for transactions
2131 			whose read views were created before the index
2132 			creation completed, but preserving the history
2133 			would make it tricky to detect duplicate
2134 			keys. */
2135 			continue;
2136 		}
2137 
2138 		/* When !online, we are holding a lock on old_table, preventing
2139 		any inserts that could have written a record 'stub' before
2140 		writing out off-page columns. */
2141 		ut_ad(!rec_offs_any_null_extern(rec, offsets));
2142 
2143 		/* Build a row based on the clustered index. */
2144 
2145 		row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
2146 					   rec, offsets, new_table,
2147 					   add_cols, add_v, col_map, &ext,
2148 					   row_heap);
2149 		ut_ad(row);
2150 
2151 		for (ulint i = 0; i < n_nonnull; i++) {
2152 			const dfield_t*	field	= &row->fields[nonnull[i]];
2153 
2154 			ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
2155 
2156 			if (dfield_is_null(field)) {
2157 				err = DB_INVALID_NULL;
2158 				trx->error_key_num = 0;
2159 				goto func_exit;
2160 			}
2161 		}
2162 
2163 		/* Get the next Doc ID */
2164 		if (add_doc_id) {
2165 			doc_id++;
2166 		} else {
2167 			doc_id = 0;
2168 		}
2169 
2170 		if (add_autoinc != ULINT_UNDEFINED) {
2171 
2172 			ut_ad(add_autoinc
2173 			      < dict_table_get_n_user_cols(new_table));
2174 
2175 			const dfield_t*	dfield;
2176 
2177 			dfield = dtuple_get_nth_field(row, add_autoinc);
2178 			if (dfield_is_null(dfield)) {
2179 				goto write_buffers;
2180 			}
2181 
2182 			const dtype_t*  dtype = dfield_get_type(dfield);
2183 			byte*	b = static_cast<byte*>(dfield_get_data(dfield));
2184 
2185 			if (sequence.eof()) {
2186 				err = DB_ERROR;
2187 				trx->error_key_num = 0;
2188 
2189 				ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2190 					ER_AUTOINC_READ_FAILED, "[NULL]");
2191 
2192 				goto func_exit;
2193 			}
2194 
2195 			ulonglong	value = sequence++;
2196 
2197 			switch (dtype_get_mtype(dtype)) {
2198 			case DATA_INT: {
2199 				ibool	usign;
2200 				ulint	len = dfield_get_len(dfield);
2201 
2202 				usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
2203 				mach_write_ulonglong(b, value, len, usign);
2204 
2205 				break;
2206 				}
2207 
2208 			case DATA_FLOAT:
2209 				mach_float_write(
2210 					b, static_cast<float>(value));
2211 				break;
2212 
2213 			case DATA_DOUBLE:
2214 				mach_double_write(
2215 					b, static_cast<double>(value));
2216 				break;
2217 
2218 			default:
2219 				ut_ad(0);
2220 			}
2221 		}
2222 
2223 write_buffers:
2224 		/* Build all entries for all the indexes to be created
2225 		in a single scan of the clustered index. */
2226 
2227 		ulint	s_idx_cnt = 0;
2228 		bool	skip_sort = skip_pk_sort
2229 			&& dict_index_is_clust(merge_buf[0]->index);
2230 
2231 		for (ulint i = 0; i < n_index; i++, skip_sort = false) {
2232 			row_merge_buf_t*	buf	= merge_buf[i];
2233 			merge_file_t*		file	= &files[i];
2234 			ulint			rows_added = 0;
2235 
2236 			if (dict_index_is_spatial(buf->index)) {
2237 				if (!row) {
2238 					continue;
2239 				}
2240 
2241 				ut_ad(sp_tuples[s_idx_cnt]->get_index()
2242 				      == buf->index);
2243 
2244 				/* If the geometry field is invalid, report
2245 				error. */
2246 				if (!row_geo_field_is_valid(row, buf->index)) {
2247 					err = DB_CANT_CREATE_GEOMETRY_OBJECT;
2248 					break;
2249 				}
2250 
2251 				sp_tuples[s_idx_cnt]->add(row, ext);
2252 				s_idx_cnt++;
2253 
2254 				continue;
2255 			}
2256 
2257 			if (UNIV_LIKELY
2258 			    (row && (rows_added = row_merge_buf_add(
2259 					buf, fts_index, old_table, new_table,
2260 					psort_info, row, ext, &doc_id,
2261 					conv_heap, &err, &v_heap, eval_table,
2262 					trx, prebuilt)))) {
2263 
2264 				/* If we are creating FTS index,
2265 				a single row can generate more
2266 				records for tokenized word */
2267 				file->n_rec += rows_added;
2268 
2269 				if (err != DB_SUCCESS) {
2270 					ut_ad(err == DB_TOO_BIG_RECORD);
2271 					break;
2272 				}
2273 
2274 				if (doc_id > max_doc_id) {
2275 					max_doc_id = doc_id;
2276 				}
2277 
2278 				if (buf->index->type & DICT_FTS) {
2279 					/* Check if error occurs in child thread */
2280 					for (ulint j = 0;
2281 					     j < fts_sort_pll_degree; j++) {
2282 						if (psort_info[j].error
2283 							!= DB_SUCCESS) {
2284 							err = psort_info[j].error;
2285 							trx->error_key_num = i;
2286 							break;
2287 						}
2288 					}
2289 
2290 					if (err != DB_SUCCESS) {
2291 						break;
2292 					}
2293 				}
2294 
2295 				if (skip_sort) {
2296 					ut_ad(buf->n_tuples > 0);
2297 					const mtuple_t*	curr =
2298 						&buf->tuples[buf->n_tuples - 1];
2299 
2300 					ut_ad(i == 0);
2301 					ut_ad(dict_index_is_clust(merge_buf[0]->index));
2302 					/* Detect duplicates by comparing the
2303 					current record with previous record.
2304 					When temp file is not used, records
2305 					should be in sorted order. */
2306 					if (prev_mtuple.fields != NULL
2307 					    && (row_mtuple_cmp(
2308 						&prev_mtuple, curr,
2309 						&clust_dup) == 0)) {
2310 
2311 						err = DB_DUPLICATE_KEY;
2312 						trx->error_key_num
2313 							= key_numbers[0];
2314 						goto func_exit;
2315 					}
2316 
2317 					prev_mtuple.fields = curr->fields;
2318 				}
2319 
2320 				continue;
2321 			}
2322 
2323 			if (err == DB_COMPUTE_VALUE_FAILED) {
2324 				trx->error_key_num = i;
2325 				goto func_exit;
2326 			}
2327 
2328 			if (buf->index->type & DICT_FTS) {
2329 				if (!row || !doc_id) {
2330 					continue;
2331 				}
2332 			}
2333 
2334 			/* The buffer must be sufficiently large
2335 			to hold at least one record. It may only
2336 			be empty when we reach the end of the
2337 			clustered index. row_merge_buf_add()
2338 			must not have been called in this loop. */
2339 			ut_ad(buf->n_tuples || row == NULL);
2340 
2341 			/* We have enough data tuples to form a block.
2342 			Sort them and write to disk if temp file is used
2343 			or insert into index if temp file is not used. */
2344 			ut_ad(old_table == new_table
2345 			      ? !dict_index_is_clust(buf->index)
2346 			      : (i == 0) == dict_index_is_clust(buf->index));
2347 
2348 			/* We have enough data tuples to form a block.
2349 			Sort them (if !skip_sort) and write to disk. */
2350 
2351 			if (buf->n_tuples) {
2352 				if (skip_sort) {
2353 					/* Temporary File is not used.
2354 					so insert sorted block to the index */
2355 					if (row != NULL) {
2356 						bool	mtr_committed = false;
2357 
						/* We have to insert the
2359 						cached spatial index rows, since
2360 						after the mtr_commit, the cluster
2361 						index page could be updated, then
2362 						the data in cached rows become
2363 						invalid. */
2364 						err = row_merge_spatial_rows(
2365 							trx->id, sp_tuples,
2366 							num_spatial,
2367 							row_heap, sp_heap,
2368 							&pcur, &mtr,
2369 							&mtr_committed);
2370 
2371 						if (err != DB_SUCCESS) {
2372 							goto func_exit;
2373 						}
2374 
2375 						/* We are not at the end of
2376 						the scan yet. We must
2377 						mtr_commit() in order to be
2378 						able to call log_free_check()
2379 						in row_merge_insert_index_tuples().
2380 						Due to mtr_commit(), the
2381 						current row will be invalid, and
2382 						we must reread it on the next
2383 						loop iteration. */
2384 						if (!mtr_committed) {
2385 							btr_pcur_move_to_prev_on_page(
2386 								&pcur);
2387 							btr_pcur_store_position(
2388 								&pcur, &mtr);
2389 
2390 							mtr_commit(&mtr);
2391 						}
2392 					}
2393 
2394 					mem_heap_empty(mtuple_heap);
2395 					prev_mtuple.fields = prev_fields;
2396 
2397 					row_mtuple_create(
2398 						&buf->tuples[buf->n_tuples - 1],
2399 						&prev_mtuple, n_uniq,
2400 						mtuple_heap);
2401 
2402 					if (clust_btr_bulk == NULL) {
2403 						clust_btr_bulk = UT_NEW_NOKEY(
2404 							BtrBulk(index[i],
2405 								trx->id,
2406 								observer));
2407 
2408 						clust_btr_bulk->init();
2409 					} else {
2410 						clust_btr_bulk->latch();
2411 					}
2412 
2413 					err = row_merge_insert_index_tuples(
2414 						trx->id, index[i], old_table,
2415 						-1, NULL, NULL,
2416 						new_table->space, buf,
2417 						clust_btr_bulk);
2418 
2419 					if (row == NULL) {
2420 						err = clust_btr_bulk->finish(
2421 							err);
2422 						UT_DELETE(clust_btr_bulk);
2423 						clust_btr_bulk = NULL;
2424 					} else {
2425 						/* Release latches for possible
						log_free_check() in spatial index
2427 						build. */
2428 						clust_btr_bulk->release();
2429 					}
2430 
2431 					if (err != DB_SUCCESS) {
2432 						break;
2433 					}
2434 
2435 					if (row != NULL) {
2436 						/* Restore the cursor on the
2437 						previous clustered index record,
2438 						and empty the buffer. The next
2439 						iteration of the outer loop will
2440 						advance the cursor and read the
2441 						next record (the one which we
2442 						had to ignore due to the buffer
2443 						overflow). */
2444 						mtr_start(&mtr);
2445 						btr_pcur_restore_position(
2446 							BTR_SEARCH_LEAF, &pcur,
2447 							&mtr);
2448 						buf = row_merge_buf_empty(buf);
2449 						/* Restart the outer loop on the
2450 						record. We did not insert it
2451 						into any index yet. */
2452 						ut_ad(i == 0);
2453 						break;
2454 					}
2455 				} else if (dict_index_is_unique(buf->index)) {
2456 					row_merge_dup_t	dup = {
2457 						buf->index, table, col_map, 0};
2458 
2459 					row_merge_buf_sort(buf, &dup);
2460 
2461 					if (dup.n_dup) {
2462 						err = DB_DUPLICATE_KEY;
2463 						trx->error_key_num
2464 							= key_numbers[i];
2465 						break;
2466 					}
2467 				} else {
2468 					row_merge_buf_sort(buf, NULL);
2469 				}
2470 			} else if (online && new_table == old_table) {
2471 				/* Note the newest transaction that
2472 				modified this index when the scan was
2473 				completed. We prevent older readers
2474 				from accessing this index, to ensure
2475 				read consistency. */
2476 
2477 				trx_id_t	max_trx_id;
2478 
2479 				ut_a(row == NULL);
2480 				rw_lock_x_lock(
2481 					dict_index_get_lock(buf->index));
2482 				ut_a(dict_index_get_online_status(buf->index)
2483 				     == ONLINE_INDEX_CREATION);
2484 
2485 				max_trx_id = row_log_get_max_trx(buf->index);
2486 
2487 				if (max_trx_id > buf->index->trx_id) {
2488 					buf->index->trx_id = max_trx_id;
2489 				}
2490 
2491 				rw_lock_x_unlock(
2492 					dict_index_get_lock(buf->index));
2493 			}
2494 
2495 			/* Secondary index and clustered index which is
2496 			not in sorted order can use the temporary file.
2497 			Fulltext index should not use the temporary file. */
2498 			if (!skip_sort && !(buf->index->type & DICT_FTS)) {
2499 				/* In case we can have all rows in sort buffer,
2500 				we can insert directly into the index without
				temporary file if the clustered index does not use a
2502 				temporary file. */
2503 				if (row == NULL && file->fd == -1
2504 				    && !clust_temp_file) {
2505 					DBUG_EXECUTE_IF(
2506 						"row_merge_write_failure",
2507 						err = DB_TEMP_FILE_WRITE_FAIL;
2508 						trx->error_key_num = i;
2509 						goto all_done;);
2510 
2511 					DBUG_EXECUTE_IF(
2512 						"row_merge_tmpfile_fail",
2513 						err = DB_OUT_OF_MEMORY;
2514 						trx->error_key_num = i;
2515 						goto all_done;);
2516 
2517 					BtrBulk	btr_bulk(index[i], trx->id,
2518 							 observer);
2519 					btr_bulk.init();
2520 
2521 					err = row_merge_insert_index_tuples(
2522 						trx->id, index[i], old_table,
2523 						-1, NULL, NULL,
2524 						new_table->space, buf,
2525 						&btr_bulk);
2526 
2527 					err = btr_bulk.finish(err);
2528 
2529 					DBUG_EXECUTE_IF(
2530 						"row_merge_insert_big_row",
2531 						err = DB_TOO_BIG_RECORD;);
2532 
2533 					if (err != DB_SUCCESS) {
2534 						break;
2535 					}
2536 				} else {
2537 					if (row_merge_file_create_if_needed(
2538 						file, tmpfd,
2539 						buf->n_tuples, path) < 0) {
2540 						err = DB_OUT_OF_MEMORY;
2541 						trx->error_key_num = i;
2542 						goto func_exit;
2543 					}
2544 
2545 					/* Ensure that duplicates in the
2546 					clustered index will be detected before
2547 					inserting secondary index records. */
2548 					if (dict_index_is_clust(buf->index)) {
2549 						clust_temp_file = true;
2550 					}
2551 
2552 					ut_ad(file->n_rec > 0);
2553 
2554 					row_merge_buf_write(buf, file, block);
2555 
2556 					if (!row_merge_write(
2557 						    file->fd, file->offset++,
2558 						    block, crypt_block,
2559 						    new_table->space)) {
2560 						err = DB_TEMP_FILE_WRITE_FAIL;
2561 						trx->error_key_num = i;
2562 						break;
2563 					}
2564 
2565 					UNIV_MEM_INVALID(
2566 						&block[0], srv_sort_buf_size);
2567 				}
2568 			}
2569 			merge_buf[i] = row_merge_buf_empty(buf);
2570 
2571 			if (UNIV_LIKELY(row != NULL)) {
2572 				/* Try writing the record again, now
2573 				that the buffer has been written out
2574 				and emptied. */
2575 
2576 				if (UNIV_UNLIKELY
2577 				    (!(rows_added = row_merge_buf_add(
2578 						buf, fts_index, old_table,
2579 						new_table, psort_info, row, ext,
2580 						&doc_id, conv_heap,
2581 						&err, &v_heap, table, trx,
2582 						prebuilt)))) {
2583 					/* An empty buffer should have enough
2584 					room for at least one record. */
2585 					ut_error;
2586 				}
2587 
2588 				if (err != DB_SUCCESS) {
2589 					break;
2590 				}
2591 
2592 				file->n_rec += rows_added;
2593 			}
2594 		}
2595 
2596 		if (row == NULL) {
2597 			goto all_done;
2598 		}
2599 
2600 		if (err != DB_SUCCESS) {
2601 			goto func_exit;
2602 		}
2603 
2604 		if (v_heap) {
2605 			mem_heap_empty(v_heap);
2606 		}
2607 	}
2608 
2609 func_exit:
2610 	/* row_merge_spatial_rows may have committed
2611 	the mtr	before an error occurs. */
2612 	if (mtr.is_active()) {
2613 		mtr_commit(&mtr);
2614 	}
2615 	mem_heap_free(row_heap);
2616 	ut_free(nonnull);
2617 
2618 all_done:
2619 	if (clust_btr_bulk != NULL) {
2620 		ut_ad(err != DB_SUCCESS);
2621 		clust_btr_bulk->latch();
2622 		err = clust_btr_bulk->finish(
2623 			err);
2624 		UT_DELETE(clust_btr_bulk);
2625 	}
2626 
2627 	if (prev_fields != NULL) {
2628 		ut_free(prev_fields);
2629 		mem_heap_free(mtuple_heap);
2630 	}
2631 
2632 	if (v_heap) {
2633 		mem_heap_free(v_heap);
2634 	}
2635 
2636 	if (conv_heap != NULL) {
2637 		mem_heap_free(conv_heap);
2638 	}
2639 
2640 #ifdef FTS_INTERNAL_DIAG_PRINT
2641 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
2642 #endif
2643 	if (fts_pll_sort) {
2644 		bool	all_exit = false;
2645 		ulint	trial_count = 0;
2646 		const ulint max_trial_count = 10000;
2647 
2648 wait_again:
		/* Check if an error occurred in a child thread */
2650 		for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2651 			if (psort_info[j].error != DB_SUCCESS) {
2652 				err = psort_info[j].error;
2653 				trx->error_key_num = j;
2654 				break;
2655 			}
2656 		}
2657 
2658 		/* Tell all children that parent has done scanning */
2659 		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2660 			if (err == DB_SUCCESS) {
2661 				psort_info[i].state = FTS_PARENT_COMPLETE;
2662 			} else {
2663 				psort_info[i].state = FTS_PARENT_EXITING;
2664 			}
2665 		}
2666 
2667 		/* Now wait all children to report back to be completed */
2668 		os_event_wait_time_low(fts_parallel_sort_event,
2669 				       1000000, sig_count);
2670 
2671 		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2672 			if (psort_info[i].child_status != FTS_CHILD_COMPLETE
2673 			    && psort_info[i].child_status != FTS_CHILD_EXITING) {
2674 				sig_count = os_event_reset(
2675 					fts_parallel_sort_event);
2676 				goto wait_again;
2677 			}
2678 		}
2679 
2680 		/* Now all children should complete, wait a bit until
2681 		they all finish setting the event, before we free everything.
2682 		This has a 10 second timeout */
2683 		do {
2684 			all_exit = true;
2685 
2686 			for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2687 				if (psort_info[j].child_status
2688 				    != FTS_CHILD_EXITING) {
2689 					all_exit = false;
2690 					os_thread_sleep(1000);
2691 					break;
2692 				}
2693 			}
2694 			trial_count++;
2695 		} while (!all_exit && trial_count < max_trial_count);
2696 
2697 		if (!all_exit) {
2698 			ib::fatal() << "Not all child sort threads exited"
2699 				" when creating FTS index '"
2700 				<< fts_sort_idx->name << "'";
2701 		}
2702 	}
2703 
2704 #ifdef FTS_INTERNAL_DIAG_PRINT
2705 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
2706 #endif
2707 	for (ulint i = 0; i < n_index; i++) {
2708 		row_merge_buf_free(merge_buf[i]);
2709 	}
2710 
2711 	row_fts_free_pll_merge_buf(psort_info);
2712 
2713 	ut_free(merge_buf);
2714 
2715 	btr_pcur_close(&pcur);
2716 
2717 	if (sp_tuples != NULL) {
2718 		for (ulint i = 0; i < num_spatial; i++) {
2719 			UT_DELETE(sp_tuples[i]);
2720 		}
2721 		ut_free(sp_tuples);
2722 
2723 		if (sp_heap) {
2724 			mem_heap_free(sp_heap);
2725 		}
2726 	}
2727 
2728 	/* Update the next Doc ID we used. Table should be locked, so
2729 	no concurrent DML */
2730 	if (max_doc_id && err == DB_SUCCESS) {
2731 		/* Sync fts cache for other fts indexes to keep all
2732 		fts indexes consistent in sync_doc_id. */
2733 		err = fts_sync_table(const_cast<dict_table_t*>(new_table),
2734 				     false, true, false);
2735 
2736 		if (err == DB_SUCCESS) {
2737 			fts_update_next_doc_id(
2738 				0, new_table,
2739 				old_table->name.m_name, max_doc_id);
2740 		}
2741 	}
2742 
2743 	trx->op_info = "";
2744 
2745 	DBUG_RETURN(err);
2746 }
2747 
/** Write a record via buffer 2 and read the next record to buffer N.

This macro expands inside row_merge_blocks() and row_merge_blocks_copy()
and relies on local variables of those callers being in scope:
b2, block, crypt_block, space_id, buf, of, file, b##N, mrec##N,
offsets##N, foffs##N.  On a write failure, a record-count overrun, or a
truncated read it jumps to the caller's "corrupt" label; when buffer N
is exhausted normally (no more records) it executes AT_END instead.
The crypt_block sub-buffer is passed only when crypt_block is non-NULL,
i.e. when the temporary merge files are encrypted.
@param N number of the buffer (0 or 1)
@param INDEX record descriptor
@param AT_END statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)			\
	do {								\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
					 crypt_block ?			\
					 &crypt_block[2 * srv_sort_buf_size] :\
					 NULL,				\
					 space_id,			\
					 &buf[2], b2,			\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  crypt_block ?			\
					  &crypt_block[N * srv_sort_buf_size] :\
					  NULL,				\
					  space_id,			\
					  &buf[N], b##N, INDEX,		\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)
2780 
#ifdef HAVE_PSI_STAGE_INTERFACE
/* Instrumented variant: advance the performance-schema progress
counter (if a stage object was supplied) before the write+read step. */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)			\
	do {								\
		if (stage != NULL) {					\
			stage->inc();					\
		}							\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);		\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
/* Without the stage interface, forward directly to the low-level macro. */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)			\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */
2793 
2794 /** Merge two blocks of records on disk and write a bigger block.
2795 @param[in]	dup		descriptor of index being created
2796 @param[in]	file		file containing index entries
2797 @param[in,out]	block		3 buffers
2798 @param[in,out]	crypt_block	encrypted file buffer
2799 @param[in]	space_id	tablespace id
2800 @param[in,out]	foffs0		offset of first source list in the file
2801 @param[in,out]	foffs1		offset of second source list in the file
2802 @param[in,out]	of		output file
2803 @param[in,out]	stage		performance schema accounting object, used by
2804 ALTER TABLE. If not NULL stage->inc() will be called for each record
2805 processed.
2806 @return DB_SUCCESS or error code */
2807 static MY_ATTRIBUTE((warn_unused_result))
2808 dberr_t
row_merge_blocks(const row_merge_dup_t * dup,const merge_file_t * file,row_merge_block_t * block,row_merge_block_t * crypt_block,ulint space_id,ulint * foffs0,ulint * foffs1,merge_file_t * of,ut_stage_alter_t * stage)2809 row_merge_blocks(
2810 	const row_merge_dup_t*	dup,
2811 	const merge_file_t*	file,
2812 	row_merge_block_t*	block,
2813 	row_merge_block_t*	crypt_block,
2814 	ulint			space_id,
2815 	ulint*			foffs0,
2816 	ulint*			foffs1,
2817 	merge_file_t*		of,
2818 	ut_stage_alter_t*	stage)
2819 {
2820 	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */
2821 
2822 	mrec_buf_t*	buf;	/*!< buffer for handling
2823 				split mrec in block[] */
2824 	const byte*	b0;	/*!< pointer to block[0] */
2825 	const byte*	b1;	/*!< pointer to block[srv_sort_buf_size] */
2826 	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
2827 	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
2828 	const mrec_t*	mrec1;	/*!< merge rec, points to
2829 				block[srv_sort_buf_size] or buf[1] */
2830 	ulint*		offsets0;/* offsets of mrec0 */
2831 	ulint*		offsets1;/* offsets of mrec1 */
2832 
2833 	DBUG_ENTER("row_merge_blocks");
2834 	DBUG_PRINT("ib_merge_sort",
2835 		   ("fd=%d,%lu+%lu to fd=%d,%lu",
2836 		    file->fd, ulong(*foffs0), ulong(*foffs1),
2837 		    of->fd, ulong(of->offset)));
2838 
2839 	heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);
2840 
2841 	/* Write a record and read the next record.  Split the output
2842 	file in two halves, which can be merged on the following pass. */
2843 
2844 	if (!row_merge_read(file->fd, *foffs0, &block[0],
2845 			    &crypt_block[0], space_id)
2846 	    || !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size],
2847 			crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
2848 			space_id)) {
2849 corrupt:
2850 		mem_heap_free(heap);
2851 		DBUG_RETURN(DB_CORRUPTION);
2852 	}
2853 
2854 	b0 = &block[0];
2855 	b1 = &block[srv_sort_buf_size];
2856 	b2 = &block[2 * srv_sort_buf_size];
2857 
2858 	b0 = row_merge_read_rec(
2859 		&block[0], &crypt_block[0], space_id, &buf[0], b0, dup->index,
2860 		file->fd, foffs0, &mrec0, offsets0);
2861 	b1 = row_merge_read_rec(
2862 		&block[srv_sort_buf_size],
2863 		crypt_block ? &crypt_block[srv_sort_buf_size] : NULL,
2864 		space_id, &buf[srv_sort_buf_size], b1, dup->index,
2865 		file->fd, foffs1, &mrec1, offsets1);
2866 	if (UNIV_UNLIKELY(!b0 && mrec0)
2867 	    || UNIV_UNLIKELY(!b1 && mrec1)) {
2868 
2869 		goto corrupt;
2870 	}
2871 
2872 	while (mrec0 && mrec1) {
2873 		int cmp = cmp_rec_rec_simple(
2874 			mrec0, mrec1, offsets0, offsets1,
2875 			dup->index, dup->table);
2876 		if (cmp < 0) {
2877 			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
2878 		} else if (cmp) {
2879 			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
2880 		} else {
2881 			mem_heap_free(heap);
2882 			DBUG_RETURN(DB_DUPLICATE_KEY);
2883 		}
2884 	}
2885 
2886 merged:
2887 	if (mrec0) {
2888 		/* append all mrec0 to output */
2889 		for (;;) {
2890 			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
2891 		}
2892 	}
2893 done0:
2894 	if (mrec1) {
2895 		/* append all mrec1 to output */
2896 		for (;;) {
2897 			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
2898 		}
2899 	}
2900 done1:
2901 
2902 	mem_heap_free(heap);
2903 	b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
2904 				 crypt_block ?
2905 				 &crypt_block[2 * srv_sort_buf_size] : NULL,
2906 				 space_id, b2, of->fd, &of->offset);
2907 	DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
2908 }
2909 
/** Copy a block of index entries.
This is used on the final passes of the merge sort, when only one input
run remains in one half of the file: the run is copied (not merged) to
the output file so that the next pass sees a well-formed run.
@param[in]	index		index being created
@param[in]	file		input file
@param[in,out]	block		3 buffers
@param[in,out]	crypt_block	encrypted file buffer
@param[in]	space_id	tablespace id
@param[in,out]	foffs0		input file offset
@param[in,out]	of		output file
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return TRUE on success, FALSE on failure */
static MY_ATTRIBUTE((warn_unused_result))
ibool
row_merge_blocks_copy(
	const dict_index_t*	index,
	const merge_file_t*	file,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	ulint*			foffs0,
	merge_file_t*		of,
	ut_stage_alter_t*	stage)
{
	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */

	mrec_buf_t*	buf;	/*!< buffer for handling
				split mrec in block[] */
	const byte*	b0;	/*!< pointer to block[0] */
	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] */
	ulint*		offsets0;/* offsets of mrec0 */
	ulint*		offsets1;/* dummy offsets */

	DBUG_ENTER("row_merge_blocks_copy");
	DBUG_PRINT("ib_merge_sort",
		   ("fd=%d," ULINTPF " to fd=%d," ULINTPF,
		    file->fd, *foffs0,
		    of->fd, of->offset));

	heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);

	/* Write a record and read the next record.  Split the output
	file in two halves, which can be merged on the following pass. */

	/* Read the first block of the remaining input run. */
	if (!row_merge_read(file->fd, *foffs0, &block[0], &crypt_block[0],
			    space_id)) {
corrupt:
		mem_heap_free(heap);
		DBUG_RETURN(FALSE);
	}

	b0 = &block[0];

	b2 = &block[2 * srv_sort_buf_size];

	b0 = row_merge_read_rec(&block[0], &crypt_block[0], space_id, &buf[0],
				b0, index, file->fd, foffs0, &mrec0, offsets0);
	/* b0 == NULL with mrec0 != NULL indicates an I/O error or
	corruption; b0 == NULL with mrec0 == NULL is normal end of input
	(compare the handling in row_merge_insert_index_tuples()). */
	if (UNIV_UNLIKELY(!b0 && mrec0)) {

		goto corrupt;
	}

	if (mrec0) {
		/* append all mrec0 to output */
		/* NOTE(review): ROW_MERGE_WRITE_GET_NEXT is a macro defined
		earlier in this file; presumably it writes the current record
		to the output and fetches the next one, jumping to the given
		label when the run is exhausted -- confirm at the macro
		definition. */
		for (;;) {
			ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
		}
	}
done0:

	/* The file offset points to the beginning of the last page
	that has been read.  Update it to point to the next block. */
	(*foffs0)++;

	mem_heap_free(heap);
	/* Terminate the output with an end-of-block marker; a NULL
	return from row_merge_write_eof() means a write failure. */
	DBUG_RETURN(row_merge_write_eof(&block[2 * srv_sort_buf_size],
					crypt_block ?
					&crypt_block[2 * srv_sort_buf_size] :
					NULL, space_id,
					b2, of->fd, &of->offset)
		    != NULL);
}
2993 
/** Merge disk files.
Performs one pass of the merge sort: the input file is treated as two
halves of sorted runs, and pairs of runs (one from each half) are merged
into the output file. Leftover runs in either half are copied verbatim.
@param[in]	trx		transaction
@param[in]	dup		descriptor of index being created
@param[in,out]	file		file containing index entries
@param[in,out]	block		3 buffers
@param[in,out]	crypt_block	encrypted file buffer
@param[in]	space_id	tablespace id
@param[in,out]	tmpfd		temporary file handle
@param[in,out]	num_run		Number of runs that remain to be merged
@param[in,out]	run_offset	Array that contains the first offset number
for each merge run
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->inc() will be called for each record
processed.
@return DB_SUCCESS or error code */
static
dberr_t
row_merge(
	trx_t*			trx,
	const row_merge_dup_t*	dup,
	merge_file_t*		file,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	int*			tmpfd,
	ulint*			num_run,
	ulint*			run_offset,
	ut_stage_alter_t*	stage)
{
	ulint		foffs0;	/*!< first input offset */
	ulint		foffs1;	/*!< second input offset */
	dberr_t		error;	/*!< error code */
	merge_file_t	of;	/*!< output file */
	const ulint	ihalf	= run_offset[*num_run / 2];
				/*!< half the input file */
	ulint		n_run	= 0;
				/*!< num of runs generated from this merge */

	UNIV_MEM_ASSERT_W(&block[0], 3 * srv_sort_buf_size);
	if (crypt_block) {
		UNIV_MEM_ASSERT_W(&crypt_block[0], 3 * srv_sort_buf_size);
	}

	ut_ad(ihalf < file->offset);

	/* The output goes to the scratch file; the descriptors are
	swapped with the input at the end of the pass. */
	of.fd = *tmpfd;
	of.offset = 0;
	of.n_rec = 0;

#ifdef POSIX_FADV_SEQUENTIAL
	/* The input file will be read sequentially, starting from the
	beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
	affects the entire file.  Each block will be read exactly once. */
	posix_fadvise(file->fd, 0, 0,
		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
#endif /* POSIX_FADV_SEQUENTIAL */

	/* Merge blocks to the output file. */
	foffs0 = 0;
	foffs1 = ihalf;

	/* run_offset[] is rewritten below with the start offsets of
	the runs produced by this pass. */
	UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);

	/* Merge one run from each half per iteration, until either
	half is exhausted. */
	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {

		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		error = row_merge_blocks(dup, file, block, crypt_block,
					 space_id, &foffs0, &foffs1, &of,
					 stage);

		if (error != DB_SUCCESS) {
			return(error);
		}

	}

	/* Copy the last blocks, if there are any. */

	while (foffs0 < ihalf) {
		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block, crypt_block,
					   space_id, &foffs0, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs0 == ihalf);

	while (foffs1 < file->offset) {
		if (trx_is_interrupted(trx)) {
			return(DB_INTERRUPTED);
		}

		/* Remember the offset number for this run */
		run_offset[n_run++] = of.offset;

		if (!row_merge_blocks_copy(dup->index, file, block, crypt_block,
					   space_id, &foffs1, &of, stage)) {
			return(DB_CORRUPTION);
		}
	}

	ut_ad(foffs1 == file->offset);

	/* Every input record must have been written to the output. */
	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
		return(DB_CORRUPTION);
	}

	ut_ad(n_run <= *num_run);

	*num_run = n_run;

	/* Each run can contain one or more offsets. As merge goes on,
	the number of runs (to merge) will reduce until we have one
	single run. So the number of runs will always be smaller than
	the number of offsets in file */
	ut_ad((*num_run) <= file->offset);

	/* The number of offsets in output file is always equal or
	smaller than input file */
	ut_ad(of.offset <= file->offset);

	/* Swap file descriptors for the next pass. */
	*tmpfd = file->fd;
	*file = of;

	UNIV_MEM_INVALID(&block[0], 3 * srv_sort_buf_size);

	return(DB_SUCCESS);
}
3136 
3137 /** Merge disk files.
3138 @param[in]	trx		transaction
3139 @param[in]	dup		descriptor of index being created
3140 @param[in,out]	file		file containing index entries
3141 @param[in,out]	block		3 buffers
3142 @param[in,out]	crypt_block	crypt buf or NULL
3143 @param[in]	space_id	tablespace id
3144 @param[in,out]	tmpfd		temporary file handle
3145 @param[in,out]	stage		performance schema accounting object, used by
3146 ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
3147 and then stage->inc() will be called for each record processed.
3148 @return DB_SUCCESS or error code */
3149 dberr_t
row_merge_sort(trx_t * trx,const row_merge_dup_t * dup,merge_file_t * file,row_merge_block_t * block,row_merge_block_t * crypt_block,ulint space_id,int * tmpfd,ut_stage_alter_t * stage)3150 row_merge_sort(
3151 	trx_t*			trx,
3152 	const row_merge_dup_t*	dup,
3153 	merge_file_t*		file,
3154 	row_merge_block_t*	block,
3155 	row_merge_block_t*	crypt_block,
3156 	ulint			space_id,
3157 	int*			tmpfd,
3158 	ut_stage_alter_t*	stage /* = NULL */)
3159 {
3160 	const ulint	half	= file->offset / 2;
3161 	ulint		num_runs;
3162 	ulint*		run_offset;
3163 	dberr_t		error	= DB_SUCCESS;
3164 	DBUG_ENTER("row_merge_sort");
3165 
3166 	/* Record the number of merge runs we need to perform */
3167 	num_runs = file->offset;
3168 
3169 	if (stage != NULL) {
3170 		stage->begin_phase_sort(log2(num_runs));
3171 	}
3172 
3173 	/* If num_runs are less than 1, nothing to merge */
3174 	if (num_runs <= 1) {
3175 		DBUG_RETURN(error);
3176 	}
3177 
3178 	/* "run_offset" records each run's first offset number */
3179 	run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));
3180 
3181 	/* This tells row_merge() where to start for the first round
3182 	of merge. */
3183 	run_offset[half] = half;
3184 
3185 	/* The file should always contain at least one byte (the end
3186 	of file marker).  Thus, it must be at least one block. */
3187 	ut_ad(file->offset > 0);
3188 
3189 	/* Merge the runs until we have one big run */
3190 	do {
3191 		error = row_merge(trx, dup, file, block, crypt_block, space_id,
3192 				  tmpfd, &num_runs, run_offset, stage);
3193 
3194 		if (error != DB_SUCCESS) {
3195 			break;
3196 		}
3197 
3198 		UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
3199 	} while (num_runs > 1);
3200 
3201 	ut_free(run_offset);
3202 
3203 	DBUG_RETURN(error);
3204 }
3205 
3206 /** Copy externally stored columns to the data tuple.
3207 @param[in]	mrec		record containing BLOB pointers,
3208 or NULL to use tuple instead
3209 @param[in]	offsets		offsets of mrec
3210 @param[in]	zip_size	compressed page size in bytes, or 0
3211 @param[in,out]	tuple		data tuple
3212 @param[in,out]	heap		memory heap */
3213 static
3214 void
row_merge_copy_blobs(const mrec_t * mrec,const ulint * offsets,const page_size_t & page_size,dtuple_t * tuple,mem_heap_t * heap)3215 row_merge_copy_blobs(
3216 	const mrec_t*		mrec,
3217 	const ulint*		offsets,
3218 	const page_size_t&	page_size,
3219 	dtuple_t*		tuple,
3220 	mem_heap_t*		heap)
3221 {
3222 	ut_ad(mrec == NULL || rec_offs_any_extern(offsets));
3223 
3224 	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
3225 		ulint		len;
3226 		const void*	data;
3227 		dfield_t*	field = dtuple_get_nth_field(tuple, i);
3228 		ulint		field_len;
3229 		const byte*	field_data;
3230 
3231 		if (!dfield_is_ext(field)) {
3232 			continue;
3233 		}
3234 
3235 		ut_ad(!dfield_is_null(field));
3236 
3237 		/* During the creation of a PRIMARY KEY, the table is
3238 		X-locked, and we skip copying records that have been
3239 		marked for deletion. Therefore, externally stored
3240 		columns cannot possibly be freed between the time the
3241 		BLOB pointers are read (row_merge_read_clustered_index())
3242 		and dereferenced (below). */
3243 		if (mrec == NULL) {
3244 			field_data
3245 				= static_cast<byte*>(dfield_get_data(field));
3246 			field_len = dfield_get_len(field);
3247 
3248 			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
3249 
3250 			ut_a(memcmp(field_data + field_len
3251 				     - BTR_EXTERN_FIELD_REF_SIZE,
3252 				     field_ref_zero,
3253 				     BTR_EXTERN_FIELD_REF_SIZE));
3254 
3255 			data = btr_copy_externally_stored_field(
3256 				&len, field_data, page_size, field_len, heap);
3257 		} else {
3258 			data = btr_rec_copy_externally_stored_field(
3259 				mrec, offsets, page_size, i, &len, heap);
3260 		}
3261 
3262 		/* Because we have locked the table, any records
3263 		written by incomplete transactions must have been
3264 		rolled back already. There must not be any incomplete
3265 		BLOB columns. */
3266 		ut_a(data);
3267 
3268 		dfield_set_data(field, data, len);
3269 	}
3270 }
3271 
3272 /** Convert a merge record to a typed data tuple. Note that externally
3273 stored fields are not copied to heap.
3274 @param[in,out]	index	index on the table
3275 @param[in]	mtuple	merge record
3276 @param[in]	heap	memory heap from which memory needed is allocated
3277 @return	index entry built. */
3278 static
3279 void
row_merge_mtuple_to_dtuple(dict_index_t * index,dtuple_t * dtuple,const mtuple_t * mtuple)3280 row_merge_mtuple_to_dtuple(
3281 	dict_index_t*	index,
3282 	dtuple_t*	dtuple,
3283 	const mtuple_t* mtuple)
3284 {
3285 	ut_ad(!dict_index_is_ibuf(index));
3286 
3287 	memcpy(dtuple->fields, mtuple->fields,
3288 	       dtuple->n_fields * sizeof *mtuple->fields);
3289 }
3290 
/** Insert sorted data tuples to the index.
The tuples are taken either from an in-memory sort buffer (row_buf)
or read back from a merge file (fd/block), and bulk-loaded into the
index via btr_bulk.
@param[in]	trx_id		transaction identifier
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in,out]	crypt_block	crypt buf or NULL
@param[in]	space_id	tablespace id
@param[in]	row_buf		row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	trx_id_t		trx_id,
	dict_index_t*		index,
	const dict_table_t*	old_table,
	int			fd,
	row_merge_block_t*	block,
	row_merge_block_t*	crypt_block,
	ulint			space_id,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	ut_stage_alter_t*	stage /* = NULL */)
{
	const byte*		b;	/* read cursor within block */
	mem_heap_t*		heap;	/* heap for offsets and mrec buf */
	mem_heap_t*		tuple_heap;	/* per-tuple scratch heap */
	dberr_t			error = DB_SUCCESS;
	ulint			foffs = 0;	/* merge file offset */
	ulint*			offsets;
	mrec_buf_t*		buf;
	ulint			n_rows = 0;	/* tuples consumed from row_buf */
	dtuple_t*		dtuple;
	DBUG_ENTER("row_merge_insert_index_tuples");

	ut_ad(!srv_read_only_mode);
	ut_ad(!(index->type & DICT_FTS));
	ut_ad(!dict_index_is_spatial(index));
	ut_ad(trx_id);

	if (stage != NULL) {
		stage->begin_phase_insert();
	}

	tuple_heap = mem_heap_create(1000);

	{
		/* Pre-size the offsets array; by convention offsets[0]
		holds its allocated size and offsets[1] the field count
		(REC_OFFS_HEADER_SIZE slots precede the field offsets). */
		ulint i	= 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
		offsets = static_cast<ulint*>(
			mem_heap_alloc(heap, i * sizeof *offsets));
		offsets[0] = i;
		offsets[1] = dict_index_get_n_fields(index);
	}

	if (row_buf != NULL) {
		/* In-memory path: the sort fit in one buffer and no
		merge file was ever written. */
		ut_ad(fd == -1);
		ut_ad(block == NULL);
		DBUG_EXECUTE_IF("row_merge_read_failure",
				error = DB_CORRUPTION;
				goto err_exit;);
		buf = NULL;
		b = NULL;
		dtuple = dtuple_create(
			heap, dict_index_get_n_fields(index));
		dtuple_set_n_fields_cmp(
			dtuple, dict_index_get_n_unique_in_tree(index));
	} else {
		/* File path: read the sorted records back from the
		merge file, block by block. */
		b = block;
		dtuple = NULL;

		if (!row_merge_read(fd, foffs, block, crypt_block, space_id)) {
			error = DB_CORRUPTION;
			goto err_exit;
		} else {
			buf = static_cast<mrec_buf_t*>(
				mem_heap_alloc(heap, sizeof *buf));
		}
	}


	for (;;) {
		const mrec_t*	mrec;
		ulint		n_ext;
		mtr_t		mtr;

		if (stage != NULL) {
			stage->inc();
		}

		if (row_buf != NULL) {
			if (n_rows >= row_buf->n_tuples) {
				break;
			}

			/* Convert merge tuple record from
			row buffer to data tuple record */
			row_merge_mtuple_to_dtuple(
				index, dtuple, &row_buf->tuples[n_rows]);

			n_ext = dtuple_get_n_ext(dtuple);
			n_rows++;
			/* BLOB pointers must be copied from dtuple */
			mrec = NULL;
		} else {
			b = row_merge_read_rec(block, crypt_block, space_id,
					       buf, b, index, fd, &foffs, &mrec,
					       offsets);
			if (UNIV_UNLIKELY(!b)) {
				/* End of list, or I/O error */
				if (mrec) {
					error = DB_CORRUPTION;
				}
				break;
			}

			dtuple = row_rec_to_index_entry_low(
				mrec, index, offsets, &n_ext, tuple_heap);
		}

		dict_index_t*	old_index
			= dict_table_get_first_index(old_table);

		/* When rebuilding the clustered index online, bail out
		if the concurrent row log has recorded an error. */
		if (dict_index_is_clust(index)
		    && dict_index_is_online_ddl(old_index)) {
			error = row_log_table_get_error(old_index);
			if (error != DB_SUCCESS) {
				break;
			}
		}

		if (!n_ext) {
			/* There are no externally stored columns. */
		} else {
			ut_ad(dict_index_is_clust(index));
			/* Off-page columns can be fetched safely
			when concurrent modifications to the table
			are disabled. (Purge can process delete-marked
			records, but row_merge_read_clustered_index()
			would have skipped them.)

			When concurrent modifications are enabled,
			row_merge_read_clustered_index() will
			only see rows from transactions that were
			committed before the ALTER TABLE started
			(REPEATABLE READ).

			Any modifications after the
			row_merge_read_clustered_index() scan
			will go through row_log_table_apply().
			Any modifications to off-page columns
			will be tracked by
			row_log_table_blob_alloc() and
			row_log_table_blob_free(). */
			row_merge_copy_blobs(
				mrec, offsets,
				dict_table_page_size(old_table),
				dtuple, tuple_heap);
		}

		ut_ad(dtuple_validate(dtuple));

		error = btr_bulk->insert(dtuple);

		if (error != DB_SUCCESS) {
			goto err_exit;
		}

		/* Release the per-tuple allocations before the next
		iteration; the heap object itself is reused. */
		mem_heap_empty(tuple_heap);
	}

err_exit:
	mem_heap_free(tuple_heap);
	mem_heap_free(heap);

	DBUG_RETURN(error);
}
3474 
3475 /*********************************************************************//**
3476 Sets an exclusive lock on a table, for the duration of creating indexes.
3477 @return error code or DB_SUCCESS */
3478 dberr_t
row_merge_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode)3479 row_merge_lock_table(
3480 /*=================*/
3481 	trx_t*		trx,		/*!< in/out: transaction */
3482 	dict_table_t*	table,		/*!< in: table to lock */
3483 	enum lock_mode	mode)		/*!< in: LOCK_X or LOCK_S */
3484 {
3485 	ut_ad(!srv_read_only_mode);
3486 	ut_ad(mode == LOCK_X || mode == LOCK_S);
3487 
3488 	trx->op_info = "setting table lock for creating or dropping index";
3489 	trx->ddl = true;
3490 	/* Trx for DDL should not be forced to rollback for now */
3491 	trx->in_innodb |= TRX_FORCE_ROLLBACK_DISABLE;
3492 
3493 	return(lock_table_for_trx(table, trx, mode));
3494 }
3495 
3496 /*********************************************************************//**
3497 Drop an index that was created before an error occurred.
3498 The data dictionary must have been locked exclusively by the caller,
3499 because the transaction will not be committed. */
3500 static
3501 void
row_merge_drop_index_dict(trx_t * trx,index_id_t index_id)3502 row_merge_drop_index_dict(
3503 /*======================*/
3504 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3505 	index_id_t	index_id)/*!< in: index identifier */
3506 {
3507 	static const char sql[] =
3508 		"PROCEDURE DROP_INDEX_PROC () IS\n"
3509 		"BEGIN\n"
3510 		"DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
3511 		"DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
3512 		"END;\n";
3513 	dberr_t		error;
3514 	pars_info_t*	info;
3515 
3516 	ut_ad(!srv_read_only_mode);
3517 	ut_ad(mutex_own(&dict_sys->mutex));
3518 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3519 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3520 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3521 
3522 	info = pars_info_create();
3523 	pars_info_add_ull_literal(info, "indexid", index_id);
3524 	trx->op_info = "dropping index from dictionary";
3525 	error = que_eval_sql(info, sql, FALSE, trx);
3526 
3527 	if (error != DB_SUCCESS) {
3528 		/* Even though we ensure that DDL transactions are WAIT
3529 		and DEADLOCK free, we could encounter other errors e.g.,
3530 		DB_TOO_MANY_CONCURRENT_TRXS. */
3531 		trx->error_state = DB_SUCCESS;
3532 
3533 		ib::error() << "row_merge_drop_index_dict failed with error "
3534 			<< error;
3535 	}
3536 
3537 	trx->op_info = "";
3538 }
3539 
3540 /*********************************************************************//**
3541 Drop indexes that were created before an error occurred.
3542 The data dictionary must have been locked exclusively by the caller,
3543 because the transaction will not be committed. */
3544 void
row_merge_drop_indexes_dict(trx_t * trx,table_id_t table_id)3545 row_merge_drop_indexes_dict(
3546 /*========================*/
3547 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3548 	table_id_t	table_id)/*!< in: table identifier */
3549 {
3550 	static const char sql[] =
3551 		"PROCEDURE DROP_INDEXES_PROC () IS\n"
3552 		"ixid CHAR;\n"
3553 		"found INT;\n"
3554 
3555 		"DECLARE CURSOR index_cur IS\n"
3556 		" SELECT ID FROM SYS_INDEXES\n"
3557 		" WHERE TABLE_ID=:tableid AND\n"
3558 		" SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3559 		"FOR UPDATE;\n"
3560 
3561 		"BEGIN\n"
3562 		"found := 1;\n"
3563 		"OPEN index_cur;\n"
3564 		"WHILE found = 1 LOOP\n"
3565 		"  FETCH index_cur INTO ixid;\n"
3566 		"  IF (SQL % NOTFOUND) THEN\n"
3567 		"    found := 0;\n"
3568 		"  ELSE\n"
3569 		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3570 		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3571 		"  END IF;\n"
3572 		"END LOOP;\n"
3573 		"CLOSE index_cur;\n"
3574 
3575 		"END;\n";
3576 	dberr_t		error;
3577 	pars_info_t*	info;
3578 
3579 	ut_ad(!srv_read_only_mode);
3580 	ut_ad(mutex_own(&dict_sys->mutex));
3581 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3582 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3583 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3584 
3585 	/* It is possible that table->n_ref_count > 1 when
3586 	locked=TRUE. In this case, all code that should have an open
3587 	handle to the table be waiting for the next statement to execute,
3588 	or waiting for a meta-data lock.
3589 
3590 	A concurrent purge will be prevented by dict_operation_lock. */
3591 
3592 	info = pars_info_create();
3593 	pars_info_add_ull_literal(info, "tableid", table_id);
3594 	trx->op_info = "dropping indexes";
3595 	error = que_eval_sql(info, sql, FALSE, trx);
3596 
3597 	switch (error) {
3598 	case DB_SUCCESS:
3599 		break;
3600 	default:
3601 		/* Even though we ensure that DDL transactions are WAIT
3602 		and DEADLOCK free, we could encounter other errors e.g.,
3603 		DB_TOO_MANY_CONCURRENT_TRXS. */
3604 		ib::error() << "row_merge_drop_indexes_dict failed with error "
3605 			<< error;
3606 		/* fall through */
3607 	case DB_TOO_MANY_CONCURRENT_TRXS:
3608 		trx->error_state = DB_SUCCESS;
3609 	}
3610 
3611 	trx->op_info = "";
3612 }
3613 
3614 /*********************************************************************//**
3615 Drop indexes that were created before an error occurred.
3616 The data dictionary must have been locked exclusively by the caller,
3617 because the transaction will not be committed. */
3618 void
row_merge_drop_indexes(trx_t * trx,dict_table_t * table,ibool locked)3619 row_merge_drop_indexes(
3620 /*===================*/
3621 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3622 	dict_table_t*	table,	/*!< in/out: table containing the indexes */
3623 	ibool		locked)	/*!< in: TRUE=table locked,
3624 				FALSE=may need to do a lazy drop */
3625 {
3626 	dict_index_t*	index;
3627 	dict_index_t*	next_index;
3628 
3629 	ut_ad(!srv_read_only_mode);
3630 	ut_ad(mutex_own(&dict_sys->mutex));
3631 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3632 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3633 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3634 
3635 	index = dict_table_get_first_index(table);
3636 	ut_ad(dict_index_is_clust(index));
3637 	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
3638 
3639 	/* the caller should have an open handle to the table */
3640 	ut_ad(table->get_ref_count() >= 1);
3641 
3642 	/* It is possible that table->n_ref_count > 1 when
3643 	locked=TRUE. In this case, all code that should have an open
3644 	handle to the table be waiting for the next statement to execute,
3645 	or waiting for a meta-data lock.
3646 
3647 	A concurrent purge will be prevented by dict_operation_lock. */
3648 
3649 	if (!locked && table->get_ref_count() > 1) {
3650 		/* We will have to drop the indexes later, when the
3651 		table is guaranteed to be no longer in use.  Mark the
3652 		indexes as incomplete and corrupted, so that other
3653 		threads will stop using them.  Let dict_table_close()
3654 		or crash recovery or the next invocation of
3655 		prepare_inplace_alter_table() take care of dropping
3656 		the indexes. */
3657 
3658 		while ((index = dict_table_get_next_index(index)) != NULL) {
3659 			ut_ad(!dict_index_is_clust(index));
3660 
3661 			switch (dict_index_get_online_status(index)) {
3662 			case ONLINE_INDEX_ABORTED_DROPPED:
3663 				continue;
3664 			case ONLINE_INDEX_COMPLETE:
3665 				if (index->is_committed()) {
3666 					/* Do nothing to already
3667 					published indexes. */
3668 				} else if (index->type & DICT_FTS) {
3669 					/* Drop a completed FULLTEXT
3670 					index, due to a timeout during
3671 					MDL upgrade for
3672 					commit_inplace_alter_table().
3673 					Because only concurrent reads
3674 					are allowed (and they are not
3675 					seeing this index yet) we
3676 					are safe to drop the index. */
3677 					dict_index_t* prev = UT_LIST_GET_PREV(
3678 						indexes, index);
3679 					/* At least there should be
3680 					the clustered index before
3681 					this one. */
3682 					ut_ad(prev);
3683 					ut_a(table->fts);
3684 					fts_drop_index(table, index, trx);
3685 					/* Since
3686 					INNOBASE_SHARE::idx_trans_tbl
3687 					is shared between all open
3688 					ha_innobase handles to this
3689 					table, no thread should be
3690 					accessing this dict_index_t
3691 					object. Also, we should be
3692 					holding LOCK=SHARED MDL on the
3693 					table even after the MDL
3694 					upgrade timeout. */
3695 
3696 					/* We can remove a DICT_FTS
3697 					index from the cache, because
3698 					we do not allow ADD FULLTEXT INDEX
3699 					with LOCK=NONE. If we allowed that,
3700 					we should exclude FTS entries from
3701 					prebuilt->ins_node->entry_list
3702 					in ins_node_create_entry_list(). */
3703 					dict_index_remove_from_cache(
3704 						table, index);
3705 					index = prev;
3706 				} else {
3707 					rw_lock_x_lock(
3708 						dict_index_get_lock(index));
3709 					dict_index_set_online_status(
3710 						index, ONLINE_INDEX_ABORTED);
3711 					index->type |= DICT_CORRUPT;
3712 					table->drop_aborted = TRUE;
3713 					goto drop_aborted;
3714 				}
3715 				continue;
3716 			case ONLINE_INDEX_CREATION:
3717 				rw_lock_x_lock(dict_index_get_lock(index));
3718 				ut_ad(!index->is_committed());
3719 				row_log_abort_sec(index);
3720 			drop_aborted:
3721 				rw_lock_x_unlock(dict_index_get_lock(index));
3722 
3723 				DEBUG_SYNC_C("merge_drop_index_after_abort");
3724 				/* covered by dict_sys->mutex */
3725 				MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
3726 				/* fall through */
3727 			case ONLINE_INDEX_ABORTED:
3728 				/* Drop the index tree from the
3729 				data dictionary and free it from
3730 				the tablespace, but keep the object
3731 				in the data dictionary cache. */
3732 				row_merge_drop_index_dict(trx, index->id);
3733 				rw_lock_x_lock(dict_index_get_lock(index));
3734 				dict_index_set_online_status(
3735 					index, ONLINE_INDEX_ABORTED_DROPPED);
3736 				rw_lock_x_unlock(dict_index_get_lock(index));
3737 				table->drop_aborted = TRUE;
3738 				continue;
3739 			}
3740 			ut_error;
3741 		}
3742 
3743 		return;
3744 	}
3745 
3746 	row_merge_drop_indexes_dict(trx, table->id);
3747 
3748 	/* Invalidate all row_prebuilt_t::ins_graph that are referring
3749 	to this table. That is, force row_get_prebuilt_insert_row() to
3750 	rebuild prebuilt->ins_node->entry_list). */
3751 	ut_ad(table->def_trx_id <= trx->id);
3752 	table->def_trx_id = trx->id;
3753 
3754 	next_index = dict_table_get_next_index(index);
3755 
3756 	while ((index = next_index) != NULL) {
3757 		/* read the next pointer before freeing the index */
3758 		next_index = dict_table_get_next_index(index);
3759 
3760 		ut_ad(!dict_index_is_clust(index));
3761 
3762 		if (!index->is_committed()) {
3763 			/* If it is FTS index, drop from table->fts
3764 			and also drop its auxiliary tables */
3765 			if (index->type & DICT_FTS) {
3766 				ut_a(table->fts);
3767 				fts_drop_index(table, index, trx);
3768 			}
3769 
3770 			switch (dict_index_get_online_status(index)) {
3771 			case ONLINE_INDEX_CREATION:
3772 				/* This state should only be possible
3773 				when prepare_inplace_alter_table() fails
3774 				after invoking row_merge_create_index().
3775 				In inplace_alter_table(),
3776 				row_merge_build_indexes()
3777 				should never leave the index in this state.
3778 				It would invoke row_log_abort_sec() on
3779 				failure. */
3780 			case ONLINE_INDEX_COMPLETE:
3781 				/* In these cases, we are able to drop
3782 				the index straight. The DROP INDEX was
3783 				never deferred. */
3784 				break;
3785 			case ONLINE_INDEX_ABORTED:
3786 			case ONLINE_INDEX_ABORTED_DROPPED:
3787 				/* covered by dict_sys->mutex */
3788 				MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
3789 			}
3790 
3791 			dict_index_remove_from_cache(table, index);
3792 		}
3793 	}
3794 
3795 	table->drop_aborted = FALSE;
3796 	ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
3797 }
3798 
3799 /*********************************************************************//**
3800 Drop all partially created indexes during crash recovery. */
3801 void
row_merge_drop_temp_indexes(void)
/*=============================*/
{
	/* InnoDB SQL procedure: walk SYS_INDEXES with an updatable
	cursor, and for every index whose name begins with
	TEMP_INDEX_PREFIX_STR (an index whose online creation was
	interrupted before commit), delete its SYS_FIELDS rows and
	then the SYS_INDEXES row itself.  Note that SUBSTR positions
	are 0-based in this SQL dialect. */
	static const char sql[] =
		"PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
		"ixid CHAR;\n"
		"found INT;\n"

		"DECLARE CURSOR index_cur IS\n"
		" SELECT ID FROM SYS_INDEXES\n"
		" WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
		"FOR UPDATE;\n"

		"BEGIN\n"
		"found := 1;\n"
		"OPEN index_cur;\n"
		"WHILE found = 1 LOOP\n"
		"  FETCH index_cur INTO ixid;\n"
		"  IF (SQL % NOTFOUND) THEN\n"
		"    found := 0;\n"
		"  ELSE\n"
		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
		"  END IF;\n"
		"END LOOP;\n"
		"CLOSE index_cur;\n"
		"END;\n";
	trx_t*	trx;
	dberr_t	error;

	/* Load the table definitions that contain partially defined
	indexes, so that the data dictionary information can be checked
	when accessing the tablename.ibd files. */
	trx = trx_allocate_for_background();
	trx->op_info = "dropping partially created indexes";
	row_mysql_lock_data_dictionary(trx);
	/* Ensure that this transaction will be rolled back and locks
	will be released, if the server gets killed before the commit
	gets written to the redo log. */
	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);

	trx->op_info = "dropping indexes";
	error = que_eval_sql(NULL, sql, FALSE, trx);

	if (error != DB_SUCCESS) {
		/* Even though we ensure that DDL transactions are WAIT
		and DEADLOCK free, we could encounter other errors e.g.,
		DB_TOO_MANY_CONCURRENT_TRXS. */
		trx->error_state = DB_SUCCESS;

		ib::error() << "row_merge_drop_temp_indexes failed with error"
			<< error;
	}

	/* Commit even on failure: error_state was reset above, and any
	partial work is made durable before the dictionary is unlocked. */
	trx_commit_for_mysql(trx);
	row_mysql_unlock_data_dictionary(trx);
	trx_free_for_background(trx);
}
3860 
3861 
3862 /** Create temporary merge files in the given paramater path, and if
3863 UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
3864 @param[in]	path	location for creating temporary merge files.
3865 @return File descriptor */
3866 int
row_merge_file_create_low(const char * path)3867 row_merge_file_create_low(
3868 	const char*	path)
3869 {
3870     int fd;
3871     if (path == NULL) {
3872       path = innobase_mysql_tmpdir();
3873     }
3874 #ifdef UNIV_PFS_IO
3875 	/* This temp file open does not go through normal
3876 	file APIs, add instrumentation to register with
3877 	performance schema */
3878 	struct PSI_file_locker*	locker = NULL;
3879 	Datafile df;
3880 	df.make_filepath(path, "Innodb Merge Temp File", NO_EXT);
3881 
3882         PSI_file_locker_state	state;
3883         locker = PSI_FILE_CALL(get_thread_file_name_locker)(
3884             &state, innodb_temp_file_key.m_value, PSI_FILE_OPEN, df.filepath(),
3885             &locker);
3886         if (locker != NULL) {
3887 		PSI_FILE_CALL(start_file_open_wait)(locker,
3888 						__FILE__,
3889 						__LINE__);
3890         }
3891 #endif
3892 	fd = innobase_mysql_tmpfile(path);
3893 #ifdef UNIV_PFS_IO
3894 	 if (locker != NULL) {
3895 		PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)(
3896 				locker, fd);
3897 		}
3898 #endif
3899 
3900 	if (fd < 0) {
3901 		ib::error() << "Cannot create temporary merge file";
3902 		return(-1);
3903 	}
3904 	return(fd);
3905 }
3906 
3907 
3908 /** Create a merge file in the given location.
3909 @param[out]	merge_file	merge file structure
3910 @param[in]	path		location for creating temporary file
3911 @return file descriptor, or -1 on failure */
3912 int
row_merge_file_create(merge_file_t * merge_file,const char * path)3913 row_merge_file_create(
3914 	merge_file_t*	merge_file,
3915 	const char*	path)
3916 {
3917 	merge_file->fd = row_merge_file_create_low(path);
3918 	merge_file->offset = 0;
3919 	merge_file->n_rec = 0;
3920 
3921 	if (merge_file->fd >= 0) {
3922 		if (srv_disable_sort_file_cache) {
3923 			os_file_set_nocache(merge_file->fd,
3924 				"row0merge.cc", "sort");
3925 		}
3926 	}
3927 	return(merge_file->fd);
3928 }
3929 
3930 /*********************************************************************//**
3931 Destroy a merge file. And de-register the file from Performance Schema
3932 if UNIV_PFS_IO is defined. */
3933 void
row_merge_file_destroy_low(int fd)3934 row_merge_file_destroy_low(
3935 /*=======================*/
3936 	int		fd)	/*!< in: merge file descriptor */
3937 {
3938 #ifdef UNIV_PFS_IO
3939 	struct PSI_file_locker*	locker = NULL;
3940 	PSI_file_locker_state	state;
3941 	locker = PSI_FILE_CALL(get_thread_file_descriptor_locker)(
3942 			       &state, fd, PSI_FILE_CLOSE);
3943 	if (locker != NULL) {
3944 		PSI_FILE_CALL(start_file_wait)(
3945 			      locker, 0, __FILE__, __LINE__);
3946 	}
3947 #endif
3948 	if (fd >= 0) {
3949 		close(fd);
3950 	}
3951 #ifdef UNIV_PFS_IO
3952 	if (locker != NULL) {
3953 		PSI_FILE_CALL(end_file_wait)(locker, 0);
3954 	}
3955 #endif
3956 }
3957 /*********************************************************************//**
3958 Destroy a merge file. */
3959 void
row_merge_file_destroy(merge_file_t * merge_file)3960 row_merge_file_destroy(
3961 /*===================*/
3962 	merge_file_t*	merge_file)	/*!< in/out: merge file structure */
3963 {
3964 	ut_ad(!srv_read_only_mode);
3965 
3966 	if (merge_file->fd != -1) {
3967 		row_merge_file_destroy_low(merge_file->fd);
3968 		merge_file->fd = -1;
3969 	}
3970 }
3971 
3972 /*********************************************************************//**
3973 Rename an index in the dictionary that was created. The data
3974 dictionary must have been locked exclusively by the caller, because
3975 the transaction will not be committed.
3976 @return DB_SUCCESS if all OK */
3977 dberr_t
row_merge_rename_index_to_add(trx_t * trx,table_id_t table_id,index_id_t index_id)3978 row_merge_rename_index_to_add(
3979 /*==========================*/
3980 	trx_t*		trx,		/*!< in/out: transaction */
3981 	table_id_t	table_id,	/*!< in: table identifier */
3982 	index_id_t	index_id)	/*!< in: index identifier */
3983 {
3984 	dberr_t		err = DB_SUCCESS;
3985 	pars_info_t*	info = pars_info_create();
3986 
3987 	/* We use the private SQL parser of Innobase to generate the
3988 	query graphs needed in renaming indexes. */
3989 
3990 	static const char rename_index[] =
3991 		"PROCEDURE RENAME_INDEX_PROC () IS\n"
3992 		"BEGIN\n"
3993 		"UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
3994 		"WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3995 		"END;\n";
3996 
3997 	ut_ad(trx);
3998 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3999 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4000 
4001 	trx->op_info = "renaming index to add";
4002 
4003 	pars_info_add_ull_literal(info, "tableid", table_id);
4004 	pars_info_add_ull_literal(info, "indexid", index_id);
4005 
4006 	err = que_eval_sql(info, rename_index, FALSE, trx);
4007 
4008 	if (err != DB_SUCCESS) {
4009 		/* Even though we ensure that DDL transactions are WAIT
4010 		and DEADLOCK free, we could encounter other errors e.g.,
4011 		DB_TOO_MANY_CONCURRENT_TRXS. */
4012 		trx->error_state = DB_SUCCESS;
4013 
4014 		ib::error() << "row_merge_rename_index_to_add failed with"
4015 			" error " << err;
4016 	}
4017 
4018 	trx->op_info = "";
4019 
4020 	return(err);
4021 }
4022 
4023 /*********************************************************************//**
4024 Rename an index in the dictionary that is to be dropped. The data
4025 dictionary must have been locked exclusively by the caller, because
4026 the transaction will not be committed.
4027 @return DB_SUCCESS if all OK */
4028 dberr_t
row_merge_rename_index_to_drop(trx_t * trx,table_id_t table_id,index_id_t index_id)4029 row_merge_rename_index_to_drop(
4030 /*===========================*/
4031 	trx_t*		trx,		/*!< in/out: transaction */
4032 	table_id_t	table_id,	/*!< in: table identifier */
4033 	index_id_t	index_id)	/*!< in: index identifier */
4034 {
4035 	dberr_t		err;
4036 	pars_info_t*	info = pars_info_create();
4037 
4038 	ut_ad(!srv_read_only_mode);
4039 
4040 	/* We use the private SQL parser of Innobase to generate the
4041 	query graphs needed in renaming indexes. */
4042 
4043 	static const char rename_index[] =
4044 		"PROCEDURE RENAME_INDEX_PROC () IS\n"
4045 		"BEGIN\n"
4046 		"UPDATE SYS_INDEXES SET NAME=CONCAT('"
4047 		TEMP_INDEX_PREFIX_STR "',NAME)\n"
4048 		"WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
4049 		"END;\n";
4050 
4051 	ut_ad(trx);
4052 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
4053 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4054 
4055 	trx->op_info = "renaming index to drop";
4056 
4057 	pars_info_add_ull_literal(info, "tableid", table_id);
4058 	pars_info_add_ull_literal(info, "indexid", index_id);
4059 
4060 	err = que_eval_sql(info, rename_index, FALSE, trx);
4061 
4062 	if (err != DB_SUCCESS) {
4063 		/* Even though we ensure that DDL transactions are WAIT
4064 		and DEADLOCK free, we could encounter other errors e.g.,
4065 		DB_TOO_MANY_CONCURRENT_TRXS. */
4066 		trx->error_state = DB_SUCCESS;
4067 
4068 		ib::error() << "row_merge_rename_index_to_drop failed with"
4069 			" error " << err;
4070 	}
4071 
4072 	trx->op_info = "";
4073 
4074 	return(err);
4075 }
4076 
4077 /*********************************************************************//**
4078 Provide a new pathname for a table that is being renamed if it belongs to
4079 a file-per-table tablespace.  The caller is responsible for freeing the
4080 memory allocated for the return value.
4081 @return new pathname of tablespace file, or NULL if space = 0 */
4082 char*
row_make_new_pathname(dict_table_t * table,const char * new_name)4083 row_make_new_pathname(
4084 /*==================*/
4085 	dict_table_t*	table,		/*!< in: table to be renamed */
4086 	const char*	new_name)	/*!< in: new name */
4087 {
4088 	char*	new_path;
4089 	char*	old_path;
4090 
4091 	ut_ad(!is_system_tablespace(table->space));
4092 
4093 	old_path = fil_space_get_first_path(table->space);
4094 	ut_a(old_path);
4095 
4096 	new_path = os_file_make_new_pathname(old_path, new_name);
4097 
4098 	ut_free(old_path);
4099 
4100 	return(new_path);
4101 }
4102 
4103 /*********************************************************************//**
4104 Rename the tables in the data dictionary.  The data dictionary must
4105 have been locked exclusively by the caller, because the transaction
4106 will not be committed.
4107 @return error code or DB_SUCCESS */
4108 dberr_t
row_merge_rename_tables_dict(dict_table_t * old_table,dict_table_t * new_table,const char * tmp_name,trx_t * trx)4109 row_merge_rename_tables_dict(
4110 /*=========================*/
4111 	dict_table_t*	old_table,	/*!< in/out: old table, renamed to
4112 					tmp_name */
4113 	dict_table_t*	new_table,	/*!< in/out: new table, renamed to
4114 					old_table->name */
4115 	const char*	tmp_name,	/*!< in: new name for old_table */
4116 	trx_t*		trx)		/*!< in/out: dictionary transaction */
4117 {
4118 	dberr_t		err	= DB_ERROR;
4119 	pars_info_t*	info;
4120 
4121 	ut_ad(!srv_read_only_mode);
4122 	ut_ad(old_table != new_table);
4123 	ut_ad(mutex_own(&dict_sys->mutex));
4124 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
4125 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
4126 	      || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
4127 
4128 	trx->op_info = "renaming tables";
4129 
4130 	/* We use the private SQL parser of Innobase to generate the query
4131 	graphs needed in updating the dictionary data in system tables. */
4132 
4133 	info = pars_info_create();
4134 
4135 	pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
4136 	pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
4137 	pars_info_add_str_literal(info, "tmp_name", tmp_name);
4138 
4139 	err = que_eval_sql(info,
4140 			   "PROCEDURE RENAME_TABLES () IS\n"
4141 			   "BEGIN\n"
4142 			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
4143 			   " WHERE NAME = :old_name;\n"
4144 			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
4145 			   " WHERE NAME = :new_name;\n"
4146 			   "END;\n", FALSE, trx);
4147 
4148 	/* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
4149 	renamed is a single-table tablespace, which must be implicitly
4150 	renamed along with the table. */
4151 	if (err == DB_SUCCESS
4152 	    && dict_table_is_file_per_table(old_table)
4153 	    && !old_table->file_unreadable) {
4154 		/* Make pathname to update SYS_DATAFILES. */
4155 		char* tmp_path = row_make_new_pathname(old_table, tmp_name);
4156 
4157 		info = pars_info_create();
4158 
4159 		pars_info_add_str_literal(info, "tmp_name", tmp_name);
4160 		pars_info_add_str_literal(info, "tmp_path", tmp_path);
4161 		pars_info_add_int4_literal(info, "old_space",
4162 					   (lint) old_table->space);
4163 
4164 		err = que_eval_sql(info,
4165 				   "PROCEDURE RENAME_OLD_SPACE () IS\n"
4166 				   "BEGIN\n"
4167 				   "UPDATE SYS_TABLESPACES"
4168 				   " SET NAME = :tmp_name\n"
4169 				   " WHERE SPACE = :old_space;\n"
4170 				   "UPDATE SYS_DATAFILES"
4171 				   " SET PATH = :tmp_path\n"
4172 				   " WHERE SPACE = :old_space;\n"
4173 				   "END;\n", FALSE, trx);
4174 
4175 		ut_free(tmp_path);
4176 	}
4177 
4178 	/* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
4179 	renamed is a single-table tablespace, which must be implicitly
4180 	renamed along with the table. */
4181 	if (err == DB_SUCCESS
4182 	    && dict_table_is_file_per_table(new_table)) {
4183 		/* Make pathname to update SYS_DATAFILES. */
4184 		char* old_path = row_make_new_pathname(
4185 			new_table, old_table->name.m_name);
4186 
4187 		info = pars_info_create();
4188 
4189 		pars_info_add_str_literal(info, "old_name",
4190 					  old_table->name.m_name);
4191 		pars_info_add_str_literal(info, "old_path", old_path);
4192 		pars_info_add_int4_literal(info, "new_space",
4193 					   (lint) new_table->space);
4194 
4195 		err = que_eval_sql(info,
4196 				   "PROCEDURE RENAME_NEW_SPACE () IS\n"
4197 				   "BEGIN\n"
4198 				   "UPDATE SYS_TABLESPACES"
4199 				   " SET NAME = :old_name\n"
4200 				   " WHERE SPACE = :new_space;\n"
4201 				   "UPDATE SYS_DATAFILES"
4202 				   " SET PATH = :old_path\n"
4203 				   " WHERE SPACE = :new_space;\n"
4204 				   "END;\n", FALSE, trx);
4205 
4206 		ut_free(old_path);
4207 	}
4208 
4209 	if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) {
4210 		err = row_import_update_discarded_flag(
4211 			trx, new_table->id, true, true);
4212 	}
4213 
4214 	trx->op_info = "";
4215 
4216 	return(err);
4217 }
4218 
/** Create and execute a query graph for creating an index.
@param[in,out]	trx	transaction for the dictionary operation
@param[in,out]	table	table the index belongs to
@param[in,out]	index	index prototype to be created
@param[in]	add_v	new virtual columns added along with add index call
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_create_index_graph(
	trx_t*			trx,
	dict_table_t*		table,
	dict_index_t*		index,
	const dict_add_v_col_t* add_v)
{
	ind_node_t*	node;		/*!< Index creation node */
	mem_heap_t*	heap;		/*!< Memory heap */
	que_thr_t*	thr;		/*!< Query thread */
	dberr_t		err;

	DBUG_ENTER("row_merge_create_index_graph");

	ut_ad(trx);
	ut_ad(table);
	ut_ad(index);

	heap = mem_heap_create(512);

	index->table = table;
	/* Build the index-creation graph and attach a query thread
	for executing it under this transaction. */
	node = ind_create_graph_create(index, heap, add_v);
	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);

	ut_a(thr == que_fork_start_command(
			static_cast<que_fork_t*>(que_node_get_parent(thr))));

	que_run_threads(thr);

	/* The graph execution reports its outcome via trx->error_state. */
	err = trx->error_state;

	/* Release the whole query graph (the fork owning thr). */
	que_graph_free((que_t*) que_node_get_parent(thr));

	DBUG_RETURN(err);
}
4261 
/** Create the index and load in to the dictionary.
@param[in,out]	trx		trx (sets error_state)
@param[in,out]	table		the index is on this table
@param[in]	index_def	the index definition
@param[in]	add_v		new virtual columns added along with add
				index call
@return index, or NULL on error */
dict_index_t*
row_merge_create_index(
	trx_t*			trx,
	dict_table_t*		table,
	const index_def_t*	index_def,
	const dict_add_v_col_t*	add_v)
{
	dict_index_t*	index;
	dberr_t		err;
	ulint		n_fields = index_def->n_fields;
	ulint		i;
	/* Set when any key field refers to a virtual column that is
	being added in this same ALTER (taken from add_v). */
	bool		has_new_v_col = false;

	DBUG_ENTER("row_merge_create_index");

	ut_ad(!srv_read_only_mode);

	/* Create the index prototype, using the passed in def, this is not
	a persistent operation. We pass 0 as the space id, and determine at
	a lower level the space id where to store the table. */

	index = dict_mem_index_create(table->name.m_name, index_def->name,
				      0, index_def->ind_type, n_fields);

	ut_a(index);

	index->set_committed(index_def->rebuild);

	/* Resolve the name of each key field.  Virtual columns may
	come either from the table itself or from the add_v list of
	columns being added by this ALTER. */
	for (i = 0; i < n_fields; i++) {
		const char*	name;
		index_field_t*	ifield = &index_def->fields[i];

		if (ifield->is_v_col) {
			if (ifield->col_no >= table->n_v_def) {
				/* Column number beyond the table's own
				virtual columns: must be one of the
				newly added virtual columns. */
				ut_ad(ifield->col_no < table->n_v_def
				      + add_v->n_v_col);
				ut_ad(ifield->col_no >= table->n_v_def);
				name = add_v->v_col_name[
					ifield->col_no - table->n_v_def];

				has_new_v_col = true;
			} else {
				name = dict_table_get_v_col_name(
					table, ifield->col_no);
			}
		} else {
			name = dict_table_get_col_name(table, ifield->col_no);

		}

		dict_mem_index_add_field(index, name, ifield->prefix_len);
	}

	/* Add the index to SYS_INDEXES, using the index prototype. */
	err = row_merge_create_index_graph(trx, table, index, add_v);

	if (err == DB_SUCCESS) {

		/* Replace the prototype pointer with the index object
		now registered in the dictionary cache, looked up by
		name. */
		index = dict_table_get_index_on_name(table, index_def->name,
						     index_def->rebuild);

		ut_a(index);

		index->parser = index_def->parser;
		index->is_ngram = index_def->is_ngram;
		index->has_new_v_col = has_new_v_col;

		/* Note the id of the transaction that created this
		index, we use it to restrict readers from accessing
		this index, to ensure read consistency. */
		ut_ad(index->trx_id == trx->id);
	} else {
		/* In case we were unable to assign an undo record for this index
		we won't free index memory object */
		if (err == DB_TOO_MANY_CONCURRENT_TRXS) {
			dict_mem_index_free(index);
		}
		index = NULL;
	}

	DBUG_RETURN(index);
}
4351 
4352 /*********************************************************************//**
4353 Check if a transaction can use an index. */
4354 ibool
row_merge_is_index_usable(const trx_t * trx,const dict_index_t * index)4355 row_merge_is_index_usable(
4356 /*======================*/
4357 	const trx_t*		trx,	/*!< in: transaction */
4358 	const dict_index_t*	index)	/*!< in: index to check */
4359 {
4360 	if (!dict_index_is_clust(index)
4361 	    && dict_index_is_online_ddl(index)) {
4362 		/* Indexes that are being created are not useable. */
4363 		return(FALSE);
4364 	}
4365 
4366 	return(!dict_index_is_corrupted(index)
4367 	       && (dict_table_is_temporary(index->table)
4368 		   || index->trx_id == 0
4369 		   || !MVCC::is_view_active(trx->read_view)
4370 		   || trx->read_view->changes_visible(
4371 			   index->trx_id,
4372 			   index->table->name)));
4373 }
4374 
4375 /*********************************************************************//**
4376 Drop a table. The caller must have ensured that the background stats
4377 thread is not processing the table. This can be done by calling
4378 dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
4379 before calling this function.
4380 @return DB_SUCCESS or error code */
4381 dberr_t
row_merge_drop_table(trx_t * trx,dict_table_t * table)4382 row_merge_drop_table(
4383 /*=================*/
4384 	trx_t*		trx,		/*!< in: transaction */
4385 	dict_table_t*	table)		/*!< in: table to drop */
4386 {
4387 	ut_ad(!srv_read_only_mode);
4388 
4389 	/* There must be no open transactions on the table. */
4390 	ut_a(table->get_ref_count() == 0);
4391 
4392 	return(row_drop_table_for_mysql(table->name.m_name,
4393 					trx, false, false));
4394 }
4395 
4396 /** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
4397 that redo-logging of individual index pages was disabled, and
4398 the flushing of such pages to the data files was completed.
4399 @param[in]	index	an index tree on which redo logging was disabled */
4400 static
4401 void
row_merge_write_redo(const dict_index_t * index)4402 row_merge_write_redo(
4403 	const dict_index_t*	index)
4404 {
4405 	mtr_t	mtr;
4406 	byte*	log_ptr;
4407 
4408 	ut_ad(!dict_table_is_temporary(index->table));
4409 	mtr.start();
4410 	log_ptr = mlog_open(&mtr, 11 + 8);
4411 	log_ptr = mlog_write_initial_log_record_low(
4412 		MLOG_INDEX_LOAD,
4413 		index->space, index->page, log_ptr, &mtr);
4414 	mach_write_to_8(log_ptr, index->id);
4415 	mlog_close(&mtr, log_ptr + 8);
4416 	mtr.commit();
4417 }
4418 
4419 /** Build indexes on a table by reading a clustered index, creating a temporary
4420 file containing index entries, merge sorting these index entries and inserting
4421 sorted index entries to indexes.
4422 @param[in]	trx		transaction
4423 @param[in]	old_table	table where rows are read from
4424 @param[in]	new_table	table where indexes are created; identical to
4425 old_table unless creating a PRIMARY KEY
4426 @param[in]	online		true if creating indexes online
4427 @param[in]	indexes		indexes to be created
4428 @param[in]	key_numbers	MySQL key numbers
4429 @param[in]	n_indexes	size of indexes[]
4430 @param[in,out]	table		MySQL table, for reporting erroneous key value
4431 if applicable
4432 @param[in]	add_cols	default values of added columns, or NULL
4433 @param[in]	col_map		mapping of old column numbers to new ones, or
4434 NULL if old_table == new_table
4435 @param[in]	add_autoinc	number of added AUTO_INCREMENT columns, or
4436 ULINT_UNDEFINED if none is added
4437 @param[in,out]	sequence	autoinc sequence
4438 @param[in]	skip_pk_sort	whether the new PRIMARY KEY will follow
4439 existing order
4440 @param[in,out]	stage		performance schema accounting object, used by
4441 ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
4442 this function and it will be passed to other functions for further accounting.
4443 @param[in]	add_v		new virtual columns added along with indexes
4444 @param[in]	eval_table	mysql table used to evaluate virtual column
4445 				value, see innobase_get_computed_value().
4446 @param[in]	prebuilt	compress_heap must be taken from here
4447 @return DB_SUCCESS or error code */
4448 dberr_t
row_merge_build_indexes(trx_t * trx,dict_table_t * old_table,dict_table_t * new_table,bool online,dict_index_t ** indexes,const ulint * key_numbers,ulint n_indexes,struct TABLE * table,const dtuple_t * add_cols,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,bool skip_pk_sort,ut_stage_alter_t * stage,const dict_add_v_col_t * add_v,struct TABLE * eval_table,row_prebuilt_t * prebuilt)4449 row_merge_build_indexes(
4450 	trx_t*			trx,
4451 	dict_table_t*		old_table,
4452 	dict_table_t*		new_table,
4453 	bool			online,
4454 	dict_index_t**		indexes,
4455 	const ulint*		key_numbers,
4456 	ulint			n_indexes,
4457 	struct TABLE*		table,
4458 	const dtuple_t*		add_cols,
4459 	const ulint*		col_map,
4460 	ulint			add_autoinc,
4461 	ib_sequence_t&		sequence,
4462 	bool			skip_pk_sort,
4463 	ut_stage_alter_t*	stage,
4464 	const dict_add_v_col_t*	add_v,
4465 	struct TABLE*		eval_table,
4466 	row_prebuilt_t*		prebuilt)
4467 {
4468 	merge_file_t*		merge_files;
4469 	row_merge_block_t*	block;
4470 	ut_new_pfx_t		block_pfx;
4471 	ut_new_pfx_t		crypt_pfx;
4472 	ulint			i;
4473 	ulint			j;
4474 	dberr_t			error;
4475 	int			tmpfd = -1;
4476 	dict_index_t*		fts_sort_idx = NULL;
4477 	fts_psort_t*		psort_info = NULL;
4478 	fts_psort_t*		merge_info = NULL;
4479 	int64_t			sig_count = 0;
4480 	bool			fts_psort_initiated = false;
4481 	DBUG_ENTER("row_merge_build_indexes");
4482 
4483 	ut_ad(!srv_read_only_mode);
4484 	ut_ad((old_table == new_table) == !col_map);
4485 	ut_ad(!add_cols || col_map);
4486 
4487 	stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
4488 				   ? n_indexes - 1
4489 				   : n_indexes);
4490 
4491 	/* Allocate memory for merge file data structure and initialize
4492 	fields */
4493 
4494 	ut_allocator<row_merge_block_t>	alloc(mem_key_row_merge_sort);
4495 
4496 	/* This will allocate "3 * srv_sort_buf_size" elements of type
4497 	row_merge_block_t. The latter is defined as byte. */
4498 	block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx, false);
4499 
4500 	if (block == NULL) {
4501 		DBUG_RETURN(DB_OUT_OF_MEMORY);
4502 	}
4503 
4504 	/* If online logs are set to be encrypted, allocate additional buffer
4505 	for encryption/decryption. */
4506 	row_merge_block_t* crypt_block = NULL;
4507 
4508 	if (log_tmp_is_encrypted()) {
4509 		crypt_block = static_cast<row_merge_block_t*>(
4510 			alloc.allocate_large(
4511 				3 * srv_sort_buf_size, &crypt_pfx, false));
4512 
4513 		if (crypt_block == NULL) {
4514 			DBUG_RETURN(DB_OUT_OF_MEMORY);
4515 		}
4516 	}
4517 
4518 	trx_start_if_not_started_xa(trx, true);
4519 
4520 	/* Check if we need a flush observer to flush dirty pages.
4521 	Since we disable redo logging in bulk load, so we should flush
4522 	dirty pages before online log apply, because online log apply enables
4523 	redo logging(we can do further optimization here).
4524 	1. online add index: flush dirty pages right before row_log_apply().
4525 	2. table rebuild: flush dirty pages before row_log_table_apply().
4526 
4527 	we use bulk load to create all types of indexes except spatial index,
4528 	for which redo logging is enabled. If we create only spatial indexes,
4529 	we don't need to flush dirty pages at all. */
4530 	bool	need_flush_observer = (old_table != new_table);
4531 
4532 	for (i = 0; i < n_indexes; i++) {
4533 		if (!dict_index_is_spatial(indexes[i])) {
4534 			need_flush_observer = true;
4535 		}
4536 	}
4537 
4538 	FlushObserver*	flush_observer = NULL;
4539 	if (need_flush_observer) {
4540 		flush_observer = UT_NEW_NOKEY(
4541 			FlushObserver(new_table->space, trx, stage));
4542 
4543 		trx_set_flush_observer(trx, flush_observer);
4544 	}
4545 
4546 	merge_files = static_cast<merge_file_t*>(
4547 		ut_malloc_nokey(n_indexes * sizeof *merge_files));
4548 
4549 	/* Initialize all the merge file descriptors, so that we
4550 	don't call row_merge_file_destroy() on uninitialized
4551 	merge file descriptor */
4552 
4553 	for (i = 0; i < n_indexes; i++) {
4554 		merge_files[i].fd = -1;
4555 	}
4556 
4557 	for (i = 0; i < n_indexes; i++) {
4558 		if (indexes[i]->type & DICT_FTS) {
4559 			ibool	opt_doc_id_size = FALSE;
4560 
4561 			/* To build FTS index, we would need to extract
4562 			doc's word, Doc ID, and word's position, so
4563 			we need to build a "fts sort index" indexing
4564 			on above three 'fields' */
4565 			fts_sort_idx = row_merge_create_fts_sort_index(
4566 				indexes[i], old_table, &opt_doc_id_size);
4567 
4568 			row_merge_dup_t*	dup
4569 				= static_cast<row_merge_dup_t*>(
4570 					ut_malloc_nokey(sizeof *dup));
4571 			dup->index = fts_sort_idx;
4572 			dup->table = table;
4573 			dup->col_map = col_map;
4574 			dup->n_dup = 0;
4575 
4576 			row_fts_psort_info_init(
4577 				trx, dup, new_table, opt_doc_id_size,
4578 				&psort_info, &merge_info);
4579 
4580 			/* We need to ensure that we free the resources
4581 			allocated */
4582 			fts_psort_initiated = true;
4583 		}
4584 	}
4585 
4586 	/* Reset the MySQL row buffer that is used when reporting
4587 	duplicate keys. */
4588 	innobase_rec_reset(table);
4589 
4590 	if (!old_table->is_readable() ||
4591 	    !new_table->is_readable()) {
4592 		error = DB_DECRYPTION_FAILED;
4593 		ib::warn() << "Table %s is encrypted but encryption service or"
4594 			" used key_id is not available. "
4595 			" Can't continue reading table.";
4596 		goto func_exit;
4597 	}
4598 
4599 	/* Read clustered index of the table and create files for
4600 	secondary index entries for merge sort */
4601 	error = row_merge_read_clustered_index(
4602 		trx, table, old_table, new_table, online, indexes,
4603 		fts_sort_idx, psort_info, merge_files, key_numbers,
4604 		n_indexes, add_cols, add_v, col_map, add_autoinc,
4605 		sequence, block, crypt_block, skip_pk_sort, &tmpfd, stage,
4606 		eval_table, prebuilt);
4607 
4608 	stage->end_phase_read_pk();
4609 
4610 	if (error != DB_SUCCESS) {
4611 
4612 		goto func_exit;
4613 	}
4614 
4615 	DEBUG_SYNC_C("row_merge_after_scan");
4616 
4617 	/* Now we have files containing index entries ready for
4618 	sorting and inserting. */
4619 
4620 	for (i = 0; i < n_indexes; i++) {
4621 		dict_index_t*	sort_idx = indexes[i];
4622 
4623 		if (dict_index_is_spatial(sort_idx)) {
4624 			continue;
4625 		}
4626 
4627 		if (indexes[i]->type & DICT_FTS) {
4628 			os_event_t	fts_parallel_merge_event;
4629 
4630 			sort_idx = fts_sort_idx;
4631 
4632 			fts_parallel_merge_event
4633 				= merge_info[0].psort_common->merge_event;
4634 
4635 			if (FTS_PLL_MERGE) {
4636 				ulint	trial_count = 0;
4637 				bool	all_exit = false;
4638 
4639 				os_event_reset(fts_parallel_merge_event);
4640 				row_fts_start_parallel_merge(merge_info);
4641 wait_again:
4642 				os_event_wait_time_low(
4643 					fts_parallel_merge_event, 1000000,
4644 					sig_count);
4645 
4646 				for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
4647 					if (merge_info[j].child_status
4648 					    != FTS_CHILD_COMPLETE
4649 					    && merge_info[j].child_status
4650 					    != FTS_CHILD_EXITING) {
4651 						sig_count = os_event_reset(
4652 						fts_parallel_merge_event);
4653 
4654 						goto wait_again;
4655 					}
4656 				}
4657 
4658 				/* Now all children should complete, wait
4659 				a bit until they all finish using event */
4660 				while (!all_exit && trial_count < 10000) {
4661 					all_exit = true;
4662 
4663 					for (j = 0; j < FTS_NUM_AUX_INDEX;
4664 					     j++) {
4665 						if (merge_info[j].child_status
4666 						    != FTS_CHILD_EXITING) {
4667 							all_exit = false;
4668 							os_thread_sleep(1000);
4669 							break;
4670 						}
4671 					}
4672 					trial_count++;
4673 				}
4674 
4675 				if (!all_exit) {
4676 					ib::error() << "Not all child merge"
4677 						" threads exited when creating"
4678 						" FTS index '"
4679 						<< indexes[i]->name << "'";
4680 				} else {
4681 					for (j = 0; j < FTS_NUM_AUX_INDEX;
4682 					     j++) {
4683 
4684 						os_thread_join(merge_info[j]
4685 							       .thread_hdl);
4686 					}
4687 				}
4688 			} else {
4689 				/* This cannot report duplicates; an
4690 				assertion would fail in that case. */
4691 				error = row_fts_merge_insert(
4692 					sort_idx, new_table,
4693 					psort_info, 0);
4694 			}
4695 
4696 #ifdef FTS_INTERNAL_DIAG_PRINT
4697 			DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
4698 #endif
4699 		} else if (merge_files[i].fd >= 0) {
4700 			row_merge_dup_t	dup = {
4701 				sort_idx, table, col_map, 0};
4702 
4703 			error = row_merge_sort(
4704 				trx, &dup, &merge_files[i], block, crypt_block,
4705 				new_table->space, &tmpfd, stage);
4706 
4707 			if (error == DB_SUCCESS) {
4708 				BtrBulk	btr_bulk(sort_idx, trx->id,
4709 						 flush_observer);
4710 				btr_bulk.init();
4711 
4712 				error = row_merge_insert_index_tuples(
4713 					trx->id, sort_idx, old_table,
4714 					merge_files[i].fd, block, crypt_block,
4715 					new_table->space,
4716 					NULL, &btr_bulk, stage);
4717 
4718 				error = btr_bulk.finish(error);
4719 			}
4720 		}
4721 
4722 		/* Close the temporary file to free up space. */
4723 		row_merge_file_destroy(&merge_files[i]);
4724 
4725 		if (indexes[i]->type & DICT_FTS) {
4726 			row_fts_psort_info_destroy(psort_info, merge_info);
4727 			fts_psort_initiated = false;
4728 		} else if (error != DB_SUCCESS || !online) {
4729 			/* Do not apply any online log. */
4730 		} else if (old_table != new_table) {
4731 			ut_ad(!sort_idx->online_log);
4732 			ut_ad(sort_idx->online_status
4733 			      == ONLINE_INDEX_COMPLETE);
4734 		} else {
4735 			ut_ad(need_flush_observer);
4736 
4737 			flush_observer->flush();
4738 			row_merge_write_redo(indexes[i]);
4739 
4740 			DEBUG_SYNC_C("row_log_apply_before");
4741 			error = row_log_apply(trx, sort_idx, table, stage);
4742 			DEBUG_SYNC_C("row_log_apply_after");
4743 		}
4744 
4745 		if (error != DB_SUCCESS) {
4746 			trx->error_key_num = key_numbers[i];
4747 			goto func_exit;
4748 		}
4749 
4750 		if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
4751 			ib::info() << "Finished building full-text index "
4752 				<< indexes[i]->name;
4753 		}
4754 	}
4755 
4756 func_exit:
4757 	DBUG_EXECUTE_IF(
4758 		"ib_build_indexes_too_many_concurrent_trxs",
4759 		error = DB_TOO_MANY_CONCURRENT_TRXS;
4760 		trx->error_state = error;);
4761 
4762 	if (fts_psort_initiated) {
4763 		/* Clean up FTS psort related resource */
4764 		row_fts_psort_info_destroy(psort_info, merge_info);
4765 		fts_psort_initiated = false;
4766 	}
4767 
4768 	row_merge_file_destroy_low(tmpfd);
4769 
4770 	for (i = 0; i < n_indexes; i++) {
4771 		row_merge_file_destroy(&merge_files[i]);
4772 	}
4773 
4774 	if (fts_sort_idx) {
4775 		dict_mem_index_free(fts_sort_idx);
4776 	}
4777 
4778 	ut_free(merge_files);
4779 
4780 	alloc.deallocate_large(block, &block_pfx);
4781 
4782 	if (crypt_block) {
4783 		alloc.deallocate_large(crypt_block, &crypt_pfx);
4784 	}
4785 
4786 	DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
4787 
4788 	if (online && old_table == new_table && error != DB_SUCCESS) {
4789 		/* On error, flag all online secondary index creation
4790 		as aborted. */
4791 		for (i = 0; i < n_indexes; i++) {
4792 			ut_ad(!(indexes[i]->type & DICT_FTS));
4793 			ut_ad(!indexes[i]->is_committed());
4794 			ut_ad(!dict_index_is_clust(indexes[i]));
4795 
4796 			/* Completed indexes should be dropped as
4797 			well, and indexes whose creation was aborted
4798 			should be dropped from the persistent
4799 			storage. However, at this point we can only
4800 			set some flags in the not-yet-published
4801 			indexes. These indexes will be dropped later
4802 			in row_merge_drop_indexes(), called by
4803 			rollback_inplace_alter_table(). */
4804 
4805 			switch (dict_index_get_online_status(indexes[i])) {
4806 			case ONLINE_INDEX_COMPLETE:
4807 				break;
4808 			case ONLINE_INDEX_CREATION:
4809 				rw_lock_x_lock(
4810 					dict_index_get_lock(indexes[i]));
4811 				row_log_abort_sec(indexes[i]);
4812 				indexes[i]->type |= DICT_CORRUPT;
4813 				rw_lock_x_unlock(
4814 					dict_index_get_lock(indexes[i]));
4815 				new_table->drop_aborted = TRUE;
4816 				/* fall through */
4817 			case ONLINE_INDEX_ABORTED_DROPPED:
4818 			case ONLINE_INDEX_ABORTED:
4819 				MONITOR_ATOMIC_INC(
4820 					MONITOR_BACKGROUND_DROP_INDEX);
4821 			}
4822 		}
4823 	}
4824 
4825 	DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
4826 
4827 	if (flush_observer != NULL) {
4828 		ut_ad(need_flush_observer);
4829 
4830 		DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
4831 			error = DB_FAIL;
4832 		);
4833 
4834 		if (error != DB_SUCCESS) {
4835 			flush_observer->interrupted();
4836 		}
4837 
4838 		flush_observer->flush();
4839 
4840 		UT_DELETE(flush_observer);
4841 
4842 		if (trx_is_interrupted(trx)) {
4843 			error = DB_INTERRUPTED;
4844 		}
4845 
4846 		if (error == DB_SUCCESS && old_table != new_table) {
4847 			for (const dict_index_t* index
4848 				     = dict_table_get_first_index(new_table);
4849 			     index != NULL;
4850 			     index = dict_table_get_next_index(index)) {
4851 				row_merge_write_redo(index);
4852 			}
4853 		}
4854 	}
4855 
4856 	DBUG_RETURN(error);
4857 }
4858