1 /*****************************************************************************
2 
3 Copyright (c) 2005, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0merge.cc
29 New index creation routines using a merge sort
30 
31 Created 12/4/2005 Jan Lindstrom
32 Completed by Sunny Bains and Marko Makela
33 *******************************************************/
34 
35 #include <math.h>
36 
37 #include "ha_prototypes.h"
38 
39 #include "row0merge.h"
40 #include "row0ext.h"
41 #include "row0log.h"
42 #include "row0ins.h"
43 #include "row0sel.h"
44 #include "dict0crea.h"
45 #include "trx0purge.h"
46 #include "lock0lock.h"
47 #include "pars0pars.h"
48 #include "ut0sort.h"
49 #include "row0ftsort.h"
50 #include "row0import.h"
51 #include "handler0alter.h"
52 #include "btr0bulk.h"
53 #include "fsp0sysspace.h"
54 #include "ut0new.h"
55 #include "ut0stage.h"
56 
/* Ignore posix_fadvise() on those platforms where it does not exist;
the macro makes such calls compile away to nothing. */
#if defined _WIN32
# define posix_fadvise(fd, offset, len, advice) /* nothing */
#endif /* _WIN32 */

/* Whether to disable the file system cache for the merge sort
temporary files (non-zero disables it) */
char	srv_disable_sort_file_cache;
64 
/** Class that caches index row tuples made from a single cluster
index page scan, and then insert into corresponding index tree */
class index_tuple_info_t {
public:
	/** constructor
	@param[in]	heap	memory heap
	@param[in]	index	index to be created */
	index_tuple_info_t(
		mem_heap_t*	heap,
		dict_index_t*	index) UNIV_NOTHROW
	{
		m_heap = heap;
		m_index = index;
		m_dtuple_vec = UT_NEW_NOKEY(idx_tuple_vec());
	}

	/** destructor: frees only the vector itself; the cached
	tuples live in m_heap, which is owned by the caller */
	~index_tuple_info_t()
	{
		UT_DELETE(m_dtuple_vec);
	}

	/** Get the index object
	@return the index object */
	dict_index_t*   get_index() UNIV_NOTHROW
	{
		return(m_index);
	}

	/** Caches an index row into index tuple vector
	@param[in]	row	table row
	@param[in]	ext	externally stored column
	prefixes, or NULL */
	void add(
		const dtuple_t*		row,
		const row_ext_t*	ext) UNIV_NOTHROW
	{
		dtuple_t*	dtuple;

		/* Build the index entry in m_heap so it stays valid
		until the next insert() call consumes the vector. */
		dtuple = row_build_index_entry(row, ext, m_index, m_heap);

		ut_ad(dtuple);

		m_dtuple_vec->push_back(dtuple);
	}

	/** Insert spatial index rows cached in vector into spatial index
	@param[in]	trx_id		transaction id
	@param[in,out]	row_heap	memory heap
	@param[in]	pcur		cluster index scanning cursor
	@param[in,out]	scan_mtr	mini-transaction for pcur
	@param[out]	mtr_committed	whether scan_mtr got committed
	@return DB_SUCCESS if successful, else error number */
	dberr_t insert(
		trx_id_t		trx_id,
		mem_heap_t*		row_heap,
		btr_pcur_t*		pcur,
		mtr_t*			scan_mtr,
		bool*			mtr_committed)
	{
		big_rec_t*      big_rec;
		rec_t*          rec;
		btr_cur_t       ins_cur;
		mtr_t           mtr;
		rtr_info_t      rtr_info;
		ulint*		ins_offsets = NULL;
		dberr_t		error = DB_SUCCESS;
		dtuple_t*	dtuple;
		ulint		count = 0;
		/* The index is being created: insert without undo
		logging or locking, keeping the system columns as
		given (BTR_CREATE_FLAG marks index-under-build). */
		const ulint	flag = BTR_NO_UNDO_LOG_FLAG
				       | BTR_NO_LOCKING_FLAG
				       | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG;

		ut_ad(dict_index_is_spatial(m_index));

		DBUG_EXECUTE_IF("row_merge_instrument_log_check_flush",
			log_sys->check_flush_or_checkpoint = true;
		);

		for (idx_tuple_vec::iterator it = m_dtuple_vec->begin();
		     it != m_dtuple_vec->end();
		     ++it) {
			dtuple = *it;
			ut_ad(dtuple);

			if (log_sys->check_flush_or_checkpoint) {
				/* A redo log flush or checkpoint is
				pending.  Save the scan cursor position
				and commit its mini-transaction once,
				so that no latches are held while
				waiting in log_free_check(). */
				if (!(*mtr_committed)) {
					btr_pcur_move_to_prev_on_page(pcur);
					btr_pcur_store_position(pcur, scan_mtr);
					mtr_commit(scan_mtr);
					*mtr_committed = true;
				}

				log_free_check();
			}

			mtr.start();
			mtr.set_named_space(m_index->space);

			ins_cur.index = m_index;
			rtr_init_rtr_info(&rtr_info, false, &ins_cur, m_index,
					  false);
			rtr_info_update_btr(&ins_cur, &rtr_info);

			/* Position an R-tree insert cursor, latching
			only the leaf page first. */
			btr_cur_search_to_nth_level(m_index, 0, dtuple,
						    PAGE_CUR_RTREE_INSERT,
						    BTR_MODIFY_LEAF, &ins_cur,
						    0, __FILE__, __LINE__,
						    &mtr);

			/* It need to update MBR in parent entry,
			so change search mode to BTR_MODIFY_TREE */
			if (rtr_info.mbr_adj) {
				mtr_commit(&mtr);
				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false, &ins_cur,
						  m_index, false);
				rtr_info_update_btr(&ins_cur, &rtr_info);
				mtr_start(&mtr);
				mtr.set_named_space(m_index->space);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE, &ins_cur, 0,
					__FILE__, __LINE__, &mtr);
			}

			/* Try an optimistic (single-page) insert
			first. */
			error = btr_cur_optimistic_insert(
				flag, &ins_cur, &ins_offsets, &row_heap,
				dtuple, &rec, &big_rec, 0, NULL, &mtr);

			if (error == DB_FAIL) {
				/* The leaf page was full: restart the
				search with BTR_MODIFY_TREE and insert
				pessimistically. */
				ut_ad(!big_rec);
				mtr.commit();
				mtr.start();
				mtr.set_named_space(m_index->space);

				rtr_clean_rtr_info(&rtr_info, true);
				rtr_init_rtr_info(&rtr_info, false,
						  &ins_cur, m_index, false);

				rtr_info_update_btr(&ins_cur, &rtr_info);
				btr_cur_search_to_nth_level(
					m_index, 0, dtuple,
					PAGE_CUR_RTREE_INSERT,
					BTR_MODIFY_TREE,
					&ins_cur, 0,
					__FILE__, __LINE__, &mtr);


				error = btr_cur_pessimistic_insert(
						flag, &ins_cur, &ins_offsets,
						&row_heap, dtuple, &rec,
						&big_rec, 0, NULL, &mtr);
			}

			DBUG_EXECUTE_IF(
				"row_merge_ins_spatial_fail",
				error = DB_FAIL;
			);

			if (error == DB_SUCCESS) {
				if (rtr_info.mbr_adj) {
					/* Propagate the enlarged MBR
					to the parent entries. */
					error = rtr_ins_enlarge_mbr(
							&ins_cur, NULL, &mtr);
				}

				if (error == DB_SUCCESS) {
					page_update_max_trx_id(
						btr_cur_get_block(&ins_cur),
						btr_cur_get_page_zip(&ins_cur),
						trx_id, &mtr);
				}
			}

			mtr_commit(&mtr);

			rtr_clean_rtr_info(&rtr_info, true);
			count++;
		}

		/* NOTE(review): the loop does not break on error, so
		only the status of the last cached tuple is returned. */
		m_dtuple_vec->clear();

		return(error);
	}

private:
	/** Cache index rows made from a cluster index scan. Usually
	for rows on single cluster index page */
	typedef std::vector<dtuple_t*, ut_allocator<dtuple_t*> >
		idx_tuple_vec;

	/** vector used to cache index rows made from cluster index scan */
	idx_tuple_vec*		m_dtuple_vec;

	/** the index being built */
	dict_index_t*		m_index;

	/** memory heap for creating index tuples */
	mem_heap_t*		m_heap;
};
266 
/* Maximum pending doc memory limit in bytes for a fts tokenization
thread; when a bucket exceeds this, row_merge_buf_add() backs off
(sleeps) until the tokenizer threads catch up */
#define FTS_PENDING_DOC_MEMORY_LIMIT	1000000

/** Insert sorted data tuples to the index.
@param[in]	trx_id		transaction identifier
@param[in]	index		index to be inserted
@param[in]	old_table	old table
@param[in]	fd		file descriptor
@param[in,out]	block		file buffer
@param[in]	row_buf		the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out]	btr_bulk	btr bulk instance
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
@return DB_SUCCESS or error number */
static	MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
	trx_id_t		trx_id,
	dict_index_t*		index,
	const dict_table_t*	old_table,
	int			fd,
	row_merge_block_t*	block,
	const row_merge_buf_t*	row_buf,
	BtrBulk*		btr_bulk,
	ut_stage_alter_t*	stage = NULL);
294 
295 /******************************************************//**
296 Encode an index record. */
297 static
298 void
row_merge_buf_encode(byte ** b,const dict_index_t * index,const mtuple_t * entry,ulint n_fields)299 row_merge_buf_encode(
300 /*=================*/
301 	byte**			b,		/*!< in/out: pointer to
302 						current end of output buffer */
303 	const dict_index_t*	index,		/*!< in: index */
304 	const mtuple_t*		entry,		/*!< in: index fields
305 						of the record to encode */
306 	ulint			n_fields)	/*!< in: number of fields
307 						in the entry */
308 {
309 	ulint	size;
310 	ulint	extra_size;
311 
312 	size = rec_get_converted_size_temp(
313 		index, entry->fields, n_fields, NULL, &extra_size);
314 	ut_ad(size >= extra_size);
315 
316 	/* Encode extra_size + 1 */
317 	if (extra_size + 1 < 0x80) {
318 		*(*b)++ = (byte) (extra_size + 1);
319 	} else {
320 		ut_ad((extra_size + 1) < 0x8000);
321 		*(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
322 		*(*b)++ = (byte) (extra_size + 1);
323 	}
324 
325 	rec_convert_dtuple_to_temp(*b + extra_size, index,
326 				   entry->fields, n_fields, NULL);
327 
328 	*b += size;
329 }
330 
331 /******************************************************//**
332 Allocate a sort buffer.
333 @return own: sort buffer */
334 static MY_ATTRIBUTE((malloc))
335 row_merge_buf_t*
row_merge_buf_create_low(mem_heap_t * heap,dict_index_t * index,ulint max_tuples,ulint buf_size)336 row_merge_buf_create_low(
337 /*=====================*/
338 	mem_heap_t*	heap,		/*!< in: heap where allocated */
339 	dict_index_t*	index,		/*!< in: secondary index */
340 	ulint		max_tuples,	/*!< in: maximum number of
341 					data tuples */
342 	ulint		buf_size)	/*!< in: size of the buffer,
343 					in bytes */
344 {
345 	row_merge_buf_t*	buf;
346 
347 	ut_ad(max_tuples > 0);
348 
349 	ut_ad(max_tuples <= srv_sort_buf_size);
350 
351 	buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
352 	buf->heap = heap;
353 	buf->index = index;
354 	buf->max_tuples = max_tuples;
355 	buf->tuples = static_cast<mtuple_t*>(
356 		ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
357 	buf->tmp_tuples = buf->tuples + max_tuples;
358 
359 	return(buf);
360 }
361 
362 /******************************************************//**
363 Allocate a sort buffer.
364 @return own: sort buffer */
365 row_merge_buf_t*
row_merge_buf_create(dict_index_t * index)366 row_merge_buf_create(
367 /*=================*/
368 	dict_index_t*	index)	/*!< in: secondary index */
369 {
370 	row_merge_buf_t*	buf;
371 	ulint			max_tuples;
372 	ulint			buf_size;
373 	mem_heap_t*		heap;
374 
375 	max_tuples = static_cast<ulint>(srv_sort_buf_size)
376 		/ ut_max(static_cast<ulint>(1),
377 			 dict_index_get_min_size(index));
378 
379 	buf_size = (sizeof *buf);
380 
381 	heap = mem_heap_create(buf_size);
382 
383 	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
384 
385 	return(buf);
386 }
387 
388 /******************************************************//**
389 Empty a sort buffer.
390 @return sort buffer */
391 row_merge_buf_t*
row_merge_buf_empty(row_merge_buf_t * buf)392 row_merge_buf_empty(
393 /*================*/
394 	row_merge_buf_t*	buf)	/*!< in,own: sort buffer */
395 {
396 	ulint		buf_size	= sizeof *buf;
397 	ulint		max_tuples	= buf->max_tuples;
398 	mem_heap_t*	heap		= buf->heap;
399 	dict_index_t*	index		= buf->index;
400 	mtuple_t*	tuples		= buf->tuples;
401 
402 	mem_heap_empty(heap);
403 
404 	buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
405 	buf->heap = heap;
406 	buf->index = index;
407 	buf->max_tuples = max_tuples;
408 	buf->tuples = tuples;
409 	buf->tmp_tuples = buf->tuples + max_tuples;
410 
411 	return(buf);
412 }
413 
414 /******************************************************//**
415 Deallocate a sort buffer. */
416 void
row_merge_buf_free(row_merge_buf_t * buf)417 row_merge_buf_free(
418 /*===============*/
419 	row_merge_buf_t*	buf)	/*!< in,own: sort buffer to be freed */
420 {
421 	ut_free(buf->tuples);
422 	mem_heap_free(buf->heap);
423 }
424 
/** Convert the field data from compact to redundant format.
@param[in]	row_field	field to copy from
@param[out]	field		field to copy to
@param[in]	len		length of the field in the redundant
				format (the target column length);
				the data is space-padded up to it
@param[in]	page_size	page size of the table, used when
				copying externally stored (BLOB) data
@param[in,out]	heap		memory heap where the converted
				field data is allocated */
static
void
row_merge_buf_redundant_convert(
	const dfield_t*		row_field,
	dfield_t*		field,
	ulint			len,
	const page_size_t&	page_size,
	mem_heap_t*		heap)
{
	/* The column uses a variable-width character set
	(mbminlen == 1, mbmaxlen > 1). */
	ut_ad(DATA_MBMINLEN(field->type.mbminmaxlen) == 1);
	ut_ad(DATA_MBMAXLEN(field->type.mbminmaxlen) > 1);

	byte*		buf = (byte*) mem_heap_alloc(heap, len);
	ulint		field_len = row_field->len;
	ut_ad(field_len <= len);

	if (row_field->ext) {
		const byte*	field_data = static_cast<byte*>(
			dfield_get_data(row_field));
		ulint		ext_len;

		/* The field must end in a non-zero BLOB reference
		pointing to the externally stored data. */
		ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
		ut_a(memcmp(field_data + field_len - BTR_EXTERN_FIELD_REF_SIZE,
			    field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

		byte*	data = btr_copy_externally_stored_field(
			&ext_len, field_data, page_size, field_len, heap);

		ut_ad(ext_len < len);

		memcpy(buf, data, ext_len);
		field_len = ext_len;
	} else {
		memcpy(buf, row_field->data, field_len);
	}

	/* Pad the remainder with spaces (0x20) up to the full
	redundant-format length. */
	memset(buf + field_len, 0x20, len - field_len);

	dfield_set_data(field, buf, len);
}
475 
/** Insert a data tuple into a sort buffer.
@param[in,out]	buf		sort buffer
@param[in]	fts_index	fts index to be created
@param[in]	old_table	original table
@param[in]	new_table	new table
@param[in,out]	psort_info	parallel sort info
@param[in]	row		table row
@param[in]	ext		cache of externally stored
				column prefixes, or NULL
@param[in,out]	doc_id		Doc ID if we are creating
				FTS index
@param[in,out]	conv_heap	memory heap where to allocate data when
				converting to ROW_FORMAT=REDUNDANT, or NULL
				when not to invoke
				row_merge_buf_redundant_convert()
@param[in,out]	err		set if error occurs
@param[in,out]	v_heap		heap memory to process data for virtual column
@param[in,out]	my_table	mysql table object
@param[in]	trx		transaction object
@return number of rows added, 0 if out of space */
static
ulint
row_merge_buf_add(
	row_merge_buf_t*	buf,
	dict_index_t*		fts_index,
	const dict_table_t*	old_table,
	const dict_table_t*	new_table,
	fts_psort_t*		psort_info,
	const dtuple_t*		row,
	const row_ext_t*	ext,
	doc_id_t*		doc_id,
	mem_heap_t*		conv_heap,
	dberr_t*		err,
	mem_heap_t**		v_heap,
	TABLE*			my_table,
	trx_t*			trx)
{
	ulint			i;
	const dict_index_t*	index;
	mtuple_t*		entry;
	dfield_t*		field;
	const dict_field_t*	ifield;
	ulint			n_fields;
	ulint			data_size;
	ulint			extra_size;
	ulint			bucket = 0;
	doc_id_t		write_doc_id;
	ulint			n_row_added = 0;
	DBUG_ENTER("row_merge_buf_add");

	/* Out of tuple slots: the caller must flush the buffer. */
	if (buf->n_tuples >= buf->max_tuples) {
		DBUG_RETURN(0);
	}

	DBUG_EXECUTE_IF(
		"ib_row_merge_buf_add_two",
		if (buf->n_tuples >= 2) DBUG_RETURN(0););

	UNIV_PREFETCH_R(row->fields);

	/* If we are building FTS index, buf->index points to
	the 'fts_sort_idx', and real FTS index is stored in
	fts_index */
	index = (buf->index->type & DICT_FTS) ? fts_index : buf->index;

	/* create spatial index should not come here */
	ut_ad(!dict_index_is_spatial(index));

	n_fields = dict_index_get_n_fields(index);

	entry = &buf->tuples[buf->n_tuples];
	field = entry->fields = static_cast<dfield_t*>(
		mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));

	data_size = 0;
	/* One null-flag bit per nullable column, rounded up to
	whole bytes. */
	extra_size = UT_BITS_IN_BYTES(index->n_nullable);

	ifield = dict_index_get_nth_field(index, 0);

	/* Copy the column values from the row into the tuple,
	field by field, accumulating the record size as we go. */
	for (i = 0; i < n_fields; i++, field++, ifield++) {
		ulint			len;
		const dict_col_t*	col;
		const dict_v_col_t*	v_col = NULL;
		ulint			col_no;
		ulint			fixed_len;
		const dfield_t*		row_field;

		col = ifield->col;
		if (dict_col_is_virtual(col)) {
			v_col = reinterpret_cast<const dict_v_col_t*>(col);
		}

		col_no = dict_col_get_no(col);

		/* Process the Doc ID column */
		if (*doc_id > 0
		    && col_no == index->table->fts->doc_col
		    && !dict_col_is_virtual(col)) {
			fts_write_doc_id((byte*) &write_doc_id, *doc_id);

			/* Note: field->data now points to a value on the
			stack: &write_doc_id after dfield_set_data(). Because
			there is only one doc_id per row, it shouldn't matter.
			We allocate a new buffer before we leave the function
			later below. */

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));

			field->type.mtype = ifield->col->mtype;
			field->type.prtype = ifield->col->prtype;
			field->type.mbminmaxlen = DATA_MBMINMAXLEN(0, 0);
			field->type.len = ifield->col->len;
		} else {
			/* Use callback to get the virtual column value */
			if (dict_col_is_virtual(col)) {
				dict_index_t*	clust_index
					= dict_table_get_first_index(new_table);

				row_field = innobase_get_computed_value(
					row, v_col, clust_index,
					v_heap, NULL, ifield, trx->mysql_thd,
					my_table, old_table, NULL, NULL);

				if (row_field == NULL) {
					*err = DB_COMPUTE_VALUE_FAILED;
					DBUG_RETURN(0);
				}
				dfield_copy(field, row_field);
			} else {
				row_field = dtuple_get_nth_field(row, col_no);
				dfield_copy(field, row_field);
			}

			/* Tokenize and process data for FTS */
			if (index->type & DICT_FTS) {
				fts_doc_item_t*	doc_item;
				byte*		value;
				void*		ptr;
				const ulint	max_trial_count = 10000;
				ulint		trial_count = 0;

				/* fetch Doc ID if it already exists
				in the row, and not supplied by the
				caller. Even if the value column is
				NULL, we still need to get the Doc
				ID so to maintain the correct max
				Doc ID */
				if (*doc_id == 0) {
					const dfield_t*	doc_field;
					doc_field = dtuple_get_nth_field(
						row,
						index->table->fts->doc_col);
					*doc_id = (doc_id_t) mach_read_from_8(
						static_cast<byte*>(
						dfield_get_data(doc_field)));

					if (*doc_id == 0) {
						ib::warn() << "FTS Doc ID is"
							" zero. Record"
							" skipped";
						DBUG_RETURN(0);
					}
				}

				if (dfield_is_null(field)) {
					n_row_added = 1;
					continue;
				}

				/* Copy the column value into a fresh
				allocation so the tokenizer thread owns
				it independently of this row. */
				ptr = ut_malloc_nokey(sizeof(*doc_item)
						      + field->len);

				doc_item = static_cast<fts_doc_item_t*>(ptr);
				value = static_cast<byte*>(ptr)
					+ sizeof(*doc_item);
				memcpy(value, field->data, field->len);
				field->data = value;

				doc_item->field = field;
				doc_item->doc_id = *doc_id;

				/* Distribute the doc across the
				parallel tokenization threads by
				Doc ID. */
				bucket = *doc_id % fts_sort_pll_degree;

				/* Add doc item to fts_doc_list */
				mutex_enter(&psort_info[bucket].mutex);

				if (psort_info[bucket].error == DB_SUCCESS) {
					UT_LIST_ADD_LAST(
						psort_info[bucket].fts_doc_list,
						doc_item);
					psort_info[bucket].memory_used +=
						sizeof(*doc_item) + field->len;
				} else {
					ut_free(doc_item);
				}

				mutex_exit(&psort_info[bucket].mutex);

				/* Sleep when memory used exceeds limit*/
				while (psort_info[bucket].memory_used
				       > FTS_PENDING_DOC_MEMORY_LIMIT
				       && trial_count++ < max_trial_count) {
					os_thread_sleep(1000);
				}

				n_row_added = 1;
				continue;
			}

			if (field->len != UNIV_SQL_NULL
			    && col->mtype == DATA_MYSQL
			    && col->len != field->len) {
				if (conv_heap != NULL) {
					row_merge_buf_redundant_convert(
						row_field, field, col->len,
						dict_table_page_size(old_table),
						conv_heap);
				} else {
					/* Field length mismatch should not
					happen when rebuilding redundant row
					format table. */
					ut_ad(dict_table_is_comp(index->table));
				}
			}
		}

		len = dfield_get_len(field);

		if (dfield_is_null(field)) {
			ut_ad(!(col->prtype & DATA_NOT_NULL));
			continue;
		} else if (!ext) {
		} else if (dict_index_is_clust(index)) {
			/* Flag externally stored fields. */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				if (i < dict_index_get_n_unique(index)) {
					dfield_set_data(field, buf, len);
				} else {
					dfield_set_ext(field);
					len = dfield_get_len(field);
				}
			}
		} else if (!dict_col_is_virtual(col)) {
			/* Only non-virtual column are stored externally */
			const byte*	buf = row_ext_lookup(ext, col_no,
							     &len);
			if (UNIV_LIKELY_NULL(buf)) {
				ut_a(buf != field_ref_zero);
				dfield_set_data(field, buf, len);
			}
		}

		/* If a column prefix index, take only the prefix */

		if (ifield->prefix_len) {
			len = dtype_get_at_most_n_mbchars(
				col->prtype,
				col->mbminmaxlen,
				ifield->prefix_len,
				len,
				static_cast<char*>(dfield_get_data(field)));
			dfield_set_len(field, len);
		}

		ut_ad(len <= col->len
		      || DATA_LARGE_MTYPE(col->mtype)
		      || (col->mtype == DATA_POINT
			  && len == DATA_MBR_LEN));

		fixed_len = ifield->fixed_len;
		if (fixed_len && !dict_table_is_comp(index->table)
		    && DATA_MBMINLEN(col->mbminmaxlen)
		    != DATA_MBMAXLEN(col->mbminmaxlen)) {
			/* CHAR in ROW_FORMAT=REDUNDANT is always
			fixed-length, but in the temporary file it is
			variable-length for variable-length character
			sets. */
			fixed_len = 0;
		}

		if (fixed_len) {
#ifdef UNIV_DEBUG
			ulint	mbminlen = DATA_MBMINLEN(col->mbminmaxlen);
			ulint	mbmaxlen = DATA_MBMAXLEN(col->mbminmaxlen);

			/* len should be between size calculated based on
			mbmaxlen and mbminlen */
			ut_ad(len <= fixed_len);
			ut_ad(!mbmaxlen || len >= mbminlen
			      * (fixed_len / mbmaxlen));

			ut_ad(!dfield_is_ext(field));
#endif /* UNIV_DEBUG */
		} else if (dfield_is_ext(field)) {
			extra_size += 2;
		} else if (len < 128
			   || (!DATA_BIG_COL(col))) {
			/* Short or small-typed column: a one-byte
			length prefix suffices in the temp format. */
			extra_size++;
		} else {
			/* For variable-length columns, we look up the
			maximum length from the column itself.  If this
			is a prefix index column shorter than 256 bytes,
			this will waste one byte. */
			extra_size += 2;
		}
		data_size += len;
	}

	/* If this is FTS index, we already populated the sort buffer, return
	here */
	if (index->type & DICT_FTS) {
		DBUG_RETURN(n_row_added);
	}

#ifdef UNIV_DEBUG
	{
		ulint	size;
		ulint	extra;

		size = rec_get_converted_size_temp(
			index, entry->fields, n_fields, NULL, &extra);

		ut_ad(data_size + extra_size == size);
		ut_ad(extra_size == extra);
	}
#endif /* UNIV_DEBUG */

	/* Add to the total size of the record in row_merge_block_t
	the encoded length of extra_size and the extra bytes (extra_size).
	See row_merge_buf_write() for the variable-length encoding
	of extra_size. */
	data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);

	/* Record size can exceed page size while converting to
	redundant row format. But there is assert
	ut_ad(size < UNIV_PAGE_SIZE) in rec_offs_data_size().
	It may hit the assert before attempting to insert the row. */
	if (conv_heap != NULL && data_size > UNIV_PAGE_SIZE) {
		*err = DB_TOO_BIG_RECORD;
	}

	ut_ad(data_size < srv_sort_buf_size);

	/* Reserve one byte for the end marker of row_merge_block_t. */
	if (buf->total_size + data_size >= srv_sort_buf_size - 1) {
		DBUG_RETURN(0);
	}

	buf->total_size += data_size;
	buf->n_tuples++;
	n_row_added++;

	field = entry->fields;

	/* Copy the data fields. */

	do {
		dfield_dup(field++, buf->heap);
	} while (--n_fields);

	if (conv_heap != NULL) {
		mem_heap_empty(conv_heap);
	}

	DBUG_RETURN(n_row_added);
}
847 
848 /*************************************************************//**
849 Report a duplicate key. */
850 void
row_merge_dup_report(row_merge_dup_t * dup,const dfield_t * entry)851 row_merge_dup_report(
852 /*=================*/
853 	row_merge_dup_t*	dup,	/*!< in/out: for reporting duplicates */
854 	const dfield_t*		entry)	/*!< in: duplicate index entry */
855 {
856 	if (!dup->n_dup++) {
857 		/* Only report the first duplicate record,
858 		but count all duplicate records. */
859 		innobase_fields_to_mysql(dup->table, dup->index, entry);
860 	}
861 }
862 
/*************************************************************//**
Compare two tuples.
@return positive, 0, negative if a is greater, equal, less, than b,
respectively */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_tuple_cmp(
/*================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	const mtuple_t&		a,	/*!< in: first tuple to be compared */
	const mtuple_t&		b,	/*!< in: second tuple to be compared */
	row_merge_dup_t*	dup)	/*!< in/out: for reporting duplicates,
					NULL if non-unique index */
{
	int		cmp;
	const dfield_t*	af	= a.fields;
	const dfield_t*	bf	= b.fields;
	ulint		n	= n_uniq;

	ut_ad(n_uniq > 0);
	ut_ad(n_uniq <= n_field);

	/* Compare the fields of the tuples until a difference is
	found or we run out of fields to compare.  If !cmp at the
	end, the tuples are equal. */
	do {
		cmp = cmp_dfield_dfield(af++, bf++);
	} while (!cmp && --n);

	if (cmp) {
		return(cmp);
	}

	if (dup) {
		/* Report a duplicate value error if the tuples are
		logically equal.  NULL columns are logically inequal,
		although they are equal in the sorting order.  Find
		out if any of the fields are NULL. */
		for (const dfield_t* df = a.fields; df != af; df++) {
			if (dfield_is_null(df)) {
				goto no_report;
			}
		}

		row_merge_dup_report(dup, a.fields);
	}

	/* Execution falls through to here also when dup == NULL and
	after a duplicate has been reported. */
no_report:
	/* The n_uniq fields were equal, but we compare all fields so
	that we will get the same (internal) order as in the B-tree. */
	for (n = n_field - n_uniq + 1; --n; ) {
		cmp = cmp_dfield_dfield(af++, bf++);
		if (cmp) {
			return(cmp);
		}
	}

	/* This should never be reached, except in a secondary index
	when creating a secondary index and a PRIMARY KEY, and there
	is a duplicate in the PRIMARY KEY that has not been detected
	yet. Internally, an index must never contain duplicates. */
	return(cmp);
}
927 
/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param tuples array of tuples that being sorted
@param aux work area, same size as tuples[]
@param low lower bound of the sorting area, inclusive
@param high upper bound of the sorting area, exclusive */
#define row_merge_tuple_sort_ctx(tuples, aux, low, high)		\
	row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
Note: both _ctx macros pick up n_uniq, n_field and dup from the
local scope of row_merge_tuple_sort(), so those names must not be
changed there.
@param a first tuple to be compared
@param b second tuple to be compared
@return positive, 0, negative, if a is greater, equal, less, than b,
respectively */
#define row_merge_tuple_cmp_ctx(a,b)			\
	row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)

/**********************************************************************//**
Merge sort the tuple buffer in main memory. */
static
void
row_merge_tuple_sort(
/*=================*/
	ulint			n_uniq,	/*!< in: number of unique fields */
	ulint			n_field,/*!< in: number of fields */
	row_merge_dup_t*	dup,	/*!< in/out: reporter of duplicates
					(NULL if non-unique index) */
	mtuple_t*		tuples,	/*!< in/out: tuples */
	mtuple_t*		aux,	/*!< in/out: work area */
	ulint			low,	/*!< in: lower bound of the
					sorting area, inclusive */
	ulint			high)	/*!< in: upper bound of the
					sorting area, exclusive */
{
	ut_ad(n_field > 0);
	ut_ad(n_uniq <= n_field);

	UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
			      tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}
968 
969 /******************************************************//**
970 Sort a buffer. */
971 void
row_merge_buf_sort(row_merge_buf_t * buf,row_merge_dup_t * dup)972 row_merge_buf_sort(
973 /*===============*/
974 	row_merge_buf_t*	buf,	/*!< in/out: sort buffer */
975 	row_merge_dup_t*	dup)	/*!< in/out: reporter of duplicates
976 					(NULL if non-unique index) */
977 {
978 	ut_ad(!dict_index_is_spatial(buf->index));
979 
980 	row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
981 			     dict_index_get_n_fields(buf->index),
982 			     dup,
983 			     buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
984 }
985 
986 /******************************************************//**
987 Write a buffer to a block. */
988 void
row_merge_buf_write(const row_merge_buf_t * buf,const merge_file_t * of UNIV_UNUSED,row_merge_block_t * block)989 row_merge_buf_write(
990 /*================*/
991 	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
992 	const merge_file_t*	of UNIV_UNUSED,
993 					/*!< in: output file */
994 	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
995 {
996 	const dict_index_t*	index	= buf->index;
997 	ulint			n_fields= dict_index_get_n_fields(index);
998 	byte*			b	= &block[0];
999 
1000 	DBUG_ENTER("row_merge_buf_write");
1001 
1002 	for (ulint i = 0; i < buf->n_tuples; i++) {
1003 		const mtuple_t*	entry	= &buf->tuples[i];
1004 
1005 		row_merge_buf_encode(&b, index, entry, n_fields);
1006 		ut_ad(b < &block[srv_sort_buf_size]);
1007 
1008 		DBUG_PRINT("ib_merge_sort",
1009 			   ("%p,fd=%d,%lu %lu: %s",
1010 			    reinterpret_cast<const void*>(b), of->fd,
1011 			    ulong(of->offset), ulong(i),
1012 			    rec_printer(entry->fields,
1013 					n_fields).str().c_str()));
1014 	}
1015 
1016 	/* Write an "end-of-chunk" marker. */
1017 	ut_a(b < &block[srv_sort_buf_size]);
1018 	ut_a(b == &block[0] + buf->total_size);
1019 	*b++ = 0;
1020 #ifdef UNIV_DEBUG_VALGRIND
1021 	/* The rest of the block is uninitialized.  Initialize it
1022 	to avoid bogus warnings. */
1023 	memset(b, 0xff, &block[srv_sort_buf_size] - b);
1024 #endif /* UNIV_DEBUG_VALGRIND */
1025 	DBUG_PRINT("ib_merge_sort",
1026 		   ("write %p,%d,%lu EOF",
1027 		    reinterpret_cast<const void*>(b), of->fd,
1028 		    ulong(of->offset)));
1029 	DBUG_VOID_RETURN;
1030 }
1031 
1032 /******************************************************//**
1033 Create a memory heap and allocate space for row_merge_rec_offsets()
1034 and mrec_buf_t[3].
1035 @return memory heap */
1036 static
1037 mem_heap_t*
row_merge_heap_create(const dict_index_t * index,mrec_buf_t ** buf,ulint ** offsets1,ulint ** offsets2)1038 row_merge_heap_create(
1039 /*==================*/
1040 	const dict_index_t*	index,		/*!< in: record descriptor */
1041 	mrec_buf_t**		buf,		/*!< out: 3 buffers */
1042 	ulint**			offsets1,	/*!< out: offsets */
1043 	ulint**			offsets2)	/*!< out: offsets */
1044 {
1045 	ulint		i	= 1 + REC_OFFS_HEADER_SIZE
1046 		+ dict_index_get_n_fields(index);
1047 	mem_heap_t*	heap	= mem_heap_create(2 * i * sizeof **offsets1
1048 						  + 3 * sizeof **buf);
1049 
1050 	*buf = static_cast<mrec_buf_t*>(
1051 		mem_heap_alloc(heap, 3 * sizeof **buf));
1052 	*offsets1 = static_cast<ulint*>(
1053 		mem_heap_alloc(heap, i * sizeof **offsets1));
1054 	*offsets2 = static_cast<ulint*>(
1055 		mem_heap_alloc(heap, i * sizeof **offsets2));
1056 
1057 	(*offsets1)[0] = (*offsets2)[0] = i;
1058 	(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
1059 
1060 	return(heap);
1061 }
1062 
1063 /********************************************************************//**
1064 Read a merge block from the file system.
1065 @return TRUE if request was successful, FALSE if fail */
1066 ibool
row_merge_read(int fd,ulint offset,row_merge_block_t * buf)1067 row_merge_read(
1068 /*===========*/
1069 	int			fd,	/*!< in: file descriptor */
1070 	ulint			offset,	/*!< in: offset where to read
1071 					in number of row_merge_block_t
1072 					elements */
1073 	row_merge_block_t*	buf)	/*!< out: data */
1074 {
1075 	os_offset_t	ofs = ((os_offset_t) offset) * srv_sort_buf_size;
1076 	dberr_t		err;
1077 
1078 	DBUG_ENTER("row_merge_read");
1079 	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
1080 	DBUG_EXECUTE_IF("row_merge_read_failure", DBUG_RETURN(FALSE););
1081 
1082 	IORequest	request;
1083 
1084 	/* Merge sort pages are never compressed. */
1085 	request.disable_compression();
1086 
1087 	err = os_file_read_no_error_handling_int_fd(
1088 		request,
1089 		fd, buf, ofs, srv_sort_buf_size, NULL);
1090 
1091 #ifdef POSIX_FADV_DONTNEED
1092 	/* Each block is read exactly once.  Free up the file cache. */
1093 	posix_fadvise(fd, ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
1094 #endif /* POSIX_FADV_DONTNEED */
1095 
1096 	if (err != DB_SUCCESS) {
1097 		ib::error() << "Failed to read merge block at " << ofs;
1098 	}
1099 
1100 	DBUG_RETURN(err == DB_SUCCESS);
1101 }
1102 
1103 /********************************************************************//**
1104 Write a merge block to the file system.
1105 @return TRUE if request was successful, FALSE if fail */
1106 ibool
row_merge_write(int fd,ulint offset,const void * buf)1107 row_merge_write(
1108 /*============*/
1109 	int		fd,	/*!< in: file descriptor */
1110 	ulint		offset,	/*!< in: offset where to write,
1111 				in number of row_merge_block_t elements */
1112 	const void*	buf)	/*!< in: data */
1113 {
1114 	size_t		buf_len = srv_sort_buf_size;
1115 	os_offset_t	ofs = buf_len * (os_offset_t) offset;
1116 	dberr_t		err;
1117 
1118 	DBUG_ENTER("row_merge_write");
1119 	DBUG_PRINT("ib_merge_sort", ("fd=%d ofs=" UINT64PF, fd, ofs));
1120 	DBUG_EXECUTE_IF("row_merge_write_failure", DBUG_RETURN(FALSE););
1121 
1122 	IORequest	request(IORequest::WRITE);
1123 
1124 	request.disable_compression();
1125 
1126 	err = os_file_write_int_fd(
1127 		request,
1128 		"(merge)", fd, buf, ofs, buf_len);
1129 
1130 #ifdef POSIX_FADV_DONTNEED
1131 	/* The block will be needed on the next merge pass,
1132 	but it can be evicted from the file cache meanwhile. */
1133 	posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
1134 #endif /* POSIX_FADV_DONTNEED */
1135 
1136 	DBUG_RETURN(err == DB_SUCCESS);
1137 }
1138 
/********************************************************************//**
Read a merge record.
A record in the merge file is encoded as an extra_size prefix (one
byte, or two bytes when the first byte has the high bit set; the
value 0 marks the end of the list) followed by the record bytes.
A record may span two blocks; in that case it is reassembled into
the secondary buffer *buf and *mrec points into that buffer.
@return pointer to next record, or NULL on I/O error or end of list */
const byte*
row_merge_read_rec(
/*===============*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	const byte*		b,	/*!< in: pointer to record */
	const dict_index_t*	index,	/*!< in: index of the record */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t**		mrec,	/*!< out: pointer to merge record,
					or NULL on end of list
					(non-NULL on I/O error) */
	ulint*			offsets)/*!< out: offsets of mrec */
{
	ulint	extra_size;
	ulint	data_size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(index);
	ut_ad(foffs);
	ut_ad(mrec);
	ut_ad(offsets);

	/* The offsets array must have been sized for this index
	(see row_merge_heap_create()). */
	ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
	      + dict_index_get_n_fields(index));

	DBUG_ENTER("row_merge_read_rec");

	extra_size = *b++;

	if (UNIV_UNLIKELY(!extra_size)) {
		/* End of list */
		*mrec = NULL;
		DBUG_PRINT("ib_merge_sort",
			   ("read %p,%p,%d,%lu EOF\n",
			    reinterpret_cast<const void*>(b),
			    reinterpret_cast<const void*>(block),
			    fd, ulong(*foffs)));
		DBUG_RETURN(NULL);
	}

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		if (UNIV_UNLIKELY(b >= &block[srv_sort_buf_size])) {
			/* The second length byte is in the next block:
			read it in and restart at its beginning. */
			if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
				/* Signal I/O error. */
				*mrec = b;
				DBUG_RETURN(NULL);
			}

			/* Wrap around to the beginning of the buffer. */
			b = &block[0];
		}

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *b++;
	}

	/* Normalize extra_size.  Above, value 0 signals "end of list". */
	extra_size--;

	/* Read the extra bytes. */

	if (UNIV_UNLIKELY(b + extra_size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.  Copy the entire record
		to the auxiliary buffer and handle this as a special
		case. */

		avail_size = &block[srv_sort_buf_size] - b;
		ut_ad(avail_size < sizeof *buf);
		memcpy(*buf, b, avail_size);

		if (!row_merge_read(fd, ++(*foffs), block)) {

			goto err_exit;
		}

		/* Wrap around to the beginning of the buffer. */
		b = &block[0];

		/* Copy the record. */
		memcpy(*buf + avail_size, b, extra_size - avail_size);
		b += extra_size - avail_size;

		/* The record header is now contiguous in *buf;
		the record body starts right after it. */
		*mrec = *buf + extra_size;

		rec_init_offsets_temp(*mrec, index, offsets);

		data_size = rec_offs_data_size(offsets);

		/* These overflows should be impossible given that
		records are much smaller than either buffer, and
		the record starts near the beginning of each buffer. */
		ut_a(extra_size + data_size < sizeof *buf);
		ut_a(b + data_size < &block[srv_sort_buf_size]);

		/* Copy the data bytes. */
		memcpy(*buf + extra_size, b, data_size);
		b += data_size;

		goto func_exit;
	}

	/* Common case: the header is contiguous in the block. */
	*mrec = b + extra_size;

	rec_init_offsets_temp(*mrec, index, offsets);

	data_size = rec_offs_data_size(offsets);
	ut_ad(extra_size + data_size < sizeof *buf);

	b += extra_size + data_size;

	if (UNIV_LIKELY(b < &block[srv_sort_buf_size])) {
		/* The record fits entirely in the block.
		This is the normal case. */
		goto func_exit;
	}

	/* The record spans two blocks.  Copy it to buf. */

	b -= extra_size + data_size;
	avail_size = &block[srv_sort_buf_size] - b;
	memcpy(*buf, b, avail_size);
	*mrec = *buf + extra_size;

	/* We cannot invoke rec_offs_make_valid() here, because there
	are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
	Similarly, rec_offs_validate() would fail, because it invokes
	rec_get_status(). */
	ut_d(offsets[2] = (ulint) *mrec);
	ut_d(offsets[3] = (ulint) index);

	if (!row_merge_read(fd, ++(*foffs), block)) {

		goto err_exit;
	}

	/* Wrap around to the beginning of the buffer. */
	b = &block[0];

	/* Copy the rest of the record. */
	memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
	b += extra_size + data_size - avail_size;

func_exit:
	DBUG_PRINT("ib_merge_sort",
		   ("%p,%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b),
		    reinterpret_cast<const void*>(block),
		    fd, ulong(*foffs),
		    rec_printer(*mrec, 0, offsets).str().c_str()));
	DBUG_RETURN(b);
}
1301 
/********************************************************************//**
Write a merge record.
Writes the 1- or 2-byte extra_size prefix followed by the record
bytes (extra/header bytes plus data bytes) into the buffer b. */
static
void
row_merge_write_rec_low(
/*====================*/
	byte*		b,	/*!< out: buffer */
	ulint		e,	/*!< in: encoded extra_size */
#ifndef NDEBUG
	ulint		size,	/*!< in: total size to write */
	int		fd,	/*!< in: file descriptor */
	ulint		foffs,	/*!< in: file offset */
#endif /* !NDEBUG */
	const mrec_t*	mrec,	/*!< in: record to write */
	const ulint*	offsets)/*!< in: offsets of mrec */
#ifdef NDEBUG
/* In release builds the debug-only parameters are compiled out;
this macro maps 7-argument call sites onto the 4-parameter
definition above. */
# define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)	\
	row_merge_write_rec_low(b, e, mrec, offsets)
#endif /* NDEBUG */
{
	DBUG_ENTER("row_merge_write_rec_low");

#ifndef NDEBUG
	/* Used only to assert that exactly size bytes were written. */
	const byte* const end = b + size;
#endif /* NDEBUG */
	assert(e == rec_offs_extra_size(offsets) + 1);
	DBUG_PRINT("ib_merge_sort",
		   ("%p,fd=%d,%lu: %s",
		    reinterpret_cast<const void*>(b), fd, ulong(foffs),
		    rec_printer(mrec, 0, offsets).str().c_str()));

	/* Encode the extra_size prefix: one byte if e < 0x80,
	otherwise two bytes with the high bit of the first one set. */
	if (e < 0x80) {
		*b++ = (byte) e;
	} else {
		*b++ = (byte) (0x80 | (e >> 8));
		*b++ = (byte) e;
	}

	/* Copy the record, including its extra (header) bytes,
	which precede mrec in memory. */
	memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
	assert(b + rec_offs_size(offsets) == end);
	DBUG_VOID_RETURN;
}
1344 
/********************************************************************//**
Write a merge record.
If the encoded record does not fit in the remainder of the current
block, it is staged in buf[0], the full block is flushed to disk,
and the tail of the record is carried over into a fresh block.
@return pointer to end of block, or NULL on error */
static
byte*
row_merge_write_rec(
/*================*/
	row_merge_block_t*	block,	/*!< in/out: file buffer */
	mrec_buf_t*		buf,	/*!< in/out: secondary buffer */
	byte*			b,	/*!< in: pointer to end of block */
	int			fd,	/*!< in: file descriptor */
	ulint*			foffs,	/*!< in/out: file offset */
	const mrec_t*		mrec,	/*!< in: record to write */
	const ulint*		offsets)/*!< in: offsets of mrec */
{
	ulint	extra_size;
	ulint	size;
	ulint	avail_size;

	ut_ad(block);
	ut_ad(buf);
	ut_ad(b >= &block[0]);
	ut_ad(b < &block[srv_sort_buf_size]);
	ut_ad(mrec);
	ut_ad(foffs);
	/* The source record must not live inside the I/O buffers
	that this function may overwrite. */
	ut_ad(mrec < &block[0] || mrec > &block[srv_sort_buf_size]);
	ut_ad(mrec < buf[0] || mrec > buf[1]);

	/* Normalize extra_size.  Value 0 signals "end of list". */
	extra_size = rec_offs_extra_size(offsets) + 1;

	/* Total encoded size: the prefix takes one extra byte when
	extra_size >= 0x80 (two-byte encoding). */
	size = extra_size + (extra_size >= 0x80)
		+ rec_offs_data_size(offsets);

	if (UNIV_UNLIKELY(b + size >= &block[srv_sort_buf_size])) {
		/* The record spans two blocks.
		Copy it to the temporary buffer first. */
		avail_size = &block[srv_sort_buf_size] - b;

		row_merge_write_rec_low(buf[0],
					extra_size, size, fd, *foffs,
					mrec, offsets);

		/* Copy the head of the temporary buffer, write
		the completed block, and copy the tail of the
		record to the head of the new block. */
		memcpy(b, buf[0], avail_size);

		if (!row_merge_write(fd, (*foffs)++, block)) {
			return(NULL);
		}

		UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);

		/* Copy the rest. */
		b = &block[0];
		memcpy(b, buf[0] + avail_size, size - avail_size);
		b += size - avail_size;
	} else {
		row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
					mrec, offsets);
		b += size;
	}

	return(b);
}
1411 
1412 /********************************************************************//**
1413 Write an end-of-list marker.
1414 @return pointer to end of block, or NULL on error */
1415 static
1416 byte*
row_merge_write_eof(row_merge_block_t * block,byte * b,int fd,ulint * foffs)1417 row_merge_write_eof(
1418 /*================*/
1419 	row_merge_block_t*	block,	/*!< in/out: file buffer */
1420 	byte*			b,	/*!< in: pointer to end of block */
1421 	int			fd,	/*!< in: file descriptor */
1422 	ulint*			foffs)	/*!< in/out: file offset */
1423 {
1424 	ut_ad(block);
1425 	ut_ad(b >= &block[0]);
1426 	ut_ad(b < &block[srv_sort_buf_size]);
1427 	ut_ad(foffs);
1428 
1429 	DBUG_ENTER("row_merge_write_eof");
1430 	DBUG_PRINT("ib_merge_sort",
1431 		   ("%p,%p,fd=%d,%lu",
1432 		    reinterpret_cast<const void*>(b),
1433 		    reinterpret_cast<const void*>(block),
1434 		    fd, ulong(*foffs)));
1435 
1436 	*b++ = 0;
1437 	UNIV_MEM_ASSERT_RW(&block[0], b - &block[0]);
1438 	UNIV_MEM_ASSERT_W(&block[0], srv_sort_buf_size);
1439 #ifdef UNIV_DEBUG_VALGRIND
1440 	/* The rest of the block is uninitialized.  Initialize it
1441 	to avoid bogus warnings. */
1442 	memset(b, 0xff, &block[srv_sort_buf_size] - b);
1443 #endif /* UNIV_DEBUG_VALGRIND */
1444 
1445 	if (!row_merge_write(fd, (*foffs)++, block)) {
1446 		DBUG_RETURN(NULL);
1447 	}
1448 
1449 	UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
1450 	DBUG_RETURN(&block[0]);
1451 }
1452 
1453 /** Create a temporary file if it has not been created already.
1454 @param[in,out]	tmpfd	temporary file handle
1455 @param[in]	path	location for creating temporary file
1456 @return file descriptor, or -1 on failure */
1457 static MY_ATTRIBUTE((warn_unused_result))
1458 int
row_merge_tmpfile_if_needed(int * tmpfd,const char * path)1459 row_merge_tmpfile_if_needed(
1460 	int*		tmpfd,
1461 	const char*	path)
1462 {
1463 	if (*tmpfd < 0) {
1464 		*tmpfd = row_merge_file_create_low(path);
1465 		if (*tmpfd >= 0) {
1466 			MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
1467 		}
1468 	}
1469 
1470 	return(*tmpfd);
1471 }
1472 
/** Create a temporary file for merge sort if it was not created already.
@param[in,out]	file	merge file structure
@param[in,out]	tmpfd	temporary file handle; created alongside the
			merge file if not yet open
@param[in]	nrec	number of records in the file
@param[in]	path	location for creating temporary file
@return file descriptor, or -1 on failure */
static MY_ATTRIBUTE((warn_unused_result))
int
row_merge_file_create_if_needed(
	merge_file_t*	file,
	int*		tmpfd,
	ulint		nrec,
	const char*	path)
{
	/* Invariant: the merge file and the temporary file are either
	both open or both unopened. */
	ut_ad(file->fd < 0 || *tmpfd >=0);
	if (file->fd < 0 && row_merge_file_create(file, path) >= 0) {
		MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_SORT_FILES);
		if (row_merge_tmpfile_if_needed(tmpfd, path) < 0) {
			return(-1);
		}

		file->n_rec = nrec;
	}

	ut_ad(file->fd < 0 || *tmpfd >=0);
	return(file->fd);
}
1499 
1500 /** Copy the merge data tuple from another merge data tuple.
1501 @param[in]	mtuple		source merge data tuple
1502 @param[in,out]	prev_mtuple	destination merge data tuple
1503 @param[in]	n_unique	number of unique fields exist in the mtuple
1504 @param[in,out]	heap		memory heap where last_mtuple allocated */
1505 static
1506 void
row_mtuple_create(const mtuple_t * mtuple,mtuple_t * prev_mtuple,ulint n_unique,mem_heap_t * heap)1507 row_mtuple_create(
1508 	const mtuple_t*	mtuple,
1509 	mtuple_t*	prev_mtuple,
1510 	ulint		n_unique,
1511 	mem_heap_t*	heap)
1512 {
1513 	memcpy(prev_mtuple->fields, mtuple->fields,
1514 	       n_unique * sizeof *mtuple->fields);
1515 
1516 	dfield_t*	field = prev_mtuple->fields;
1517 
1518 	for (ulint i = 0; i < n_unique; i++) {
1519 		dfield_dup(field++, heap);
1520 	}
1521 }
1522 
1523 /** Compare two merge data tuples.
1524 @param[in]	prev_mtuple	merge data tuple
1525 @param[in]	current_mtuple	merge data tuple
1526 @param[in,out]	dup		reporter of duplicates
1527 @retval positive, 0, negative if current_mtuple is greater, equal, less, than
1528 last_mtuple. */
1529 static
1530 int
row_mtuple_cmp(const mtuple_t * prev_mtuple,const mtuple_t * current_mtuple,row_merge_dup_t * dup)1531 row_mtuple_cmp(
1532 	const mtuple_t*		prev_mtuple,
1533 	const mtuple_t*		current_mtuple,
1534 	row_merge_dup_t*	dup)
1535 {
1536 	ut_ad(dict_index_is_clust(dup->index));
1537 	const ulint	n_unique = dict_index_get_n_unique(dup->index);
1538 
1539 	return(row_merge_tuple_cmp(
1540 		       n_unique, n_unique, *current_mtuple, *prev_mtuple, dup));
1541 }
1542 
1543 /** Insert cached spatial index rows.
1544 @param[in]	trx_id		transaction id
1545 @param[in]	sp_tuples	cached spatial rows
1546 @param[in]	num_spatial	number of spatial indexes
1547 @param[in,out]	row_heap	heap for insert
1548 @param[in,out]	sp_heap		heap for tuples
1549 @param[in,out]	pcur		cluster index cursor
1550 @param[in,out]	mtr		mini transaction
1551 @param[in,out]	mtr_committed	whether scan_mtr got committed
1552 @return DB_SUCCESS or error number */
1553 static
1554 dberr_t
row_merge_spatial_rows(trx_id_t trx_id,index_tuple_info_t ** sp_tuples,ulint num_spatial,mem_heap_t * row_heap,mem_heap_t * sp_heap,btr_pcur_t * pcur,mtr_t * mtr,bool * mtr_committed)1555 row_merge_spatial_rows(
1556 	trx_id_t		trx_id,
1557 	index_tuple_info_t**	sp_tuples,
1558 	ulint			num_spatial,
1559 	mem_heap_t*		row_heap,
1560 	mem_heap_t*		sp_heap,
1561 	btr_pcur_t*		pcur,
1562 	mtr_t*			mtr,
1563 	bool*			mtr_committed)
1564 {
1565 	dberr_t			err = DB_SUCCESS;
1566 
1567 	if (sp_tuples == NULL) {
1568 		return(DB_SUCCESS);
1569 	}
1570 
1571 	ut_ad(sp_heap != NULL);
1572 
1573 	for (ulint j = 0; j < num_spatial; j++) {
1574 		err = sp_tuples[j]->insert(
1575 			trx_id, row_heap,
1576 			pcur, mtr, mtr_committed);
1577 
1578 		if (err != DB_SUCCESS) {
1579 			return(err);
1580 		}
1581 	}
1582 
1583 	mem_heap_empty(sp_heap);
1584 
1585 	return(err);
1586 }
1587 
1588 /** Check if the geometry field is valid.
1589 @param[in]	row		the row
1590 @param[in]	index		spatial index
1591 @return true if it's valid, false if it's invalid. */
1592 static
1593 bool
row_geo_field_is_valid(const dtuple_t * row,dict_index_t * index)1594 row_geo_field_is_valid(
1595 	const dtuple_t*		row,
1596 	dict_index_t*		index)
1597 {
1598 	const dict_field_t*	ind_field
1599 		= dict_index_get_nth_field(index, 0);
1600 	const dict_col_t*	col
1601 		= ind_field->col;
1602 	ulint			col_no
1603 		= dict_col_get_no(col);
1604 	const dfield_t*		dfield
1605 		= dtuple_get_nth_field(row, col_no);
1606 
1607 	if (dfield_is_null(dfield)
1608 	    || dfield_get_len(dfield) < GEO_DATA_HEADER_SIZE) {
1609 		return(false);
1610 	}
1611 
1612 	return(true);
1613 }
1614 
1615 /** Reads clustered index of the table and create temporary files
1616 containing the index entries for the indexes to be built.
1617 @param[in]	trx		transaction
1618 @param[in,out]	table		MySQL table object, for reporting erroneous
1619 records
1620 @param[in]	old_table	table where rows are read from
1621 @param[in]	new_table	table where indexes are created; identical to
1622 old_table unless creating a PRIMARY KEY
1623 @param[in]	online		true if creating indexes online
1624 @param[in]	index		indexes to be created
1625 @param[in]	fts_sort_idx	full-text index to be created, or NULL
1626 @param[in]	psort_info	parallel sort info for fts_sort_idx creation,
1627 or NULL
1628 @param[in]	files		temporary files
1629 @param[in]	key_numbers	MySQL key numbers to create
1630 @param[in]	n_index		number of indexes to create
1631 @param[in]	add_cols	default values of added columns, or NULL
1632 @param[in]	add_v		newly added virtual columns along with indexes
1633 @param[in]	col_map		mapping of old column numbers to new ones, or
1634 NULL if old_table == new_table
1635 @param[in]	add_autoinc	number of added AUTO_INCREMENT columns, or
1636 ULINT_UNDEFINED if none is added
1637 @param[in,out]	sequence	autoinc sequence
1638 @param[in,out]	block		file buffer
1639 @param[in]	skip_pk_sort	whether the new PRIMARY KEY will follow
1640 existing order
1641 @param[in,out]	tmpfd		temporary file handle
1642 @param[in,out]	stage		performance schema accounting object, used by
1643 ALTER TABLE. stage->n_pk_recs_inc() will be called for each record read and
1644 stage->inc() will be called for each page read.
1645 @param[in]	eval_table	mysql table used to evaluate virtual column
1646 				value, see innobase_get_computed_value().
1647 @return DB_SUCCESS or error */
1648 static MY_ATTRIBUTE((warn_unused_result))
1649 dberr_t
row_merge_read_clustered_index(trx_t * trx,struct TABLE * table,const dict_table_t * old_table,const dict_table_t * new_table,bool online,dict_index_t ** index,dict_index_t * fts_sort_idx,fts_psort_t * psort_info,merge_file_t * files,const ulint * key_numbers,ulint n_index,const dtuple_t * add_cols,const dict_add_v_col_t * add_v,const ulint * col_map,ulint add_autoinc,ib_sequence_t & sequence,row_merge_block_t * block,bool skip_pk_sort,int * tmpfd,ut_stage_alter_t * stage,struct TABLE * eval_table)1650 row_merge_read_clustered_index(
1651 	trx_t*			trx,
1652 	struct TABLE*		table,
1653 	const dict_table_t*	old_table,
1654 	const dict_table_t*	new_table,
1655 	bool			online,
1656 	dict_index_t**		index,
1657 	dict_index_t*		fts_sort_idx,
1658 	fts_psort_t*		psort_info,
1659 	merge_file_t*		files,
1660 	const ulint*		key_numbers,
1661 	ulint			n_index,
1662 	const dtuple_t*		add_cols,
1663 	const dict_add_v_col_t*	add_v,
1664 	const ulint*		col_map,
1665 	ulint			add_autoinc,
1666 	ib_sequence_t&		sequence,
1667 	row_merge_block_t*	block,
1668 	bool			skip_pk_sort,
1669 	int*			tmpfd,
1670 	ut_stage_alter_t*	stage,
1671 	struct TABLE*		eval_table)
1672 {
1673 	dict_index_t*		clust_index;	/* Clustered index */
1674 	mem_heap_t*		row_heap;	/* Heap memory to create
1675 						clustered index tuples */
1676 	row_merge_buf_t**	merge_buf;	/* Temporary list for records*/
1677 	mem_heap_t*		v_heap = NULL;	/* Heap memory to process large
1678 						data for virtual column */
1679 	btr_pcur_t		pcur;		/* Cursor on the clustered
1680 						index */
1681 	mtr_t			mtr;		/* Mini transaction */
1682 	dberr_t			err = DB_SUCCESS;/* Return code */
1683 	ulint			n_nonnull = 0;	/* number of columns
1684 						changed to NOT NULL */
1685 	ulint*			nonnull = NULL;	/* NOT NULL columns */
1686 	dict_index_t*		fts_index = NULL;/* FTS index */
1687 	doc_id_t		doc_id = 0;
1688 	doc_id_t		max_doc_id = 0;
1689 	ibool			add_doc_id = FALSE;
1690 	os_event_t		fts_parallel_sort_event = NULL;
1691 	ibool			fts_pll_sort = FALSE;
1692 	int64_t			sig_count = 0;
1693 	index_tuple_info_t**	sp_tuples = NULL;
1694 	mem_heap_t*		sp_heap = NULL;
1695 	ulint			num_spatial = 0;
1696 	BtrBulk*		clust_btr_bulk = NULL;
1697 	bool			clust_temp_file = false;
1698 	mem_heap_t*		mtuple_heap = NULL;
1699 	mtuple_t		prev_mtuple;
1700 	mem_heap_t*		conv_heap = NULL;
1701 	FlushObserver*		observer = trx->flush_observer;
1702 	DBUG_ENTER("row_merge_read_clustered_index");
1703 
1704 	ut_ad((old_table == new_table) == !col_map);
1705 	ut_ad(!add_cols || col_map);
1706 
1707 	trx->op_info = "reading clustered index";
1708 
1709 #ifdef FTS_INTERNAL_DIAG_PRINT
1710 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
1711 #endif
1712 
1713 	/* Create and initialize memory for record buffers */
1714 
1715 	merge_buf = static_cast<row_merge_buf_t**>(
1716 		ut_malloc_nokey(n_index * sizeof *merge_buf));
1717 
1718 	row_merge_dup_t	clust_dup = {index[0], table, col_map, 0};
1719 	dfield_t*	prev_fields;
1720 	const ulint	n_uniq = dict_index_get_n_unique(index[0]);
1721 
1722 	ut_ad(trx->mysql_thd != NULL);
1723 
1724 	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
1725 
1726 	ut_ad(!skip_pk_sort || dict_index_is_clust(index[0]));
1727 	/* There is no previous tuple yet. */
1728 	prev_mtuple.fields = NULL;
1729 
1730 	for (ulint i = 0; i < n_index; i++) {
1731 		if (index[i]->type & DICT_FTS) {
1732 
1733 			/* We are building a FT index, make sure
1734 			we have the temporary 'fts_sort_idx' */
1735 			ut_a(fts_sort_idx);
1736 
1737 			fts_index = index[i];
1738 
1739 			merge_buf[i] = row_merge_buf_create(fts_sort_idx);
1740 
1741 			add_doc_id = DICT_TF2_FLAG_IS_SET(
1742 				new_table, DICT_TF2_FTS_ADD_DOC_ID);
1743 
1744 			/* If Doc ID does not exist in the table itself,
1745 			fetch the first FTS Doc ID */
1746 			if (add_doc_id) {
1747 				fts_get_next_doc_id(
1748 					(dict_table_t*) new_table,
1749 					&doc_id);
1750 				ut_ad(doc_id > 0);
1751 			}
1752 
1753 			fts_pll_sort = TRUE;
1754 			row_fts_start_psort(psort_info);
1755 			fts_parallel_sort_event =
1756 				 psort_info[0].psort_common->sort_event;
1757 		} else {
1758 			if (dict_index_is_spatial(index[i])) {
1759 				num_spatial++;
1760 			}
1761 
1762 			merge_buf[i] = row_merge_buf_create(index[i]);
1763 		}
1764 	}
1765 
1766 	if (num_spatial > 0) {
1767 		ulint	count = 0;
1768 
1769 		sp_heap = mem_heap_create(512);
1770 
1771 		sp_tuples = static_cast<index_tuple_info_t**>(
1772 			ut_malloc_nokey(num_spatial
1773 					* sizeof(*sp_tuples)));
1774 
1775 		for (ulint i = 0; i < n_index; i++) {
1776 			if (dict_index_is_spatial(index[i])) {
1777 				sp_tuples[count]
1778 					= UT_NEW_NOKEY(
1779 						index_tuple_info_t(
1780 							sp_heap,
1781 							index[i]));
1782 				count++;
1783 			}
1784 		}
1785 
1786 		ut_ad(count == num_spatial);
1787 	}
1788 
1789 	mtr_start(&mtr);
1790 
1791 	/* Find the clustered index and create a persistent cursor
1792 	based on that. */
1793 
1794 	clust_index = dict_table_get_first_index(old_table);
1795 
1796 	btr_pcur_open_at_index_side(
1797 		true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
1798 
1799 	if (old_table != new_table) {
1800 		/* The table is being rebuilt.  Identify the columns
1801 		that were flagged NOT NULL in the new table, so that
1802 		we can quickly check that the records in the old table
1803 		do not violate the added NOT NULL constraints. */
1804 
1805 		nonnull = static_cast<ulint*>(
1806 			ut_malloc_nokey(dict_table_get_n_cols(new_table)
1807 				  * sizeof *nonnull));
1808 
1809 		for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
1810 			if (dict_table_get_nth_col(old_table, i)->prtype
1811 			    & DATA_NOT_NULL) {
1812 				continue;
1813 			}
1814 
1815 			const ulint j = col_map[i];
1816 
1817 			if (j == ULINT_UNDEFINED) {
1818 				/* The column was dropped. */
1819 				continue;
1820 			}
1821 
1822 			if (dict_table_get_nth_col(new_table, j)->prtype
1823 			    & DATA_NOT_NULL) {
1824 				nonnull[n_nonnull++] = j;
1825 			}
1826 		}
1827 
1828 		if (!n_nonnull) {
1829 			ut_free(nonnull);
1830 			nonnull = NULL;
1831 		}
1832 	}
1833 
1834 	row_heap = mem_heap_create(sizeof(mrec_buf_t));
1835 
1836 	if (dict_table_is_comp(old_table)
1837 	    && !dict_table_is_comp(new_table)) {
1838 		conv_heap = mem_heap_create(sizeof(mrec_buf_t));
1839 	}
1840 
1841 	if (skip_pk_sort) {
1842 		prev_fields = static_cast<dfield_t*>(
1843 			ut_malloc_nokey(n_uniq * sizeof *prev_fields));
1844 		mtuple_heap = mem_heap_create(sizeof(mrec_buf_t));
1845 	} else {
1846 		prev_fields = NULL;
1847 	}
1848 
1849 	/* Scan the clustered index. */
1850 	for (;;) {
1851 		const rec_t*	rec;
1852 		ulint*		offsets;
1853 		const dtuple_t*	row;
1854 		row_ext_t*	ext;
1855 		page_cur_t*	cur	= btr_pcur_get_page_cur(&pcur);
1856 
1857 		mem_heap_empty(row_heap);
1858 
1859 		page_cur_move_to_next(cur);
1860 
1861 		stage->n_pk_recs_inc();
1862 
1863 		if (page_cur_is_after_last(cur)) {
1864 
1865 			stage->inc();
1866 
1867 			if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
1868 				err = DB_INTERRUPTED;
1869 				trx->error_key_num = 0;
1870 				goto func_exit;
1871 			}
1872 
1873 			if (online && old_table != new_table) {
1874 				err = row_log_table_get_error(clust_index);
1875 				if (err != DB_SUCCESS) {
1876 					trx->error_key_num = 0;
1877 					goto func_exit;
1878 				}
1879 			}
1880 
1881 #ifdef NDEBUG
1882 # define dbug_run_purge	false
1883 #else /* NDEBUG */
1884 			bool	dbug_run_purge = false;
1885 #endif /* NDEBUG */
1886 			DBUG_EXECUTE_IF(
1887 				"ib_purge_on_create_index_page_switch",
1888 				dbug_run_purge = true;);
1889 
1890 			/* Insert the cached spatial index rows. */
1891 			bool	mtr_committed = false;
1892 
1893 			err = row_merge_spatial_rows(
1894 				trx->id, sp_tuples, num_spatial,
1895 				row_heap, sp_heap, &pcur,
1896 				&mtr, &mtr_committed);
1897 
1898 			if (err != DB_SUCCESS) {
1899 				goto func_exit;
1900 			}
1901 
1902 			if (mtr_committed) {
1903 				goto scan_next;
1904 			}
1905 
1906 			if (dbug_run_purge
1907 			    || rw_lock_get_waiters(
1908 				    dict_index_get_lock(clust_index))) {
1909 				/* There are waiters on the clustered
1910 				index tree lock, likely the purge
1911 				thread. Store and restore the cursor
1912 				position, and yield so that scanning a
1913 				large table will not starve other
1914 				threads. */
1915 
1916 				/* Store the cursor position on the last user
1917 				record on the page. */
1918 				btr_pcur_move_to_prev_on_page(&pcur);
1919 				/* Leaf pages must never be empty, unless
1920 				this is the only page in the index tree. */
1921 				ut_ad(btr_pcur_is_on_user_rec(&pcur)
1922 				      || btr_pcur_get_block(
1923 					      &pcur)->page.id.page_no()
1924 				      == clust_index->page);
1925 
1926 				btr_pcur_store_position(&pcur, &mtr);
1927 				mtr_commit(&mtr);
1928 
1929 				if (dbug_run_purge) {
1930 					/* This is for testing
1931 					purposes only (see
1932 					DBUG_EXECUTE_IF above).  We
1933 					signal the purge thread and
1934 					hope that the purge batch will
1935 					complete before we execute
1936 					btr_pcur_restore_position(). */
1937 					trx_purge_run();
1938 					os_thread_sleep(1000000);
1939 				}
1940 
1941 				/* Give the waiters a chance to proceed. */
1942 				os_thread_yield();
1943 scan_next:
1944 				mtr_start(&mtr);
1945 				/* Restore position on the record, or its
1946 				predecessor if the record was purged
1947 				meanwhile. */
1948 				btr_pcur_restore_position(
1949 					BTR_SEARCH_LEAF, &pcur, &mtr);
1950 				/* Move to the successor of the
1951 				original record. */
1952 				if (!btr_pcur_move_to_next_user_rec(
1953 					    &pcur, &mtr)) {
1954 end_of_index:
1955 					row = NULL;
1956 					mtr_commit(&mtr);
1957 					mem_heap_free(row_heap);
1958 					ut_free(nonnull);
1959 					goto write_buffers;
1960 				}
1961 			} else {
1962 				ulint		next_page_no;
1963 				buf_block_t*	block;
1964 
1965 				next_page_no = btr_page_get_next(
1966 					page_cur_get_page(cur), &mtr);
1967 
1968 				if (next_page_no == FIL_NULL) {
1969 					goto end_of_index;
1970 				}
1971 
1972 				block = page_cur_get_block(cur);
1973 				block = btr_block_get(
1974 					page_id_t(block->page.id.space(),
1975 						  next_page_no),
1976 					block->page.size,
1977 					BTR_SEARCH_LEAF,
1978 					clust_index, &mtr);
1979 
1980 				btr_leaf_page_release(page_cur_get_block(cur),
1981 						      BTR_SEARCH_LEAF, &mtr);
1982 				page_cur_set_before_first(block, cur);
1983 				page_cur_move_to_next(cur);
1984 
1985 				ut_ad(!page_cur_is_after_last(cur));
1986 			}
1987 		}
1988 
1989 		rec = page_cur_get_rec(cur);
1990 
1991 		offsets = rec_get_offsets(rec, clust_index, NULL,
1992 					  ULINT_UNDEFINED, &row_heap);
1993 
1994 		if (online) {
1995 			/* Perform a REPEATABLE READ.
1996 
1997 			When rebuilding the table online,
1998 			row_log_table_apply() must not see a newer
1999 			state of the table when applying the log.
2000 			This is mainly to prevent false duplicate key
2001 			errors, because the log will identify records
2002 			by the PRIMARY KEY, and also to prevent unsafe
2003 			BLOB access.
2004 
2005 			When creating a secondary index online, this
2006 			table scan must not see records that have only
2007 			been inserted to the clustered index, but have
2008 			not been written to the online_log of
2009 			index[]. If we performed READ UNCOMMITTED, it
2010 			could happen that the ADD INDEX reaches
2011 			ONLINE_INDEX_COMPLETE state between the time
2012 			the DML thread has updated the clustered index
2013 			but has not yet accessed secondary index. */
2014 			ut_ad(MVCC::is_view_active(trx->read_view));
2015 
2016 			if (!trx->read_view->changes_visible(
2017 				    row_get_rec_trx_id(
2018 					    rec, clust_index, offsets),
2019 				    old_table->name)) {
2020 				rec_t*	old_vers;
2021 
2022 				row_vers_build_for_consistent_read(
2023 					rec, &mtr, clust_index, &offsets,
2024 					trx->read_view, &row_heap,
2025 					row_heap, &old_vers, NULL);
2026 
2027 				rec = old_vers;
2028 
2029 				if (!rec) {
2030 					continue;
2031 				}
2032 			}
2033 
2034 			if (rec_get_deleted_flag(
2035 				    rec,
2036 				    dict_table_is_comp(old_table))) {
2037 				/* This record was deleted in the latest
2038 				committed version, or it was deleted and
2039 				then reinserted-by-update before purge
2040 				kicked in. Skip it. */
2041 				continue;
2042 			}
2043 
2044 			ut_ad(!rec_offs_any_null_extern(rec, offsets));
2045 		} else if (rec_get_deleted_flag(
2046 				   rec, dict_table_is_comp(old_table))) {
2047 			/* Skip delete-marked records.
2048 
2049 			Skipping delete-marked records will make the
2050 			created indexes unuseable for transactions
2051 			whose read views were created before the index
2052 			creation completed, but preserving the history
2053 			would make it tricky to detect duplicate
2054 			keys. */
2055 			continue;
2056 		}
2057 
2058 		/* When !online, we are holding a lock on old_table, preventing
2059 		any inserts that could have written a record 'stub' before
2060 		writing out off-page columns. */
2061 		ut_ad(!rec_offs_any_null_extern(rec, offsets));
2062 
2063 		/* Build a row based on the clustered index. */
2064 
2065 		row = row_build_w_add_vcol(ROW_COPY_POINTERS, clust_index,
2066 					   rec, offsets, new_table,
2067 					   add_cols, add_v, col_map, &ext,
2068 					   row_heap);
2069 		ut_ad(row);
2070 
2071 		for (ulint i = 0; i < n_nonnull; i++) {
2072 			const dfield_t*	field	= &row->fields[nonnull[i]];
2073 
2074 			ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
2075 
2076 			if (dfield_is_null(field)) {
2077 				err = DB_INVALID_NULL;
2078 				trx->error_key_num = 0;
2079 				goto func_exit;
2080 			}
2081 		}
2082 
2083 		/* Get the next Doc ID */
2084 		if (add_doc_id) {
2085 			doc_id++;
2086 		} else {
2087 			doc_id = 0;
2088 		}
2089 
2090 		if (add_autoinc != ULINT_UNDEFINED) {
2091 
2092 			ut_ad(add_autoinc
2093 			      < dict_table_get_n_user_cols(new_table));
2094 
2095 			const dfield_t*	dfield;
2096 
2097 			dfield = dtuple_get_nth_field(row, add_autoinc);
2098 			if (dfield_is_null(dfield)) {
2099 				goto write_buffers;
2100 			}
2101 
2102 			const dtype_t*  dtype = dfield_get_type(dfield);
2103 			byte*	b = static_cast<byte*>(dfield_get_data(dfield));
2104 
2105 			if (sequence.eof()) {
2106 				err = DB_ERROR;
2107 				trx->error_key_num = 0;
2108 
2109 				ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2110 					ER_AUTOINC_READ_FAILED, "[NULL]");
2111 
2112 				goto func_exit;
2113 			}
2114 
2115 			ulonglong	value = sequence++;
2116 
2117 			switch (dtype_get_mtype(dtype)) {
2118 			case DATA_INT: {
2119 				ibool	usign;
2120 				ulint	len = dfield_get_len(dfield);
2121 
2122 				usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
2123 				mach_write_ulonglong(b, value, len, usign);
2124 
2125 				break;
2126 				}
2127 
2128 			case DATA_FLOAT:
2129 				mach_float_write(
2130 					b, static_cast<float>(value));
2131 				break;
2132 
2133 			case DATA_DOUBLE:
2134 				mach_double_write(
2135 					b, static_cast<double>(value));
2136 				break;
2137 
2138 			default:
2139 				ut_ad(0);
2140 			}
2141 		}
2142 
2143 write_buffers:
2144 		/* Build all entries for all the indexes to be created
2145 		in a single scan of the clustered index. */
2146 
2147 		ulint	s_idx_cnt = 0;
2148 		bool	skip_sort = skip_pk_sort
2149 			&& dict_index_is_clust(merge_buf[0]->index);
2150 
2151 		for (ulint i = 0; i < n_index; i++, skip_sort = false) {
2152 			row_merge_buf_t*	buf	= merge_buf[i];
2153 			merge_file_t*		file	= &files[i];
2154 			ulint			rows_added = 0;
2155 
2156 			if (dict_index_is_spatial(buf->index)) {
2157 				if (!row) {
2158 					continue;
2159 				}
2160 
2161 				ut_ad(sp_tuples[s_idx_cnt]->get_index()
2162 				      == buf->index);
2163 
2164 				/* If the geometry field is invalid, report
2165 				error. */
2166 				if (!row_geo_field_is_valid(row, buf->index)) {
2167 					err = DB_CANT_CREATE_GEOMETRY_OBJECT;
2168 					break;
2169 				}
2170 
2171 				sp_tuples[s_idx_cnt]->add(row, ext);
2172 				s_idx_cnt++;
2173 
2174 				continue;
2175 			}
2176 
2177 			if (UNIV_LIKELY
2178 			    (row && (rows_added = row_merge_buf_add(
2179 					buf, fts_index, old_table, new_table,
2180 					psort_info, row, ext, &doc_id,
2181 					conv_heap, &err,
2182 					&v_heap, eval_table, trx)))) {
2183 
2184 				/* If we are creating FTS index,
2185 				a single row can generate more
2186 				records for tokenized word */
2187 				file->n_rec += rows_added;
2188 
2189 				if (err != DB_SUCCESS) {
2190 					ut_ad(err == DB_TOO_BIG_RECORD);
2191 					break;
2192 				}
2193 
2194 				if (doc_id > max_doc_id) {
2195 					max_doc_id = doc_id;
2196 				}
2197 
2198 				if (buf->index->type & DICT_FTS) {
2199 					/* Check if error occurs in child thread */
2200 					for (ulint j = 0;
2201 					     j < fts_sort_pll_degree; j++) {
2202 						if (psort_info[j].error
2203 							!= DB_SUCCESS) {
2204 							err = psort_info[j].error;
2205 							trx->error_key_num = i;
2206 							break;
2207 						}
2208 					}
2209 
2210 					if (err != DB_SUCCESS) {
2211 						break;
2212 					}
2213 				}
2214 
2215 				if (skip_sort) {
2216 					ut_ad(buf->n_tuples > 0);
2217 					const mtuple_t*	curr =
2218 						&buf->tuples[buf->n_tuples - 1];
2219 
2220 					ut_ad(i == 0);
2221 					ut_ad(dict_index_is_clust(merge_buf[0]->index));
2222 					/* Detect duplicates by comparing the
2223 					current record with previous record.
2224 					When temp file is not used, records
2225 					should be in sorted order. */
2226 					if (prev_mtuple.fields != NULL
2227 					    && (row_mtuple_cmp(
2228 						&prev_mtuple, curr,
2229 						&clust_dup) == 0)) {
2230 
2231 						err = DB_DUPLICATE_KEY;
2232 						trx->error_key_num
2233 							= key_numbers[0];
2234 						goto func_exit;
2235 					}
2236 
2237 					prev_mtuple.fields = curr->fields;
2238 				}
2239 
2240 				continue;
2241 			}
2242 
2243 			if (err == DB_COMPUTE_VALUE_FAILED) {
2244 				trx->error_key_num = i;
2245 				goto func_exit;
2246 			}
2247 
2248 			if (buf->index->type & DICT_FTS) {
2249 				if (!row || !doc_id) {
2250 					continue;
2251 				}
2252 			}
2253 
2254 			/* The buffer must be sufficiently large
2255 			to hold at least one record. It may only
2256 			be empty when we reach the end of the
2257 			clustered index. row_merge_buf_add()
2258 			must not have been called in this loop. */
2259 			ut_ad(buf->n_tuples || row == NULL);
2260 
2261 			/* We have enough data tuples to form a block.
2262 			Sort them and write to disk if temp file is used
2263 			or insert into index if temp file is not used. */
2264 			ut_ad(old_table == new_table
2265 			      ? !dict_index_is_clust(buf->index)
2266 			      : (i == 0) == dict_index_is_clust(buf->index));
2267 
2268 			/* We have enough data tuples to form a block.
2269 			Sort them (if !skip_sort) and write to disk. */
2270 
2271 			if (buf->n_tuples) {
2272 				if (skip_sort) {
2273 					/* Temporary File is not used.
2274 					so insert sorted block to the index */
2275 					if (row != NULL) {
2276 						bool	mtr_committed = false;
2277 
2278 						/* We have to do insert the
2279 						cached spatial index rows, since
2280 						after the mtr_commit, the cluster
2281 						index page could be updated, then
2282 						the data in cached rows become
2283 						invalid. */
2284 						err = row_merge_spatial_rows(
2285 							trx->id, sp_tuples,
2286 							num_spatial,
2287 							row_heap, sp_heap,
2288 							&pcur, &mtr,
2289 							&mtr_committed);
2290 
2291 						if (err != DB_SUCCESS) {
2292 							goto func_exit;
2293 						}
2294 
2295 						/* We are not at the end of
2296 						the scan yet. We must
2297 						mtr_commit() in order to be
2298 						able to call log_free_check()
2299 						in row_merge_insert_index_tuples().
2300 						Due to mtr_commit(), the
2301 						current row will be invalid, and
2302 						we must reread it on the next
2303 						loop iteration. */
2304 						if (!mtr_committed) {
2305 							btr_pcur_move_to_prev_on_page(
2306 								&pcur);
2307 							btr_pcur_store_position(
2308 								&pcur, &mtr);
2309 
2310 							mtr_commit(&mtr);
2311 						}
2312 					}
2313 
2314 					mem_heap_empty(mtuple_heap);
2315 					prev_mtuple.fields = prev_fields;
2316 
2317 					row_mtuple_create(
2318 						&buf->tuples[buf->n_tuples - 1],
2319 						&prev_mtuple, n_uniq,
2320 						mtuple_heap);
2321 
2322 					if (clust_btr_bulk == NULL) {
2323 						clust_btr_bulk = UT_NEW_NOKEY(
2324 							BtrBulk(index[i],
2325 								trx->id,
2326 								observer));
2327 
2328 						clust_btr_bulk->init();
2329 					} else {
2330 						clust_btr_bulk->latch();
2331 					}
2332 
2333 					err = row_merge_insert_index_tuples(
2334 						trx->id, index[i], old_table,
2335 						-1, NULL, buf, clust_btr_bulk);
2336 
2337 					if (row == NULL) {
2338 						err = clust_btr_bulk->finish(
2339 							err);
2340 						UT_DELETE(clust_btr_bulk);
2341 						clust_btr_bulk = NULL;
2342 					} else {
2343 						/* Release latches for possible
2344 						log_free_chck in spatial index
2345 						build. */
2346 						clust_btr_bulk->release();
2347 					}
2348 
2349 					if (err != DB_SUCCESS) {
2350 						break;
2351 					}
2352 
2353 					if (row != NULL) {
2354 						/* Restore the cursor on the
2355 						previous clustered index record,
2356 						and empty the buffer. The next
2357 						iteration of the outer loop will
2358 						advance the cursor and read the
2359 						next record (the one which we
2360 						had to ignore due to the buffer
2361 						overflow). */
2362 						mtr_start(&mtr);
2363 						btr_pcur_restore_position(
2364 							BTR_SEARCH_LEAF, &pcur,
2365 							&mtr);
2366 						buf = row_merge_buf_empty(buf);
2367 						/* Restart the outer loop on the
2368 						record. We did not insert it
2369 						into any index yet. */
2370 						ut_ad(i == 0);
2371 						break;
2372 					}
2373 				} else if (dict_index_is_unique(buf->index)) {
2374 					row_merge_dup_t	dup = {
2375 						buf->index, table, col_map, 0};
2376 
2377 					row_merge_buf_sort(buf, &dup);
2378 
2379 					if (dup.n_dup) {
2380 						err = DB_DUPLICATE_KEY;
2381 						trx->error_key_num
2382 							= key_numbers[i];
2383 						break;
2384 					}
2385 				} else {
2386 					row_merge_buf_sort(buf, NULL);
2387 				}
2388 			} else if (online && new_table == old_table) {
2389 				/* Note the newest transaction that
2390 				modified this index when the scan was
2391 				completed. We prevent older readers
2392 				from accessing this index, to ensure
2393 				read consistency. */
2394 
2395 				trx_id_t	max_trx_id;
2396 
2397 				ut_a(row == NULL);
2398 				rw_lock_x_lock(
2399 					dict_index_get_lock(buf->index));
2400 				ut_a(dict_index_get_online_status(buf->index)
2401 				     == ONLINE_INDEX_CREATION);
2402 
2403 				max_trx_id = row_log_get_max_trx(buf->index);
2404 
2405 				if (max_trx_id > buf->index->trx_id) {
2406 					buf->index->trx_id = max_trx_id;
2407 				}
2408 
2409 				rw_lock_x_unlock(
2410 					dict_index_get_lock(buf->index));
2411 			}
2412 
2413 			/* Secondary index and clustered index which is
2414 			not in sorted order can use the temporary file.
2415 			Fulltext index should not use the temporary file. */
2416 			if (!skip_sort && !(buf->index->type & DICT_FTS)) {
2417 				/* In case we can have all rows in sort buffer,
2418 				we can insert directly into the index without
2419 				temporary file if clustered index does not uses
2420 				temporary file. */
2421 				if (row == NULL && file->fd == -1
2422 				    && !clust_temp_file) {
2423 					DBUG_EXECUTE_IF(
2424 						"row_merge_write_failure",
2425 						err = DB_TEMP_FILE_WRITE_FAIL;
2426 						trx->error_key_num = i;
2427 						goto all_done;);
2428 
2429 					DBUG_EXECUTE_IF(
2430 						"row_merge_tmpfile_fail",
2431 						err = DB_OUT_OF_MEMORY;
2432 						trx->error_key_num = i;
2433 						goto all_done;);
2434 
2435 					BtrBulk	btr_bulk(index[i], trx->id,
2436 							 observer);
2437 					btr_bulk.init();
2438 
2439 					err = row_merge_insert_index_tuples(
2440 						trx->id, index[i], old_table,
2441 						-1, NULL, buf, &btr_bulk);
2442 
2443 					err = btr_bulk.finish(err);
2444 
2445 					DBUG_EXECUTE_IF(
2446 						"row_merge_insert_big_row",
2447 						err = DB_TOO_BIG_RECORD;);
2448 
2449 					if (err != DB_SUCCESS) {
2450 						break;
2451 					}
2452 				} else {
2453 					if (row_merge_file_create_if_needed(
2454 						file, tmpfd,
2455 						buf->n_tuples, path) < 0) {
2456 						err = DB_OUT_OF_MEMORY;
2457 						trx->error_key_num = i;
2458 						goto func_exit;
2459 					}
2460 
2461 					/* Ensure that duplicates in the
2462 					clustered index will be detected before
2463 					inserting secondary index records. */
2464 					if (dict_index_is_clust(buf->index)) {
2465 						clust_temp_file = true;
2466 					}
2467 
2468 					ut_ad(file->n_rec > 0);
2469 
2470 					row_merge_buf_write(buf, file, block);
2471 
2472 					if (!row_merge_write(
2473 						    file->fd, file->offset++,
2474 						    block)) {
2475 						err = DB_TEMP_FILE_WRITE_FAIL;
2476 						trx->error_key_num = i;
2477 						break;
2478 					}
2479 
2480 					UNIV_MEM_INVALID(
2481 						&block[0], srv_sort_buf_size);
2482 				}
2483 			}
2484 			merge_buf[i] = row_merge_buf_empty(buf);
2485 
2486 			if (UNIV_LIKELY(row != NULL)) {
2487 				/* Try writing the record again, now
2488 				that the buffer has been written out
2489 				and emptied. */
2490 
2491 				if (UNIV_UNLIKELY
2492 				    (!(rows_added = row_merge_buf_add(
2493 						buf, fts_index, old_table,
2494 						new_table, psort_info, row, ext,
2495 						&doc_id, conv_heap,
2496 						&err, &v_heap, table, trx)))) {
2497 					/* An empty buffer should have enough
2498 					room for at least one record. */
2499 					ut_error;
2500 				}
2501 
2502 				if (err != DB_SUCCESS) {
2503 					break;
2504 				}
2505 
2506 				file->n_rec += rows_added;
2507 			}
2508 		}
2509 
2510 		if (row == NULL) {
2511 			goto all_done;
2512 		}
2513 
2514 		if (err != DB_SUCCESS) {
2515 			goto func_exit;
2516 		}
2517 
2518 		if (v_heap) {
2519 			mem_heap_empty(v_heap);
2520 		}
2521 	}
2522 
2523 func_exit:
2524 	/* row_merge_spatial_rows may have committed
2525 	the mtr	before an error occurs. */
2526 	if (mtr.is_active()) {
2527 		mtr_commit(&mtr);
2528 	}
2529 	mem_heap_free(row_heap);
2530 	ut_free(nonnull);
2531 
2532 all_done:
2533 	if (clust_btr_bulk != NULL) {
2534 		ut_ad(err != DB_SUCCESS);
2535 		clust_btr_bulk->latch();
2536 		err = clust_btr_bulk->finish(
2537 			err);
2538 		UT_DELETE(clust_btr_bulk);
2539 	}
2540 
2541 	if (prev_fields != NULL) {
2542 		ut_free(prev_fields);
2543 		mem_heap_free(mtuple_heap);
2544 	}
2545 
2546 	if (v_heap) {
2547 		mem_heap_free(v_heap);
2548 	}
2549 
2550 	if (conv_heap != NULL) {
2551 		mem_heap_free(conv_heap);
2552 	}
2553 
2554 #ifdef FTS_INTERNAL_DIAG_PRINT
2555 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
2556 #endif
2557 	if (fts_pll_sort) {
2558 		bool	all_exit = false;
2559 		ulint	trial_count = 0;
2560 		const ulint max_trial_count = 10000;
2561 
2562 wait_again:
2563                 /* Check if error occurs in child thread */
2564 		for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2565 			if (psort_info[j].error != DB_SUCCESS) {
2566 				err = psort_info[j].error;
2567 				trx->error_key_num = j;
2568 				break;
2569 			}
2570 		}
2571 
2572 		/* Tell all children that parent has done scanning */
2573 		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2574 			if (err == DB_SUCCESS) {
2575 				psort_info[i].state = FTS_PARENT_COMPLETE;
2576 			} else {
2577 				psort_info[i].state = FTS_PARENT_EXITING;
2578 			}
2579 		}
2580 
2581 		/* Now wait all children to report back to be completed */
2582 		os_event_wait_time_low(fts_parallel_sort_event,
2583 				       1000000, sig_count);
2584 
2585 		for (ulint i = 0; i < fts_sort_pll_degree; i++) {
2586 			if (psort_info[i].child_status != FTS_CHILD_COMPLETE
2587 			    && psort_info[i].child_status != FTS_CHILD_EXITING) {
2588 				sig_count = os_event_reset(
2589 					fts_parallel_sort_event);
2590 				goto wait_again;
2591 			}
2592 		}
2593 
2594 		/* Now all children should complete, wait a bit until
2595 		they all finish setting the event, before we free everything.
2596 		This has a 10 second timeout */
2597 		do {
2598 			all_exit = true;
2599 
2600 			for (ulint j = 0; j < fts_sort_pll_degree; j++) {
2601 				if (psort_info[j].child_status
2602 				    != FTS_CHILD_EXITING) {
2603 					all_exit = false;
2604 					os_thread_sleep(1000);
2605 					break;
2606 				}
2607 			}
2608 			trial_count++;
2609 		} while (!all_exit && trial_count < max_trial_count);
2610 
2611 		if (!all_exit) {
2612 			ib::fatal() << "Not all child sort threads exited"
2613 				" when creating FTS index '"
2614 				<< fts_sort_idx->name << "'";
2615 		}
2616 	}
2617 
2618 #ifdef FTS_INTERNAL_DIAG_PRINT
2619 	DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
2620 #endif
2621 	for (ulint i = 0; i < n_index; i++) {
2622 		row_merge_buf_free(merge_buf[i]);
2623 	}
2624 
2625 	row_fts_free_pll_merge_buf(psort_info);
2626 
2627 	ut_free(merge_buf);
2628 
2629 	btr_pcur_close(&pcur);
2630 
2631 	if (sp_tuples != NULL) {
2632 		for (ulint i = 0; i < num_spatial; i++) {
2633 			UT_DELETE(sp_tuples[i]);
2634 		}
2635 		ut_free(sp_tuples);
2636 
2637 		if (sp_heap) {
2638 			mem_heap_free(sp_heap);
2639 		}
2640 	}
2641 
2642 	/* Update the next Doc ID we used. Table should be locked, so
2643 	no concurrent DML */
2644 	if (max_doc_id && err == DB_SUCCESS) {
2645 		/* Sync fts cache for other fts indexes to keep all
2646 		fts indexes consistent in sync_doc_id. */
2647 		err = fts_sync_table(const_cast<dict_table_t*>(new_table),
2648 				     false, true, false);
2649 
2650 		if (err == DB_SUCCESS) {
2651 			fts_update_next_doc_id(
2652 				0, new_table,
2653 				old_table->name.m_name, max_doc_id);
2654 		}
2655 	}
2656 
2657 	trx->op_info = "";
2658 
2659 	DBUG_RETURN(err);
2660 }
2661 
/** Write a record via buffer 2 and read the next record to buffer N.

This macro is only usable inside row_merge_blocks() and
row_merge_blocks_copy(): it relies on their local variables
(b0/b1/b2, buf[], block[], mrec0/mrec1, offsets0/offsets1,
foffs0/foffs1, file, of) being in scope, and on a local "corrupt:"
label to jump to when the write fails or the output record count
exceeds the input record count.
@param N number of the buffer (0 or 1)
@param INDEX record descriptor
@param AT_END statement to execute at end of input */
#define ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)			\
	do {								\
		b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
					 &buf[2], b2,			\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
					  &buf[N], b##N, INDEX,		\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)
2686 
/** Write a record via buffer 2 and read the next record to buffer N,
additionally bumping the performance schema stage counter
(stage->inc()) for each record processed, when the PSI stage
interface is compiled in.  Falls back to the plain
ROW_MERGE_WRITE_GET_NEXT_LOW() otherwise.
@param N number of the buffer (0 or 1)
@param INDEX record descriptor
@param AT_END statement to execute at end of input */
#ifdef HAVE_PSI_STAGE_INTERFACE
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)			\
	do {								\
		if (stage != NULL) {					\
			stage->inc();					\
		}							\
		ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END);		\
	} while (0)
#else /* HAVE_PSI_STAGE_INTERFACE */
#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END)			\
	ROW_MERGE_WRITE_GET_NEXT_LOW(N, INDEX, AT_END)
#endif /* HAVE_PSI_STAGE_INTERFACE */
2699 
2700 /** Merge two blocks of records on disk and write a bigger block.
2701 @param[in]	dup	descriptor of index being created
2702 @param[in]	file	file containing index entries
2703 @param[in,out]	block	3 buffers
2704 @param[in,out]	foffs0	offset of first source list in the file
2705 @param[in,out]	foffs1	offset of second source list in the file
2706 @param[in,out]	of	output file
2707 @param[in,out]	stage	performance schema accounting object, used by
2708 ALTER TABLE. If not NULL stage->inc() will be called for each record
2709 processed.
2710 @return DB_SUCCESS or error code */
2711 static MY_ATTRIBUTE((warn_unused_result))
2712 dberr_t
row_merge_blocks(const row_merge_dup_t * dup,const merge_file_t * file,row_merge_block_t * block,ulint * foffs0,ulint * foffs1,merge_file_t * of,ut_stage_alter_t * stage)2713 row_merge_blocks(
2714 	const row_merge_dup_t*	dup,
2715 	const merge_file_t*	file,
2716 	row_merge_block_t*	block,
2717 	ulint*			foffs0,
2718 	ulint*			foffs1,
2719 	merge_file_t*		of,
2720 	ut_stage_alter_t*	stage)
2721 {
2722 	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */
2723 
2724 	mrec_buf_t*	buf;	/*!< buffer for handling
2725 				split mrec in block[] */
2726 	const byte*	b0;	/*!< pointer to block[0] */
2727 	const byte*	b1;	/*!< pointer to block[srv_sort_buf_size] */
2728 	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
2729 	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] or buf[0] */
2730 	const mrec_t*	mrec1;	/*!< merge rec, points to
2731 				block[srv_sort_buf_size] or buf[1] */
2732 	ulint*		offsets0;/* offsets of mrec0 */
2733 	ulint*		offsets1;/* offsets of mrec1 */
2734 
2735 	DBUG_ENTER("row_merge_blocks");
2736 	DBUG_PRINT("ib_merge_sort",
2737 		   ("fd=%d,%lu+%lu to fd=%d,%lu",
2738 		    file->fd, ulong(*foffs0), ulong(*foffs1),
2739 		    of->fd, ulong(of->offset)));
2740 
2741 	heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);
2742 
2743 	/* Write a record and read the next record.  Split the output
2744 	file in two halves, which can be merged on the following pass. */
2745 
2746 	if (!row_merge_read(file->fd, *foffs0, &block[0])
2747 	    || !row_merge_read(file->fd, *foffs1, &block[srv_sort_buf_size])) {
2748 corrupt:
2749 		mem_heap_free(heap);
2750 		DBUG_RETURN(DB_CORRUPTION);
2751 	}
2752 
2753 	b0 = &block[0];
2754 	b1 = &block[srv_sort_buf_size];
2755 	b2 = &block[2 * srv_sort_buf_size];
2756 
2757 	b0 = row_merge_read_rec(
2758 		&block[0], &buf[0], b0, dup->index,
2759 		file->fd, foffs0, &mrec0, offsets0);
2760 	b1 = row_merge_read_rec(
2761 		&block[srv_sort_buf_size],
2762 		&buf[srv_sort_buf_size], b1, dup->index,
2763 		file->fd, foffs1, &mrec1, offsets1);
2764 	if (UNIV_UNLIKELY(!b0 && mrec0)
2765 	    || UNIV_UNLIKELY(!b1 && mrec1)) {
2766 
2767 		goto corrupt;
2768 	}
2769 
2770 	while (mrec0 && mrec1) {
2771 		int cmp = cmp_rec_rec_simple(
2772 			mrec0, mrec1, offsets0, offsets1,
2773 			dup->index, dup->table);
2774 		if (cmp < 0) {
2775 			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
2776 		} else if (cmp) {
2777 			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
2778 		} else {
2779 			mem_heap_free(heap);
2780 			DBUG_RETURN(DB_DUPLICATE_KEY);
2781 		}
2782 	}
2783 
2784 merged:
2785 	if (mrec0) {
2786 		/* append all mrec0 to output */
2787 		for (;;) {
2788 			ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
2789 		}
2790 	}
2791 done0:
2792 	if (mrec1) {
2793 		/* append all mrec1 to output */
2794 		for (;;) {
2795 			ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
2796 		}
2797 	}
2798 done1:
2799 
2800 	mem_heap_free(heap);
2801 	b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
2802 				 b2, of->fd, &of->offset);
2803 	DBUG_RETURN(b2 ? DB_SUCCESS : DB_CORRUPTION);
2804 }
2805 
2806 /** Copy a block of index entries.
2807 @param[in]	index	index being created
2808 @param[in]	file	input file
2809 @param[in,out]	block	3 buffers
2810 @param[in,out]	foffs0	input file offset
2811 @param[in,out]	of	output file
2812 @param[in,out]	stage	performance schema accounting object, used by
2813 ALTER TABLE. If not NULL stage->inc() will be called for each record
2814 processed.
2815 @return TRUE on success, FALSE on failure */
2816 static MY_ATTRIBUTE((warn_unused_result))
2817 ibool
row_merge_blocks_copy(const dict_index_t * index,const merge_file_t * file,row_merge_block_t * block,ulint * foffs0,merge_file_t * of,ut_stage_alter_t * stage)2818 row_merge_blocks_copy(
2819 	const dict_index_t*	index,
2820 	const merge_file_t*	file,
2821 	row_merge_block_t*	block,
2822 	ulint*			foffs0,
2823 	merge_file_t*		of,
2824 	ut_stage_alter_t*	stage)
2825 {
2826 	mem_heap_t*	heap;	/*!< memory heap for offsets0, offsets1 */
2827 
2828 	mrec_buf_t*	buf;	/*!< buffer for handling
2829 				split mrec in block[] */
2830 	const byte*	b0;	/*!< pointer to block[0] */
2831 	byte*		b2;	/*!< pointer to block[2 * srv_sort_buf_size] */
2832 	const mrec_t*	mrec0;	/*!< merge rec, points to block[0] */
2833 	ulint*		offsets0;/* offsets of mrec0 */
2834 	ulint*		offsets1;/* dummy offsets */
2835 
2836 	DBUG_ENTER("row_merge_blocks_copy");
2837 	DBUG_PRINT("ib_merge_sort",
2838 		   ("fd=%d," ULINTPF " to fd=%d," ULINTPF,
2839 		    file->fd, *foffs0,
2840 		    of->fd, of->offset));
2841 
2842 	heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
2843 
2844 	/* Write a record and read the next record.  Split the output
2845 	file in two halves, which can be merged on the following pass. */
2846 
2847 	if (!row_merge_read(file->fd, *foffs0, &block[0])) {
2848 corrupt:
2849 		mem_heap_free(heap);
2850 		DBUG_RETURN(FALSE);
2851 	}
2852 
2853 	b0 = &block[0];
2854 
2855 	b2 = &block[2 * srv_sort_buf_size];
2856 
2857 	b0 = row_merge_read_rec(&block[0], &buf[0], b0, index,
2858 				file->fd, foffs0, &mrec0, offsets0);
2859 	if (UNIV_UNLIKELY(!b0 && mrec0)) {
2860 
2861 		goto corrupt;
2862 	}
2863 
2864 	if (mrec0) {
2865 		/* append all mrec0 to output */
2866 		for (;;) {
2867 			ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
2868 		}
2869 	}
2870 done0:
2871 
2872 	/* The file offset points to the beginning of the last page
2873 	that has been read.  Update it to point to the next block. */
2874 	(*foffs0)++;
2875 
2876 	mem_heap_free(heap);
2877 	DBUG_RETURN(row_merge_write_eof(&block[2 * srv_sort_buf_size],
2878 					b2, of->fd, &of->offset)
2879 		    != NULL);
2880 }
2881 
2882 /** Merge disk files.
2883 @param[in]	trx		transaction
2884 @param[in]	dup		descriptor of index being created
2885 @param[in,out]	file		file containing index entries
2886 @param[in,out]	block		3 buffers
2887 @param[in,out]	tmpfd		temporary file handle
2888 @param[in,out]	num_run		Number of runs that remain to be merged
2889 @param[in,out]	run_offset	Array that contains the first offset number
2890 for each merge run
2891 @param[in,out]	stage		performance schema accounting object, used by
2892 ALTER TABLE. If not NULL stage->inc() will be called for each record
2893 processed.
2894 @return DB_SUCCESS or error code */
2895 static
2896 dberr_t
row_merge(trx_t * trx,const row_merge_dup_t * dup,merge_file_t * file,row_merge_block_t * block,int * tmpfd,ulint * num_run,ulint * run_offset,ut_stage_alter_t * stage)2897 row_merge(
2898 	trx_t*			trx,
2899 	const row_merge_dup_t*	dup,
2900 	merge_file_t*		file,
2901 	row_merge_block_t*	block,
2902 	int*			tmpfd,
2903 	ulint*			num_run,
2904 	ulint*			run_offset,
2905 	ut_stage_alter_t*	stage)
2906 {
2907 	ulint		foffs0;	/*!< first input offset */
2908 	ulint		foffs1;	/*!< second input offset */
2909 	dberr_t		error;	/*!< error code */
2910 	merge_file_t	of;	/*!< output file */
2911 	const ulint	ihalf	= run_offset[*num_run / 2];
2912 				/*!< half the input file */
2913 	ulint		n_run	= 0;
2914 				/*!< num of runs generated from this merge */
2915 
2916 	UNIV_MEM_ASSERT_W(&block[0], 3 * srv_sort_buf_size);
2917 
2918 	ut_ad(ihalf < file->offset);
2919 
2920 	of.fd = *tmpfd;
2921 	of.offset = 0;
2922 	of.n_rec = 0;
2923 
2924 #ifdef POSIX_FADV_SEQUENTIAL
2925 	/* The input file will be read sequentially, starting from the
2926 	beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
2927 	affects the entire file.  Each block will be read exactly once. */
2928 	posix_fadvise(file->fd, 0, 0,
2929 		      POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
2930 #endif /* POSIX_FADV_SEQUENTIAL */
2931 
2932 	/* Merge blocks to the output file. */
2933 	foffs0 = 0;
2934 	foffs1 = ihalf;
2935 
2936 	UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);
2937 
2938 	for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
2939 
2940 		if (trx_is_interrupted(trx)) {
2941 			return(DB_INTERRUPTED);
2942 		}
2943 
2944 		/* Remember the offset number for this run */
2945 		run_offset[n_run++] = of.offset;
2946 
2947 		error = row_merge_blocks(dup, file, block,
2948 					 &foffs0, &foffs1, &of, stage);
2949 
2950 		if (error != DB_SUCCESS) {
2951 			return(error);
2952 		}
2953 
2954 	}
2955 
2956 	/* Copy the last blocks, if there are any. */
2957 
2958 	while (foffs0 < ihalf) {
2959 		if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
2960 			return(DB_INTERRUPTED);
2961 		}
2962 
2963 		/* Remember the offset number for this run */
2964 		run_offset[n_run++] = of.offset;
2965 
2966 		if (!row_merge_blocks_copy(dup->index, file, block,
2967 					   &foffs0, &of, stage)) {
2968 			return(DB_CORRUPTION);
2969 		}
2970 	}
2971 
2972 	ut_ad(foffs0 == ihalf);
2973 
2974 	while (foffs1 < file->offset) {
2975 		if (trx_is_interrupted(trx)) {
2976 			return(DB_INTERRUPTED);
2977 		}
2978 
2979 		/* Remember the offset number for this run */
2980 		run_offset[n_run++] = of.offset;
2981 
2982 		if (!row_merge_blocks_copy(dup->index, file, block,
2983 					   &foffs1, &of, stage)) {
2984 			return(DB_CORRUPTION);
2985 		}
2986 	}
2987 
2988 	ut_ad(foffs1 == file->offset);
2989 
2990 	if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
2991 		return(DB_CORRUPTION);
2992 	}
2993 
2994 	ut_ad(n_run <= *num_run);
2995 
2996 	*num_run = n_run;
2997 
2998 	/* Each run can contain one or more offsets. As merge goes on,
2999 	the number of runs (to merge) will reduce until we have one
3000 	single run. So the number of runs will always be smaller than
3001 	the number of offsets in file */
3002 	ut_ad((*num_run) <= file->offset);
3003 
3004 	/* The number of offsets in output file is always equal or
3005 	smaller than input file */
3006 	ut_ad(of.offset <= file->offset);
3007 
3008 	/* Swap file descriptors for the next pass. */
3009 	*tmpfd = file->fd;
3010 	*file = of;
3011 
3012 	UNIV_MEM_INVALID(&block[0], 3 * srv_sort_buf_size);
3013 
3014 	return(DB_SUCCESS);
3015 }
3016 
3017 /** Merge disk files.
3018 @param[in]	trx	transaction
3019 @param[in]	dup	descriptor of index being created
3020 @param[in,out]	file	file containing index entries
3021 @param[in,out]	block	3 buffers
3022 @param[in,out]	tmpfd	temporary file handle
3023 @param[in,out]	stage	performance schema accounting object, used by
3024 ALTER TABLE. If not NULL, stage->begin_phase_sort() will be called initially
3025 and then stage->inc() will be called for each record processed.
3026 @return DB_SUCCESS or error code */
3027 dberr_t
row_merge_sort(trx_t * trx,const row_merge_dup_t * dup,merge_file_t * file,row_merge_block_t * block,int * tmpfd,ut_stage_alter_t * stage)3028 row_merge_sort(
3029 	trx_t*			trx,
3030 	const row_merge_dup_t*	dup,
3031 	merge_file_t*		file,
3032 	row_merge_block_t*	block,
3033 	int*			tmpfd,
3034 	ut_stage_alter_t*	stage /* = NULL */)
3035 {
3036 	const ulint	half	= file->offset / 2;
3037 	ulint		num_runs;
3038 	ulint*		run_offset;
3039 	dberr_t		error	= DB_SUCCESS;
3040 	DBUG_ENTER("row_merge_sort");
3041 
3042 	/* Record the number of merge runs we need to perform */
3043 	num_runs = file->offset;
3044 
3045 	if (stage != NULL) {
3046 		stage->begin_phase_sort(log2(num_runs));
3047 	}
3048 
3049 	/* If num_runs are less than 1, nothing to merge */
3050 	if (num_runs <= 1) {
3051 		DBUG_RETURN(error);
3052 	}
3053 
3054 	/* "run_offset" records each run's first offset number */
3055 	run_offset = (ulint*) ut_malloc_nokey(file->offset * sizeof(ulint));
3056 
3057 	/* This tells row_merge() where to start for the first round
3058 	of merge. */
3059 	run_offset[half] = half;
3060 
3061 	/* The file should always contain at least one byte (the end
3062 	of file marker).  Thus, it must be at least one block. */
3063 	ut_ad(file->offset > 0);
3064 
3065 	/* Merge the runs until we have one big run */
3066 	do {
3067 		error = row_merge(trx, dup, file, block, tmpfd,
3068 				  &num_runs, run_offset, stage);
3069 
3070 		if (error != DB_SUCCESS) {
3071 			break;
3072 		}
3073 
3074 		UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
3075 	} while (num_runs > 1);
3076 
3077 	ut_free(run_offset);
3078 
3079 	DBUG_RETURN(error);
3080 }
3081 
/** Copy externally stored columns to the data tuple.
@param[in]	mrec		record containing BLOB pointers,
or NULL to use tuple instead
@param[in]	offsets		offsets of mrec
@param[in]	page_size	page size of the table containing the record
@param[in,out]	tuple		data tuple
@param[in,out]	heap		memory heap */
3089 static
3090 void
row_merge_copy_blobs(const mrec_t * mrec,const ulint * offsets,const page_size_t & page_size,dtuple_t * tuple,mem_heap_t * heap)3091 row_merge_copy_blobs(
3092 	const mrec_t*		mrec,
3093 	const ulint*		offsets,
3094 	const page_size_t&	page_size,
3095 	dtuple_t*		tuple,
3096 	mem_heap_t*		heap)
3097 {
3098 	ut_ad(mrec == NULL || rec_offs_any_extern(offsets));
3099 
3100 	for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
3101 		ulint		len;
3102 		const void*	data;
3103 		dfield_t*	field = dtuple_get_nth_field(tuple, i);
3104 		ulint		field_len;
3105 		const byte*	field_data;
3106 
3107 		if (!dfield_is_ext(field)) {
3108 			continue;
3109 		}
3110 
3111 		ut_ad(!dfield_is_null(field));
3112 
3113 		/* During the creation of a PRIMARY KEY, the table is
3114 		X-locked, and we skip copying records that have been
3115 		marked for deletion. Therefore, externally stored
3116 		columns cannot possibly be freed between the time the
3117 		BLOB pointers are read (row_merge_read_clustered_index())
3118 		and dereferenced (below). */
3119 		if (mrec == NULL) {
3120 			field_data
3121 				= static_cast<byte*>(dfield_get_data(field));
3122 			field_len = dfield_get_len(field);
3123 
3124 			ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
3125 
3126 			ut_a(memcmp(field_data + field_len
3127 				     - BTR_EXTERN_FIELD_REF_SIZE,
3128 				     field_ref_zero,
3129 				     BTR_EXTERN_FIELD_REF_SIZE));
3130 
3131 			data = btr_copy_externally_stored_field(
3132 				&len, field_data, page_size, field_len, heap);
3133 		} else {
3134 			data = btr_rec_copy_externally_stored_field(
3135 				mrec, offsets, page_size, i, &len, heap);
3136 		}
3137 
3138 		/* Because we have locked the table, any records
3139 		written by incomplete transactions must have been
3140 		rolled back already. There must not be any incomplete
3141 		BLOB columns. */
3142 		ut_a(data);
3143 
3144 		dfield_set_data(field, data, len);
3145 	}
3146 }
3147 
/** Convert a merge record to a typed data tuple. Note that externally
stored fields are not copied to heap.
@param[in]	index	index on the table
@param[out]	dtuple	data tuple whose field array is filled from mtuple
@param[in]	mtuple	merge record to convert */
3154 static
3155 void
row_merge_mtuple_to_dtuple(dict_index_t * index,dtuple_t * dtuple,const mtuple_t * mtuple)3156 row_merge_mtuple_to_dtuple(
3157 	dict_index_t*	index,
3158 	dtuple_t*	dtuple,
3159 	const mtuple_t* mtuple)
3160 {
3161 	ut_ad(!dict_index_is_ibuf(index));
3162 
3163 	memcpy(dtuple->fields, mtuple->fields,
3164 	       dtuple->n_fields * sizeof *mtuple->fields);
3165 }
3166 
3167 /** Insert sorted data tuples to the index.
3168 @param[in]	trx_id		transaction identifier
3169 @param[in]	index		index to be inserted
3170 @param[in]	old_table	old table
3171 @param[in]	fd		file descriptor
3172 @param[in,out]	block		file buffer
3173 @param[in]	row_buf		row_buf the sorted data tuples,
3174 or NULL if fd, block will be used instead
3175 @param[in,out]	btr_bulk	btr bulk instance
3176 @param[in,out]	stage		performance schema accounting object, used by
3177 ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
3178 and then stage->inc() will be called for each record that is processed.
3179 @return DB_SUCCESS or error number */
3180 static	MY_ATTRIBUTE((warn_unused_result))
3181 dberr_t
row_merge_insert_index_tuples(trx_id_t trx_id,dict_index_t * index,const dict_table_t * old_table,int fd,row_merge_block_t * block,const row_merge_buf_t * row_buf,BtrBulk * btr_bulk,ut_stage_alter_t * stage)3182 row_merge_insert_index_tuples(
3183 	trx_id_t		trx_id,
3184 	dict_index_t*		index,
3185 	const dict_table_t*	old_table,
3186 	int			fd,
3187 	row_merge_block_t*	block,
3188 	const row_merge_buf_t*	row_buf,
3189 	BtrBulk*		btr_bulk,
3190 	ut_stage_alter_t*	stage /* = NULL */)
3191 {
3192 	const byte*		b;
3193 	mem_heap_t*		heap;
3194 	mem_heap_t*		tuple_heap;
3195 	dberr_t			error = DB_SUCCESS;
3196 	ulint			foffs = 0;
3197 	ulint*			offsets;
3198 	mrec_buf_t*		buf;
3199 	ulint			n_rows = 0;
3200 	dtuple_t*		dtuple;
3201 	DBUG_ENTER("row_merge_insert_index_tuples");
3202 
3203 	ut_ad(!srv_read_only_mode);
3204 	ut_ad(!(index->type & DICT_FTS));
3205 	ut_ad(!dict_index_is_spatial(index));
3206 	ut_ad(trx_id);
3207 
3208 	if (stage != NULL) {
3209 		stage->begin_phase_insert();
3210 	}
3211 
3212 	tuple_heap = mem_heap_create(1000);
3213 
3214 	{
3215 		ulint i	= 1 + REC_OFFS_HEADER_SIZE
3216 			+ dict_index_get_n_fields(index);
3217 		heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
3218 		offsets = static_cast<ulint*>(
3219 			mem_heap_alloc(heap, i * sizeof *offsets));
3220 		offsets[0] = i;
3221 		offsets[1] = dict_index_get_n_fields(index);
3222 	}
3223 
3224 	if (row_buf != NULL) {
3225 		ut_ad(fd == -1);
3226 		ut_ad(block == NULL);
3227 		DBUG_EXECUTE_IF("row_merge_read_failure",
3228 				error = DB_CORRUPTION;
3229 				goto err_exit;);
3230 		buf = NULL;
3231 		b = NULL;
3232 		dtuple = dtuple_create(
3233 			heap, dict_index_get_n_fields(index));
3234 		dtuple_set_n_fields_cmp(
3235 			dtuple, dict_index_get_n_unique_in_tree(index));
3236 	} else {
3237 		b = block;
3238 		dtuple = NULL;
3239 
3240 		if (!row_merge_read(fd, foffs, block)) {
3241 			error = DB_CORRUPTION;
3242 			goto err_exit;
3243 		} else {
3244 			buf = static_cast<mrec_buf_t*>(
3245 				mem_heap_alloc(heap, sizeof *buf));
3246 		}
3247 	}
3248 
3249 
3250 	for (;;) {
3251 		const mrec_t*	mrec;
3252 		ulint		n_ext;
3253 		mtr_t		mtr;
3254 
3255 		if (stage != NULL) {
3256 			stage->inc();
3257 		}
3258 
3259 		if (row_buf != NULL) {
3260 			if (n_rows >= row_buf->n_tuples) {
3261 				break;
3262 			}
3263 
3264 			/* Convert merge tuple record from
3265 			row buffer to data tuple record */
3266 			row_merge_mtuple_to_dtuple(
3267 				index, dtuple, &row_buf->tuples[n_rows]);
3268 
3269 			n_ext = dtuple_get_n_ext(dtuple);
3270 			n_rows++;
3271 			/* BLOB pointers must be copied from dtuple */
3272 			mrec = NULL;
3273 		} else {
3274 			b = row_merge_read_rec(block, buf, b, index,
3275 					       fd, &foffs, &mrec, offsets);
3276 			if (UNIV_UNLIKELY(!b)) {
3277 				/* End of list, or I/O error */
3278 				if (mrec) {
3279 					error = DB_CORRUPTION;
3280 				}
3281 				break;
3282 			}
3283 
3284 			dtuple = row_rec_to_index_entry_low(
3285 				mrec, index, offsets, &n_ext, tuple_heap);
3286 		}
3287 
3288 		dict_index_t*	old_index
3289 			= dict_table_get_first_index(old_table);
3290 
3291 		if (dict_index_is_clust(index)
3292 		    && dict_index_is_online_ddl(old_index)) {
3293 			error = row_log_table_get_error(old_index);
3294 			if (error != DB_SUCCESS) {
3295 				break;
3296 			}
3297 		}
3298 
3299 		if (!n_ext) {
3300 			/* There are no externally stored columns. */
3301 		} else {
3302 			ut_ad(dict_index_is_clust(index));
3303 			/* Off-page columns can be fetched safely
3304 			when concurrent modifications to the table
3305 			are disabled. (Purge can process delete-marked
3306 			records, but row_merge_read_clustered_index()
3307 			would have skipped them.)
3308 
3309 			When concurrent modifications are enabled,
3310 			row_merge_read_clustered_index() will
3311 			only see rows from transactions that were
3312 			committed before the ALTER TABLE started
3313 			(REPEATABLE READ).
3314 
3315 			Any modifications after the
3316 			row_merge_read_clustered_index() scan
3317 			will go through row_log_table_apply().
3318 			Any modifications to off-page columns
3319 			will be tracked by
3320 			row_log_table_blob_alloc() and
3321 			row_log_table_blob_free(). */
3322 			row_merge_copy_blobs(
3323 				mrec, offsets,
3324 				dict_table_page_size(old_table),
3325 				dtuple, tuple_heap);
3326 		}
3327 
3328 		ut_ad(dtuple_validate(dtuple));
3329 
3330 		error = btr_bulk->insert(dtuple);
3331 
3332 		if (error != DB_SUCCESS) {
3333 			goto err_exit;
3334 		}
3335 
3336 		mem_heap_empty(tuple_heap);
3337 	}
3338 
3339 err_exit:
3340 	mem_heap_free(tuple_heap);
3341 	mem_heap_free(heap);
3342 
3343 	DBUG_RETURN(error);
3344 }
3345 
3346 /*********************************************************************//**
3347 Sets an exclusive lock on a table, for the duration of creating indexes.
3348 @return error code or DB_SUCCESS */
3349 dberr_t
row_merge_lock_table(trx_t * trx,dict_table_t * table,enum lock_mode mode)3350 row_merge_lock_table(
3351 /*=================*/
3352 	trx_t*		trx,		/*!< in/out: transaction */
3353 	dict_table_t*	table,		/*!< in: table to lock */
3354 	enum lock_mode	mode)		/*!< in: LOCK_X or LOCK_S */
3355 {
3356 	ut_ad(!srv_read_only_mode);
3357 	ut_ad(mode == LOCK_X || mode == LOCK_S);
3358 
3359 	trx->op_info = "setting table lock for creating or dropping index";
3360 	trx->ddl = true;
3361 	/* Trx for DDL should not be forced to rollback for now */
3362 	trx->in_innodb |= TRX_FORCE_ROLLBACK_DISABLE;
3363 
3364 	return(lock_table_for_trx(table, trx, mode));
3365 }
3366 
3367 /*********************************************************************//**
3368 Drop an index that was created before an error occurred.
3369 The data dictionary must have been locked exclusively by the caller,
3370 because the transaction will not be committed. */
3371 static
3372 void
row_merge_drop_index_dict(trx_t * trx,index_id_t index_id)3373 row_merge_drop_index_dict(
3374 /*======================*/
3375 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3376 	index_id_t	index_id)/*!< in: index identifier */
3377 {
3378 	static const char sql[] =
3379 		"PROCEDURE DROP_INDEX_PROC () IS\n"
3380 		"BEGIN\n"
3381 		"DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
3382 		"DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
3383 		"END;\n";
3384 	dberr_t		error;
3385 	pars_info_t*	info;
3386 
3387 	ut_ad(!srv_read_only_mode);
3388 	ut_ad(mutex_own(&dict_sys->mutex));
3389 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3390 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3391 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3392 
3393 	info = pars_info_create();
3394 	pars_info_add_ull_literal(info, "indexid", index_id);
3395 	trx->op_info = "dropping index from dictionary";
3396 	error = que_eval_sql(info, sql, FALSE, trx);
3397 
3398 	if (error != DB_SUCCESS) {
3399 		/* Even though we ensure that DDL transactions are WAIT
3400 		and DEADLOCK free, we could encounter other errors e.g.,
3401 		DB_TOO_MANY_CONCURRENT_TRXS. */
3402 		trx->error_state = DB_SUCCESS;
3403 
3404 		ib::error() << "row_merge_drop_index_dict failed with error "
3405 			<< error;
3406 	}
3407 
3408 	trx->op_info = "";
3409 }
3410 
3411 /*********************************************************************//**
3412 Drop indexes that were created before an error occurred.
3413 The data dictionary must have been locked exclusively by the caller,
3414 because the transaction will not be committed. */
3415 void
row_merge_drop_indexes_dict(trx_t * trx,table_id_t table_id)3416 row_merge_drop_indexes_dict(
3417 /*========================*/
3418 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3419 	table_id_t	table_id)/*!< in: table identifier */
3420 {
3421 	static const char sql[] =
3422 		"PROCEDURE DROP_INDEXES_PROC () IS\n"
3423 		"ixid CHAR;\n"
3424 		"found INT;\n"
3425 
3426 		"DECLARE CURSOR index_cur IS\n"
3427 		" SELECT ID FROM SYS_INDEXES\n"
3428 		" WHERE TABLE_ID=:tableid AND\n"
3429 		" SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3430 		"FOR UPDATE;\n"
3431 
3432 		"BEGIN\n"
3433 		"found := 1;\n"
3434 		"OPEN index_cur;\n"
3435 		"WHILE found = 1 LOOP\n"
3436 		"  FETCH index_cur INTO ixid;\n"
3437 		"  IF (SQL % NOTFOUND) THEN\n"
3438 		"    found := 0;\n"
3439 		"  ELSE\n"
3440 		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3441 		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3442 		"  END IF;\n"
3443 		"END LOOP;\n"
3444 		"CLOSE index_cur;\n"
3445 
3446 		"END;\n";
3447 	dberr_t		error;
3448 	pars_info_t*	info;
3449 
3450 	ut_ad(!srv_read_only_mode);
3451 	ut_ad(mutex_own(&dict_sys->mutex));
3452 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3453 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3454 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3455 
3456 	/* It is possible that table->n_ref_count > 1 when
3457 	locked=TRUE. In this case, all code that should have an open
3458 	handle to the table be waiting for the next statement to execute,
3459 	or waiting for a meta-data lock.
3460 
3461 	A concurrent purge will be prevented by dict_operation_lock. */
3462 
3463 	info = pars_info_create();
3464 	pars_info_add_ull_literal(info, "tableid", table_id);
3465 	trx->op_info = "dropping indexes";
3466 	error = que_eval_sql(info, sql, FALSE, trx);
3467 
3468 	switch (error) {
3469 	case DB_SUCCESS:
3470 		break;
3471 	default:
3472 		/* Even though we ensure that DDL transactions are WAIT
3473 		and DEADLOCK free, we could encounter other errors e.g.,
3474 		DB_TOO_MANY_CONCURRENT_TRXS. */
3475 		ib::error() << "row_merge_drop_indexes_dict failed with error "
3476 			<< error;
3477 		/* fall through */
3478 	case DB_TOO_MANY_CONCURRENT_TRXS:
3479 		trx->error_state = DB_SUCCESS;
3480 	}
3481 
3482 	trx->op_info = "";
3483 }
3484 
3485 /*********************************************************************//**
3486 Drop indexes that were created before an error occurred.
3487 The data dictionary must have been locked exclusively by the caller,
3488 because the transaction will not be committed. */
3489 void
row_merge_drop_indexes(trx_t * trx,dict_table_t * table,ibool locked)3490 row_merge_drop_indexes(
3491 /*===================*/
3492 	trx_t*		trx,	/*!< in/out: dictionary transaction */
3493 	dict_table_t*	table,	/*!< in/out: table containing the indexes */
3494 	ibool		locked)	/*!< in: TRUE=table locked,
3495 				FALSE=may need to do a lazy drop */
3496 {
3497 	dict_index_t*	index;
3498 	dict_index_t*	next_index;
3499 
3500 	ut_ad(!srv_read_only_mode);
3501 	ut_ad(mutex_own(&dict_sys->mutex));
3502 	ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
3503 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3504 	ut_ad(rw_lock_own(dict_operation_lock, RW_LOCK_X));
3505 
3506 	index = dict_table_get_first_index(table);
3507 	ut_ad(dict_index_is_clust(index));
3508 	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
3509 
3510 	/* the caller should have an open handle to the table */
3511 	ut_ad(table->get_ref_count() >= 1);
3512 
3513 	/* It is possible that table->n_ref_count > 1 when
3514 	locked=TRUE. In this case, all code that should have an open
3515 	handle to the table be waiting for the next statement to execute,
3516 	or waiting for a meta-data lock.
3517 
3518 	A concurrent purge will be prevented by dict_operation_lock. */
3519 
3520 	if (!locked && table->get_ref_count() > 1) {
3521 		/* We will have to drop the indexes later, when the
3522 		table is guaranteed to be no longer in use.  Mark the
3523 		indexes as incomplete and corrupted, so that other
3524 		threads will stop using them.  Let dict_table_close()
3525 		or crash recovery or the next invocation of
3526 		prepare_inplace_alter_table() take care of dropping
3527 		the indexes. */
3528 
3529 		while ((index = dict_table_get_next_index(index)) != NULL) {
3530 			ut_ad(!dict_index_is_clust(index));
3531 
3532 			switch (dict_index_get_online_status(index)) {
3533 			case ONLINE_INDEX_ABORTED_DROPPED:
3534 				continue;
3535 			case ONLINE_INDEX_COMPLETE:
3536 				if (index->is_committed()) {
3537 					/* Do nothing to already
3538 					published indexes. */
3539 				} else if (index->type & DICT_FTS) {
3540 					/* Drop a completed FULLTEXT
3541 					index, due to a timeout during
3542 					MDL upgrade for
3543 					commit_inplace_alter_table().
3544 					Because only concurrent reads
3545 					are allowed (and they are not
3546 					seeing this index yet) we
3547 					are safe to drop the index. */
3548 					dict_index_t* prev = UT_LIST_GET_PREV(
3549 						indexes, index);
3550 					/* At least there should be
3551 					the clustered index before
3552 					this one. */
3553 					ut_ad(prev);
3554 					ut_a(table->fts);
3555 					fts_drop_index(table, index, trx);
3556 					/* Since
3557 					INNOBASE_SHARE::idx_trans_tbl
3558 					is shared between all open
3559 					ha_innobase handles to this
3560 					table, no thread should be
3561 					accessing this dict_index_t
3562 					object. Also, we should be
3563 					holding LOCK=SHARED MDL on the
3564 					table even after the MDL
3565 					upgrade timeout. */
3566 
3567 					/* We can remove a DICT_FTS
3568 					index from the cache, because
3569 					we do not allow ADD FULLTEXT INDEX
3570 					with LOCK=NONE. If we allowed that,
3571 					we should exclude FTS entries from
3572 					prebuilt->ins_node->entry_list
3573 					in ins_node_create_entry_list(). */
3574 					dict_index_remove_from_cache(
3575 						table, index);
3576 					index = prev;
3577 				} else {
3578 					rw_lock_x_lock(
3579 						dict_index_get_lock(index));
3580 					dict_index_set_online_status(
3581 						index, ONLINE_INDEX_ABORTED);
3582 					index->type |= DICT_CORRUPT;
3583 					table->drop_aborted = TRUE;
3584 					goto drop_aborted;
3585 				}
3586 				continue;
3587 			case ONLINE_INDEX_CREATION:
3588 				rw_lock_x_lock(dict_index_get_lock(index));
3589 				ut_ad(!index->is_committed());
3590 				row_log_abort_sec(index);
3591 			drop_aborted:
3592 				rw_lock_x_unlock(dict_index_get_lock(index));
3593 
3594 				DEBUG_SYNC_C("merge_drop_index_after_abort");
3595 				/* covered by dict_sys->mutex */
3596 				MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
3597 				/* fall through */
3598 			case ONLINE_INDEX_ABORTED:
3599 				/* Drop the index tree from the
3600 				data dictionary and free it from
3601 				the tablespace, but keep the object
3602 				in the data dictionary cache. */
3603 				row_merge_drop_index_dict(trx, index->id);
3604 				rw_lock_x_lock(dict_index_get_lock(index));
3605 				dict_index_set_online_status(
3606 					index, ONLINE_INDEX_ABORTED_DROPPED);
3607 				rw_lock_x_unlock(dict_index_get_lock(index));
3608 				table->drop_aborted = TRUE;
3609 				continue;
3610 			}
3611 			ut_error;
3612 		}
3613 
3614 		return;
3615 	}
3616 
3617 	row_merge_drop_indexes_dict(trx, table->id);
3618 
3619 	/* Invalidate all row_prebuilt_t::ins_graph that are referring
3620 	to this table. That is, force row_get_prebuilt_insert_row() to
3621 	rebuild prebuilt->ins_node->entry_list). */
3622 	ut_ad(table->def_trx_id <= trx->id);
3623 	table->def_trx_id = trx->id;
3624 
3625 	next_index = dict_table_get_next_index(index);
3626 
3627 	while ((index = next_index) != NULL) {
3628 		/* read the next pointer before freeing the index */
3629 		next_index = dict_table_get_next_index(index);
3630 
3631 		ut_ad(!dict_index_is_clust(index));
3632 
3633 		if (!index->is_committed()) {
3634 			/* If it is FTS index, drop from table->fts
3635 			and also drop its auxiliary tables */
3636 			if (index->type & DICT_FTS) {
3637 				ut_a(table->fts);
3638 				fts_drop_index(table, index, trx);
3639 			}
3640 
3641 			switch (dict_index_get_online_status(index)) {
3642 			case ONLINE_INDEX_CREATION:
3643 				/* This state should only be possible
3644 				when prepare_inplace_alter_table() fails
3645 				after invoking row_merge_create_index().
3646 				In inplace_alter_table(),
3647 				row_merge_build_indexes()
3648 				should never leave the index in this state.
3649 				It would invoke row_log_abort_sec() on
3650 				failure. */
3651 			case ONLINE_INDEX_COMPLETE:
3652 				/* In these cases, we are able to drop
3653 				the index straight. The DROP INDEX was
3654 				never deferred. */
3655 				break;
3656 			case ONLINE_INDEX_ABORTED:
3657 			case ONLINE_INDEX_ABORTED_DROPPED:
3658 				/* covered by dict_sys->mutex */
3659 				MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
3660 			}
3661 
3662 			dict_index_remove_from_cache(table, index);
3663 		}
3664 	}
3665 
3666 	table->drop_aborted = FALSE;
3667 	ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
3668 }
3669 
3670 /*********************************************************************//**
3671 Drop all partially created indexes during crash recovery. */
3672 void
row_merge_drop_temp_indexes(void)3673 row_merge_drop_temp_indexes(void)
3674 /*=============================*/
3675 {
3676 	static const char sql[] =
3677 		"PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
3678 		"ixid CHAR;\n"
3679 		"found INT;\n"
3680 
3681 		"DECLARE CURSOR index_cur IS\n"
3682 		" SELECT ID FROM SYS_INDEXES\n"
3683 		" WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
3684 		"FOR UPDATE;\n"
3685 
3686 		"BEGIN\n"
3687 		"found := 1;\n"
3688 		"OPEN index_cur;\n"
3689 		"WHILE found = 1 LOOP\n"
3690 		"  FETCH index_cur INTO ixid;\n"
3691 		"  IF (SQL % NOTFOUND) THEN\n"
3692 		"    found := 0;\n"
3693 		"  ELSE\n"
3694 		"    DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
3695 		"    DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
3696 		"  END IF;\n"
3697 		"END LOOP;\n"
3698 		"CLOSE index_cur;\n"
3699 		"END;\n";
3700 	trx_t*	trx;
3701 	dberr_t	error;
3702 
3703 	/* Load the table definitions that contain partially defined
3704 	indexes, so that the data dictionary information can be checked
3705 	when accessing the tablename.ibd files. */
3706 	trx = trx_allocate_for_background();
3707 	trx->op_info = "dropping partially created indexes";
3708 	row_mysql_lock_data_dictionary(trx);
3709 	/* Ensure that this transaction will be rolled back and locks
3710 	will be released, if the server gets killed before the commit
3711 	gets written to the redo log. */
3712 	trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
3713 
3714 	trx->op_info = "dropping indexes";
3715 	error = que_eval_sql(NULL, sql, FALSE, trx);
3716 
3717 	if (error != DB_SUCCESS) {
3718 		/* Even though we ensure that DDL transactions are WAIT
3719 		and DEADLOCK free, we could encounter other errors e.g.,
3720 		DB_TOO_MANY_CONCURRENT_TRXS. */
3721 		trx->error_state = DB_SUCCESS;
3722 
3723 		ib::error() << "row_merge_drop_temp_indexes failed with error"
3724 			<< error;
3725 	}
3726 
3727 	trx_commit_for_mysql(trx);
3728 	row_mysql_unlock_data_dictionary(trx);
3729 	trx_free_for_background(trx);
3730 }
3731 
3732 
3733 /** Create temporary merge files in the given paramater path, and if
3734 UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
3735 @param[in]	path	location for creating temporary merge files.
3736 @return File descriptor */
3737 int
row_merge_file_create_low(const char * path)3738 row_merge_file_create_low(
3739 	const char*	path)
3740 {
3741     int fd;
3742     if (path == NULL) {
3743       path = innobase_mysql_tmpdir();
3744     }
3745 #ifdef UNIV_PFS_IO
3746 	/* This temp file open does not go through normal
3747 	file APIs, add instrumentation to register with
3748 	performance schema */
3749 	struct PSI_file_locker*	locker = NULL;
3750 	Datafile df;
3751 	df.make_filepath(path, "Innodb Merge Temp File", NO_EXT);
3752 
3753         PSI_file_locker_state	state;
3754         locker = PSI_FILE_CALL(get_thread_file_name_locker)(
3755             &state, innodb_temp_file_key.m_value, PSI_FILE_OPEN, df.filepath(),
3756             &locker);
3757         if (locker != NULL) {
3758 		PSI_FILE_CALL(start_file_open_wait)(locker,
3759 						__FILE__,
3760 						__LINE__);
3761         }
3762 #endif
3763 	fd = innobase_mysql_tmpfile(path);
3764 #ifdef UNIV_PFS_IO
3765 	 if (locker != NULL) {
3766 		PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)(
3767 				locker, fd);
3768 		}
3769 #endif
3770 
3771 	if (fd < 0) {
3772 		ib::error() << "Cannot create temporary merge file";
3773 		return(-1);
3774 	}
3775 	return(fd);
3776 }
3777 
3778 
3779 /** Create a merge file in the given location.
3780 @param[out]	merge_file	merge file structure
3781 @param[in]	path		location for creating temporary file
3782 @return file descriptor, or -1 on failure */
3783 int
row_merge_file_create(merge_file_t * merge_file,const char * path)3784 row_merge_file_create(
3785 	merge_file_t*	merge_file,
3786 	const char*	path)
3787 {
3788 	merge_file->fd = row_merge_file_create_low(path);
3789 	merge_file->offset = 0;
3790 	merge_file->n_rec = 0;
3791 
3792 	if (merge_file->fd >= 0) {
3793 		if (srv_disable_sort_file_cache) {
3794 			os_file_set_nocache(merge_file->fd,
3795 				"row0merge.cc", "sort");
3796 		}
3797 	}
3798 	return(merge_file->fd);
3799 }
3800 
3801 /*********************************************************************//**
3802 Destroy a merge file. And de-register the file from Performance Schema
3803 if UNIV_PFS_IO is defined. */
3804 void
row_merge_file_destroy_low(int fd)3805 row_merge_file_destroy_low(
3806 /*=======================*/
3807 	int		fd)	/*!< in: merge file descriptor */
3808 {
3809 #ifdef UNIV_PFS_IO
3810 	struct PSI_file_locker*	locker = NULL;
3811 	PSI_file_locker_state	state;
3812 	locker = PSI_FILE_CALL(get_thread_file_descriptor_locker)(
3813 			       &state, fd, PSI_FILE_CLOSE);
3814 	if (locker != NULL) {
3815 		PSI_FILE_CALL(start_file_wait)(
3816 			      locker, 0, __FILE__, __LINE__);
3817 	}
3818 #endif
3819 	if (fd >= 0) {
3820 		close(fd);
3821 	}
3822 #ifdef UNIV_PFS_IO
3823 	if (locker != NULL) {
3824 		PSI_FILE_CALL(end_file_wait)(locker, 0);
3825 	}
3826 #endif
3827 }
3828 /*********************************************************************//**
3829 Destroy a merge file. */
3830 void
row_merge_file_destroy(merge_file_t * merge_file)3831 row_merge_file_destroy(
3832 /*===================*/
3833 	merge_file_t*	merge_file)	/*!< in/out: merge file structure */
3834 {
3835 	ut_ad(!srv_read_only_mode);
3836 
3837 	if (merge_file->fd != -1) {
3838 		row_merge_file_destroy_low(merge_file->fd);
3839 		merge_file->fd = -1;
3840 	}
3841 }
3842 
3843 /*********************************************************************//**
3844 Rename an index in the dictionary that was created. The data
3845 dictionary must have been locked exclusively by the caller, because
3846 the transaction will not be committed.
3847 @return DB_SUCCESS if all OK */
3848 dberr_t
row_merge_rename_index_to_add(trx_t * trx,table_id_t table_id,index_id_t index_id)3849 row_merge_rename_index_to_add(
3850 /*==========================*/
3851 	trx_t*		trx,		/*!< in/out: transaction */
3852 	table_id_t	table_id,	/*!< in: table identifier */
3853 	index_id_t	index_id)	/*!< in: index identifier */
3854 {
3855 	dberr_t		err = DB_SUCCESS;
3856 	pars_info_t*	info = pars_info_create();
3857 
3858 	/* We use the private SQL parser of Innobase to generate the
3859 	query graphs needed in renaming indexes. */
3860 
3861 	static const char rename_index[] =
3862 		"PROCEDURE RENAME_INDEX_PROC () IS\n"
3863 		"BEGIN\n"
3864 		"UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
3865 		"WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3866 		"END;\n";
3867 
3868 	ut_ad(trx);
3869 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3870 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3871 
3872 	trx->op_info = "renaming index to add";
3873 
3874 	pars_info_add_ull_literal(info, "tableid", table_id);
3875 	pars_info_add_ull_literal(info, "indexid", index_id);
3876 
3877 	err = que_eval_sql(info, rename_index, FALSE, trx);
3878 
3879 	if (err != DB_SUCCESS) {
3880 		/* Even though we ensure that DDL transactions are WAIT
3881 		and DEADLOCK free, we could encounter other errors e.g.,
3882 		DB_TOO_MANY_CONCURRENT_TRXS. */
3883 		trx->error_state = DB_SUCCESS;
3884 
3885 		ib::error() << "row_merge_rename_index_to_add failed with"
3886 			" error " << err;
3887 	}
3888 
3889 	trx->op_info = "";
3890 
3891 	return(err);
3892 }
3893 
3894 /*********************************************************************//**
3895 Rename an index in the dictionary that is to be dropped. The data
3896 dictionary must have been locked exclusively by the caller, because
3897 the transaction will not be committed.
3898 @return DB_SUCCESS if all OK */
3899 dberr_t
row_merge_rename_index_to_drop(trx_t * trx,table_id_t table_id,index_id_t index_id)3900 row_merge_rename_index_to_drop(
3901 /*===========================*/
3902 	trx_t*		trx,		/*!< in/out: transaction */
3903 	table_id_t	table_id,	/*!< in: table identifier */
3904 	index_id_t	index_id)	/*!< in: index identifier */
3905 {
3906 	dberr_t		err;
3907 	pars_info_t*	info = pars_info_create();
3908 
3909 	ut_ad(!srv_read_only_mode);
3910 
3911 	/* We use the private SQL parser of Innobase to generate the
3912 	query graphs needed in renaming indexes. */
3913 
3914 	static const char rename_index[] =
3915 		"PROCEDURE RENAME_INDEX_PROC () IS\n"
3916 		"BEGIN\n"
3917 		"UPDATE SYS_INDEXES SET NAME=CONCAT('"
3918 		TEMP_INDEX_PREFIX_STR "',NAME)\n"
3919 		"WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
3920 		"END;\n";
3921 
3922 	ut_ad(trx);
3923 	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
3924 	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
3925 
3926 	trx->op_info = "renaming index to drop";
3927 
3928 	pars_info_add_ull_literal(info, "tableid", table_id);
3929 	pars_info_add_ull_literal(info, "indexid", index_id);
3930 
3931 	err = que_eval_sql(info, rename_index, FALSE, trx);
3932 
3933 	if (err != DB_SUCCESS) {
3934 		/* Even though we ensure that DDL transactions are WAIT
3935 		and DEADLOCK free, we could encounter other errors e.g.,
3936 		DB_TOO_MANY_CONCURRENT_TRXS. */
3937 		trx->error_state = DB_SUCCESS;
3938 
3939 		ib::error() << "row_merge_rename_index_to_drop failed with"
3940 			" error " << err;
3941 	}
3942 
3943 	trx->op_info = "";
3944 
3945 	return(err);
3946 }
3947 
3948 /*********************************************************************//**
3949 Provide a new pathname for a table that is being renamed if it belongs to
3950 a file-per-table tablespace.  The caller is responsible for freeing the
3951 memory allocated for the return value.
3952 @return new pathname of tablespace file, or NULL if space = 0 */
3953 char*
row_make_new_pathname(dict_table_t * table,const char * new_name)3954 row_make_new_pathname(
3955 /*==================*/
3956 	dict_table_t*	table,		/*!< in: table to be renamed */
3957 	const char*	new_name)	/*!< in: new name */
3958 {
3959 	char*	new_path;
3960 	char*	old_path;
3961 
3962 	ut_ad(!is_system_tablespace(table->space));
3963 
3964 	old_path = fil_space_get_first_path(table->space);
3965 	ut_a(old_path);
3966 
3967 	new_path = os_file_make_new_pathname(old_path, new_name);
3968 
3969 	ut_free(old_path);
3970 
3971 	return(new_path);
3972 }
3973 
/*********************************************************************//**
Rename the tables in the data dictionary.  The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed.
@return error code or DB_SUCCESS */
dberr_t
row_merge_rename_tables_dict(
/*=========================*/
	dict_table_t*	old_table,	/*!< in/out: old table, renamed to
					tmp_name */
	dict_table_t*	new_table,	/*!< in/out: new table, renamed to
					old_table->name */
	const char*	tmp_name,	/*!< in: new name for old_table */
	trx_t*		trx)		/*!< in/out: dictionary transaction */
{
	dberr_t		err	= DB_ERROR;
	pars_info_t*	info;

	ut_ad(!srv_read_only_mode);
	ut_ad(old_table != new_table);
	ut_ad(mutex_own(&dict_sys->mutex));
	ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
	ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE
	      || trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);

	trx->op_info = "renaming tables";

	/* We use the private SQL parser of Innobase to generate the query
	graphs needed in updating the dictionary data in system tables. */

	info = pars_info_create();

	pars_info_add_str_literal(info, "new_name", new_table->name.m_name);
	pars_info_add_str_literal(info, "old_name", old_table->name.m_name);
	pars_info_add_str_literal(info, "tmp_name", tmp_name);

	/* Swap the names in SYS_TABLES: move old_table out of the way
	to tmp_name first, then give old_table's original name to
	new_table.  Both UPDATEs run in the caller's uncommitted
	dictionary transaction, so the swap commits or rolls back as
	a unit. */
	err = que_eval_sql(info,
			   "PROCEDURE RENAME_TABLES () IS\n"
			   "BEGIN\n"
			   "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
			   " WHERE NAME = :old_name;\n"
			   "UPDATE SYS_TABLES SET NAME = :old_name\n"
			   " WHERE NAME = :new_name;\n"
			   "END;\n", FALSE, trx);

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the old table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && dict_table_is_file_per_table(old_table)
	    && !old_table->ibd_file_missing) {
		/* Make pathname to update SYS_DATAFILES. */
		char* tmp_path = row_make_new_pathname(old_table, tmp_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "tmp_name", tmp_name);
		pars_info_add_str_literal(info, "tmp_path", tmp_path);
		pars_info_add_int4_literal(info, "old_space",
					   (lint) old_table->space);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_OLD_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :tmp_name\n"
				   " WHERE SPACE = :old_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :tmp_path\n"
				   " WHERE SPACE = :old_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(tmp_path);
	}

	/* Update SYS_TABLESPACES and SYS_DATAFILES if the new table being
	renamed is a single-table tablespace, which must be implicitly
	renamed along with the table. */
	if (err == DB_SUCCESS
	    && dict_table_is_file_per_table(new_table)) {
		/* Make pathname to update SYS_DATAFILES. */
		char* old_path = row_make_new_pathname(
			new_table, old_table->name.m_name);

		info = pars_info_create();

		pars_info_add_str_literal(info, "old_name",
					  old_table->name.m_name);
		pars_info_add_str_literal(info, "old_path", old_path);
		pars_info_add_int4_literal(info, "new_space",
					   (lint) new_table->space);

		err = que_eval_sql(info,
				   "PROCEDURE RENAME_NEW_SPACE () IS\n"
				   "BEGIN\n"
				   "UPDATE SYS_TABLESPACES"
				   " SET NAME = :old_name\n"
				   " WHERE SPACE = :new_space;\n"
				   "UPDATE SYS_DATAFILES"
				   " SET PATH = :old_path\n"
				   " WHERE SPACE = :new_space;\n"
				   "END;\n", FALSE, trx);

		ut_free(old_path);
	}

	/* Keep the DISCARDED flag in the dictionary tables in sync
	with the in-memory state of the new table; see
	row_import_update_discarded_flag(). */
	if (err == DB_SUCCESS && dict_table_is_discarded(new_table)) {
		err = row_import_update_discarded_flag(
			trx, new_table->id, true, true);
	}

	trx->op_info = "";

	return(err);
}
4089 
4090 /** Create and execute a query graph for creating an index.
4091 @param[in,out]	trx	trx
4092 @param[in,out]	table	table
4093 @param[in,out]	index	index
4094 @param[in]	add_v	new virtual columns added along with add index call
4095 @return DB_SUCCESS or error code */
4096 static MY_ATTRIBUTE((warn_unused_result))
4097 dberr_t
row_merge_create_index_graph(trx_t * trx,dict_table_t * table,dict_index_t * index,const dict_add_v_col_t * add_v)4098 row_merge_create_index_graph(
4099 	trx_t*			trx,
4100 	dict_table_t*		table,
4101 	dict_index_t*		index,
4102 	const dict_add_v_col_t* add_v)
4103 {
4104 	ind_node_t*	node;		/*!< Index creation node */
4105 	mem_heap_t*	heap;		/*!< Memory heap */
4106 	que_thr_t*	thr;		/*!< Query thread */
4107 	dberr_t		err;
4108 
4109 	DBUG_ENTER("row_merge_create_index_graph");
4110 
4111 	ut_ad(trx);
4112 	ut_ad(table);
4113 	ut_ad(index);
4114 
4115 	heap = mem_heap_create(512);
4116 
4117 	index->table = table;
4118 	node = ind_create_graph_create(index, heap, add_v);
4119 	thr = pars_complete_graph_for_exec(node, trx, heap, NULL);
4120 
4121 	ut_a(thr == que_fork_start_command(
4122 			static_cast<que_fork_t*>(que_node_get_parent(thr))));
4123 
4124 	que_run_threads(thr);
4125 
4126 	err = trx->error_state;
4127 
4128 	que_graph_free((que_t*) que_node_get_parent(thr));
4129 
4130 	DBUG_RETURN(err);
4131 }
4132 
4133 /** Create the index and load in to the dictionary.
4134 @param[in,out]	trx		trx (sets error_state)
4135 @param[in,out]	table		the index is on this table
4136 @param[in]	index_def	the index definition
4137 @param[in]	add_v		new virtual columns added along with add
4138 				index call
4139 @return index, or NULL on error */
4140 dict_index_t*
row_merge_create_index(trx_t * trx,dict_table_t * table,const index_def_t * index_def,const dict_add_v_col_t * add_v)4141 row_merge_create_index(
4142 	trx_t*			trx,
4143 	dict_table_t*		table,
4144 	const index_def_t*	index_def,
4145 	const dict_add_v_col_t*	add_v)
4146 {
4147 	dict_index_t*	index;
4148 	dberr_t		err;
4149 	ulint		n_fields = index_def->n_fields;
4150 	ulint		i;
4151 	bool		has_new_v_col = false;
4152 
4153 	DBUG_ENTER("row_merge_create_index");
4154 
4155 	ut_ad(!srv_read_only_mode);
4156 
4157 	/* Create the index prototype, using the passed in def, this is not
4158 	a persistent operation. We pass 0 as the space id, and determine at
4159 	a lower level the space id where to store the table. */
4160 
4161 	index = dict_mem_index_create(table->name.m_name, index_def->name,
4162 				      0, index_def->ind_type, n_fields);
4163 
4164 	ut_a(index);
4165 
4166 	index->set_committed(index_def->rebuild);
4167 
4168 	for (i = 0; i < n_fields; i++) {
4169 		const char*	name;
4170 		index_field_t*	ifield = &index_def->fields[i];
4171 
4172 		if (ifield->is_v_col) {
4173 			if (ifield->col_no >= table->n_v_def) {
4174 				ut_ad(ifield->col_no < table->n_v_def
4175 				      + add_v->n_v_col);
4176 				ut_ad(ifield->col_no >= table->n_v_def);
4177 				name = add_v->v_col_name[
4178 					ifield->col_no - table->n_v_def];
4179 
4180 				has_new_v_col = true;
4181 			} else {
4182 				name = dict_table_get_v_col_name(
4183 					table, ifield->col_no);
4184 			}
4185 		} else {
4186 			name = dict_table_get_col_name(table, ifield->col_no);
4187 
4188 		}
4189 
4190 		dict_mem_index_add_field(index, name, ifield->prefix_len);
4191 	}
4192 
4193 	/* Add the index to SYS_INDEXES, using the index prototype. */
4194 	err = row_merge_create_index_graph(trx, table, index, add_v);
4195 
4196 	if (err == DB_SUCCESS) {
4197 
4198 		index = dict_table_get_index_on_name(table, index_def->name,
4199 						     index_def->rebuild);
4200 
4201 		ut_a(index);
4202 
4203 		index->parser = index_def->parser;
4204 		index->is_ngram = index_def->is_ngram;
4205 		index->has_new_v_col = has_new_v_col;
4206 
4207 		/* Note the id of the transaction that created this
4208 		index, we use it to restrict readers from accessing
4209 		this index, to ensure read consistency. */
4210 		ut_ad(index->trx_id == trx->id);
4211 	} else {
4212 		index = NULL;
4213 	}
4214 
4215 	DBUG_RETURN(index);
4216 }
4217 
4218 /*********************************************************************//**
4219 Check if a transaction can use an index. */
4220 ibool
row_merge_is_index_usable(const trx_t * trx,const dict_index_t * index)4221 row_merge_is_index_usable(
4222 /*======================*/
4223 	const trx_t*		trx,	/*!< in: transaction */
4224 	const dict_index_t*	index)	/*!< in: index to check */
4225 {
4226 	if (!dict_index_is_clust(index)
4227 	    && dict_index_is_online_ddl(index)) {
4228 		/* Indexes that are being created are not useable. */
4229 		return(FALSE);
4230 	}
4231 
4232 	return(!dict_index_is_corrupted(index)
4233 	       && (dict_table_is_temporary(index->table)
4234 		   || index->trx_id == 0
4235 		   || !MVCC::is_view_active(trx->read_view)
4236 		   || trx->read_view->changes_visible(
4237 			   index->trx_id,
4238 			   index->table->name)));
4239 }
4240 
4241 /*********************************************************************//**
4242 Drop a table. The caller must have ensured that the background stats
4243 thread is not processing the table. This can be done by calling
4244 dict_stats_wait_bg_to_stop_using_table() after locking the dictionary and
4245 before calling this function.
4246 @return DB_SUCCESS or error code */
4247 dberr_t
row_merge_drop_table(trx_t * trx,dict_table_t * table)4248 row_merge_drop_table(
4249 /*=================*/
4250 	trx_t*		trx,		/*!< in: transaction */
4251 	dict_table_t*	table)		/*!< in: table to drop */
4252 {
4253 	ut_ad(!srv_read_only_mode);
4254 
4255 	/* There must be no open transactions on the table. */
4256 	ut_a(table->get_ref_count() == 0);
4257 
4258 	return(row_drop_table_for_mysql(table->name.m_name,
4259 					trx, false, false));
4260 }
4261 
4262 /** Write an MLOG_INDEX_LOAD record to indicate in the redo-log
4263 that redo-logging of individual index pages was disabled, and
4264 the flushing of such pages to the data files was completed.
4265 @param[in]	index	an index tree on which redo logging was disabled */
4266 static
4267 void
row_merge_write_redo(const dict_index_t * index)4268 row_merge_write_redo(
4269 	const dict_index_t*	index)
4270 {
4271 	mtr_t	mtr;
4272 	byte*	log_ptr;
4273 
4274 	ut_ad(!dict_table_is_temporary(index->table));
4275 	mtr.start();
4276 	log_ptr = mlog_open(&mtr, 11 + 8);
4277 	log_ptr = mlog_write_initial_log_record_low(
4278 		MLOG_INDEX_LOAD,
4279 		index->space, index->page, log_ptr, &mtr);
4280 	mach_write_to_8(log_ptr, index->id);
4281 	mlog_close(&mtr, log_ptr + 8);
4282 	mtr.commit();
4283 }
4284 
/** Build indexes on a table by reading a clustered index, creating a temporary
file containing index entries, merge sorting these index entries and inserting
sorted index entries to indexes.
@param[in]	trx		transaction
@param[in]	old_table	table where rows are read from
@param[in]	new_table	table where indexes are created; identical to
old_table unless creating a PRIMARY KEY
@param[in]	online		true if creating indexes online
@param[in]	indexes		indexes to be created
@param[in]	key_numbers	MySQL key numbers
@param[in]	n_indexes	size of indexes[]
@param[in,out]	table		MySQL table, for reporting erroneous key value
if applicable
@param[in]	add_cols	default values of added columns, or NULL
@param[in]	col_map		mapping of old column numbers to new ones, or
NULL if old_table == new_table
@param[in]	add_autoinc	number of added AUTO_INCREMENT columns, or
ULINT_UNDEFINED if none is added
@param[in,out]	sequence	autoinc sequence
@param[in]	skip_pk_sort	whether the new PRIMARY KEY will follow
existing order
@param[in,out]	stage		performance schema accounting object, used by
ALTER TABLE. stage->begin_phase_read_pk() will be called at the beginning of
this function and it will be passed to other functions for further accounting.
@param[in]	add_v		new virtual columns added along with indexes
@param[in]	eval_table	mysql table used to evaluate virtual column
				value, see innobase_get_computed_value().
@return DB_SUCCESS or error code */
dberr_t
row_merge_build_indexes(
	trx_t*			trx,
	dict_table_t*		old_table,
	dict_table_t*		new_table,
	bool			online,
	dict_index_t**		indexes,
	const ulint*		key_numbers,
	ulint			n_indexes,
	struct TABLE*		table,
	const dtuple_t*		add_cols,
	const ulint*		col_map,
	ulint			add_autoinc,
	ib_sequence_t&		sequence,
	bool			skip_pk_sort,
	ut_stage_alter_t*	stage,
	const dict_add_v_col_t*	add_v,
	struct TABLE*		eval_table)
{
	merge_file_t*		merge_files;
	row_merge_block_t*	block;
	ut_new_pfx_t		block_pfx;
	ulint			i;
	ulint			j;
	dberr_t			error;
	int			tmpfd = -1;	/* shared temp fd for merge sort */
	dict_index_t*		fts_sort_idx = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	int64_t			sig_count = 0;
	bool			fts_psort_initiated = false;
	DBUG_ENTER("row_merge_build_indexes");

	ut_ad(!srv_read_only_mode);
	ut_ad((old_table == new_table) == !col_map);
	ut_ad(!add_cols || col_map);

	/* During a rebuild with skip_pk_sort, the primary key is filled
	in already-sorted order while scanning, so it is excluded from
	the per-index sort/insert accounting. */
	stage->begin_phase_read_pk(skip_pk_sort && new_table != old_table
				   ? n_indexes - 1
				   : n_indexes);

	/* Allocate memory for merge file data structure and initialize
	fields */

	ut_allocator<row_merge_block_t>	alloc(mem_key_row_merge_sort);

	/* This will allocate "3 * srv_sort_buf_size" elements of type
	row_merge_block_t. The latter is defined as byte. */
	block = alloc.allocate_large(3 * srv_sort_buf_size, &block_pfx);

	if (block == NULL) {
		DBUG_RETURN(DB_OUT_OF_MEMORY);
	}

	trx_start_if_not_started_xa(trx, true);

	/* Check if we need a flush observer to flush dirty pages.
	Since we disable redo logging in bulk load, so we should flush
	dirty pages before online log apply, because online log apply enables
	redo logging(we can do further optimization here).
	1. online add index: flush dirty pages right before row_log_apply().
	2. table rebuild: flush dirty pages before row_log_table_apply().

	we use bulk load to create all types of indexes except spatial index,
	for which redo logging is enabled. If we create only spatial indexes,
	we don't need to flush dirty pages at all. */
	bool	need_flush_observer = (old_table != new_table);

	for (i = 0; i < n_indexes; i++) {
		if (!dict_index_is_spatial(indexes[i])) {
			need_flush_observer = true;
		}
	}

	FlushObserver*	flush_observer = NULL;
	if (need_flush_observer) {
		flush_observer = UT_NEW_NOKEY(
			FlushObserver(new_table->space, trx, stage));

		trx_set_flush_observer(trx, flush_observer);
	}

	merge_files = static_cast<merge_file_t*>(
		ut_malloc_nokey(n_indexes * sizeof *merge_files));

	/* Initialize all the merge file descriptors, so that we
	don't call row_merge_file_destroy() on uninitialized
	merge file descriptor */

	for (i = 0; i < n_indexes; i++) {
		merge_files[i].fd = -1;
	}

	for (i = 0; i < n_indexes; i++) {
		if (indexes[i]->type & DICT_FTS) {
			ibool	opt_doc_id_size = FALSE;

			/* To build FTS index, we would need to extract
			doc's word, Doc ID, and word's position, so
			we need to build a "fts sort index" indexing
			on above three 'fields' */
			fts_sort_idx = row_merge_create_fts_sort_index(
				indexes[i], old_table, &opt_doc_id_size);

			row_merge_dup_t*	dup
				= static_cast<row_merge_dup_t*>(
					ut_malloc_nokey(sizeof *dup));
			dup->index = fts_sort_idx;
			dup->table = table;
			dup->col_map = col_map;
			dup->n_dup = 0;

			/* Set up the parallel tokenize/sort and merge
			thread state for the fulltext build. */
			row_fts_psort_info_init(
				trx, dup, new_table, opt_doc_id_size,
				&psort_info, &merge_info);

			/* We need to ensure that we free the resources
			allocated */
			fts_psort_initiated = true;
		}
	}

	/* Reset the MySQL row buffer that is used when reporting
	duplicate keys. */
	innobase_rec_reset(table);

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */
	error = row_merge_read_clustered_index(
		trx, table, old_table, new_table, online, indexes,
		fts_sort_idx, psort_info, merge_files, key_numbers,
		n_indexes, add_cols, add_v, col_map, add_autoinc,
		sequence, block, skip_pk_sort, &tmpfd, stage, eval_table);

	stage->end_phase_read_pk();

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	DEBUG_SYNC_C("row_merge_after_scan");

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		dict_index_t*	sort_idx = indexes[i];

		/* Spatial indexes are not built via merge sort; they
		are handled during the scan (see
		row_merge_read_clustered_index()), with redo logging
		enabled. */
		if (dict_index_is_spatial(sort_idx)) {
			continue;
		}

		if (indexes[i]->type & DICT_FTS) {
			os_event_t	fts_parallel_merge_event;

			sort_idx = fts_sort_idx;

			fts_parallel_merge_event
				= merge_info[0].psort_common->merge_event;

			if (FTS_PLL_MERGE) {
				ulint	trial_count = 0;
				bool	all_exit = false;

				os_event_reset(fts_parallel_merge_event);
				row_fts_start_parallel_merge(merge_info);
wait_again:
				/* Wait (with a timeout) for the parallel
				merge threads to signal, then re-check
				each child's completion status. */
				os_event_wait_time_low(
					fts_parallel_merge_event, 1000000,
					sig_count);

				for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
					if (merge_info[j].child_status
					    != FTS_CHILD_COMPLETE
					    && merge_info[j].child_status
					    != FTS_CHILD_EXITING) {
						sig_count = os_event_reset(
						fts_parallel_merge_event);

						goto wait_again;
					}
				}

				/* Now all children should complete, wait
				a bit until they all finish using event */
				while (!all_exit && trial_count < 10000) {
					all_exit = true;

					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {
						if (merge_info[j].child_status
						    != FTS_CHILD_EXITING) {
							all_exit = false;
							os_thread_sleep(1000);
							break;
						}
					}
					trial_count++;
				}

				if (!all_exit) {
					ib::error() << "Not all child merge"
						" threads exited when creating"
						" FTS index '"
						<< indexes[i]->name << "'";
				} else {
					for (j = 0; j < FTS_NUM_AUX_INDEX;
					     j++) {

						os_thread_join(merge_info[j]
							       .thread_hdl);
					}
				}
			} else {
				/* This cannot report duplicates; an
				assertion would fail in that case. */
				error = row_fts_merge_insert(
					sort_idx, new_table,
					psort_info, 0);
			}

#ifdef FTS_INTERNAL_DIAG_PRINT
			DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
#endif
		} else if (merge_files[i].fd >= 0) {
			/* A merge file was produced for this index by
			the scan: merge-sort it, then bulk-load the
			sorted tuples into the index B-tree (without
			redo logging; see the flush observer notes
			above). */
			row_merge_dup_t	dup = {
				sort_idx, table, col_map, 0};

			error = row_merge_sort(
				trx, &dup, &merge_files[i],
				block, &tmpfd, stage);

			if (error == DB_SUCCESS) {
				BtrBulk	btr_bulk(sort_idx, trx->id,
						 flush_observer);
				btr_bulk.init();

				error = row_merge_insert_index_tuples(
					trx->id, sort_idx, old_table,
					merge_files[i].fd, block, NULL,
					&btr_bulk, stage);

				/* finish() runs unconditionally so the
				bulk-load state is released even when
				the insert failed. */
				error = btr_bulk.finish(error);
			}
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (indexes[i]->type & DICT_FTS) {
			row_fts_psort_info_destroy(psort_info, merge_info);
			fts_psort_initiated = false;
		} else if (error != DB_SUCCESS || !online) {
			/* Do not apply any online log. */
		} else if (old_table != new_table) {
			ut_ad(!sort_idx->online_log);
			ut_ad(sort_idx->online_status
			      == ONLINE_INDEX_COMPLETE);
		} else {
			/* Online secondary index creation: flush the
			pages that were written without redo logging,
			record that in the redo log, and only then
			apply the row log accumulated by concurrent
			DML. */
			ut_ad(need_flush_observer);

			flush_observer->flush();
			row_merge_write_redo(indexes[i]);

			DEBUG_SYNC_C("row_log_apply_before");
			error = row_log_apply(trx, sort_idx, table, stage);
			DEBUG_SYNC_C("row_log_apply_after");
		}

		if (error != DB_SUCCESS) {
			/* Remember which key failed, for error
			reporting to MySQL. */
			trx->error_key_num = key_numbers[i];
			goto func_exit;
		}

		if (indexes[i]->type & DICT_FTS && fts_enable_diag_print) {
			ib::info() << "Finished building full-text index "
				<< indexes[i]->name;
		}
	}

func_exit:
	DBUG_EXECUTE_IF(
		"ib_build_indexes_too_many_concurrent_trxs",
		error = DB_TOO_MANY_CONCURRENT_TRXS;
		trx->error_state = error;);

	/* Tear down the FTS parallel sort state if an error path
	skipped the per-index cleanup above. */
	if (fts_psort_initiated) {
		/* Clean up FTS psort related resource */
		row_fts_psort_info_destroy(psort_info, merge_info);
		fts_psort_initiated = false;
	}

	row_merge_file_destroy_low(tmpfd);

	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	if (fts_sort_idx) {
		dict_mem_index_free(fts_sort_idx);
	}

	ut_free(merge_files);

	alloc.deallocate_large(block, &block_pfx);

	DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);

	if (online && old_table == new_table && error != DB_SUCCESS) {
		/* On error, flag all online secondary index creation
		as aborted. */
		for (i = 0; i < n_indexes; i++) {
			ut_ad(!(indexes[i]->type & DICT_FTS));
			ut_ad(!indexes[i]->is_committed());
			ut_ad(!dict_index_is_clust(indexes[i]));

			/* Completed indexes should be dropped as
			well, and indexes whose creation was aborted
			should be dropped from the persistent
			storage. However, at this point we can only
			set some flags in the not-yet-published
			indexes. These indexes will be dropped later
			in row_merge_drop_indexes(), called by
			rollback_inplace_alter_table(). */

			switch (dict_index_get_online_status(indexes[i])) {
			case ONLINE_INDEX_COMPLETE:
				break;
			case ONLINE_INDEX_CREATION:
				rw_lock_x_lock(
					dict_index_get_lock(indexes[i]));
				row_log_abort_sec(indexes[i]);
				indexes[i]->type |= DICT_CORRUPT;
				rw_lock_x_unlock(
					dict_index_get_lock(indexes[i]));
				new_table->drop_aborted = TRUE;
				/* fall through */
			case ONLINE_INDEX_ABORTED_DROPPED:
			case ONLINE_INDEX_ABORTED:
				MONITOR_ATOMIC_INC(
					MONITOR_BACKGROUND_DROP_INDEX);
			}
		}
	}

	DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););

	if (flush_observer != NULL) {
		ut_ad(need_flush_observer);

		DBUG_EXECUTE_IF("ib_index_build_fail_before_flush",
			error = DB_FAIL;
		);

		if (error != DB_SUCCESS) {
			flush_observer->interrupted();
		}

		/* Flush the pages that were modified without redo
		logging before the DDL transaction can commit. */
		flush_observer->flush();

		UT_DELETE(flush_observer);

		if (trx_is_interrupted(trx)) {
			error = DB_INTERRUPTED;
		}

		if (error == DB_SUCCESS && old_table != new_table) {
			/* For a successful table rebuild, note in the
			redo log that every index of the new table was
			bulk-loaded with its pages flushed. */
			for (const dict_index_t* index
				     = dict_table_get_first_index(new_table);
			     index != NULL;
			     index = dict_table_get_next_index(index)) {
				row_merge_write_redo(index);
			}
		}
	}

	DBUG_RETURN(error);
}
4692