1 /*****************************************************************************
2 
3 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0log.cc
29 Modification log for online index creation and online table rebuild
30 
31 Created 2011-05-26 Marko Makela
32 *******************************************************/
33 
34 #include "row0log.h"
35 
36 #ifdef UNIV_NONINL
37 #include "row0log.ic"
38 #endif
39 
40 #include "row0row.h"
41 #include "row0ins.h"
42 #include "row0upd.h"
43 #include "row0merge.h"
44 #include "row0ext.h"
45 #include "data0data.h"
46 #include "que0que.h"
47 #include "srv0mon.h"
48 #include "handler0alter.h"
49 #include "ut0new.h"
50 #include "ut0stage.h"
51 #include "trx0rec.h"
52 
53 #include <algorithm>
54 #include <map>
55 
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table.
NOTE: these byte values are written into the temporary log file
(see row_log_table_delete() and friends) and parsed back when the
log is applied; do not renumber them. */
enum row_tab_op {
	/** Insert a record */
	ROW_T_INSERT = 0x41,
	/** Update a record in place */
	ROW_T_UPDATE,
	/** Delete (purge) a record */
	ROW_T_DELETE
};
66 
/** Index record modification operations during online index creation.
NOTE: these byte values are serialized into the online log by
row_log_online_op(); do not renumber them. */
enum row_op {
	/** Insert a record */
	ROW_OP_INSERT = 0x61,
	/** Delete a record */
	ROW_OP_DELETE
};
74 
/** Size of the modification log entry header, in bytes.
The header consists of the 1-byte operation code and 1 byte of
extra_size; when extra_size >= 0x80 a second extra_size byte is
accounted for separately by the writers. */
#define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
77 
/** Log block for modifications during online ALTER TABLE */
struct row_log_buf_t {
	byte*		block;	/*!< file block buffer of
				srv_sort_buf_size bytes; NULL until
				allocated by row_log_block_allocate() */
	ut_new_pfx_t	block_pfx; /*!< opaque descriptor of "block". Set
				by ut_allocator::allocate_large() and fed to
				ut_allocator::deallocate_large(). */
	mrec_buf_t	buf;	/*!< buffer for accessing a record
				that spans two blocks */
	ulint		blocks; /*!< current position in blocks
				(number of full blocks written to file) */
	ulint		bytes;	/*!< current position within block */
	ulonglong	total;	/*!< logical position, in bytes from
				the start of the row_log_table log;
				0 for row_log_online_op() and
				row_log_apply(). */
};
93 
/** Tracks BLOB allocation during online ALTER TABLE */
class row_log_table_blob_t {
public:
	/** Constructor (declaring a BLOB freed)
	@param offset_arg row_log_t::tail::total */
#ifdef UNIV_DEBUG
	row_log_table_blob_t(ulonglong offset_arg) :
		old_offset (0), free_offset (offset_arg),
		offset (BLOB_FREED) {}
#else /* UNIV_DEBUG */
	row_log_table_blob_t() :
		offset (BLOB_FREED) {}
#endif /* UNIV_DEBUG */

	/** Declare a BLOB freed again.
	@param offset_arg row_log_t::tail::total */
#ifdef UNIV_DEBUG
	void blob_free(ulonglong offset_arg)
#else /* UNIV_DEBUG */
	void blob_free()
#endif /* UNIV_DEBUG */
	{
		/* The ut_ad() checks compile to nothing in non-debug
		builds, where the offset_arg parameter does not exist. */
		ut_ad(offset < offset_arg);
		ut_ad(offset != BLOB_FREED);
		ut_d(old_offset = offset);
		ut_d(free_offset = offset_arg);
		offset = BLOB_FREED;
	}
	/** Declare a freed BLOB reused.
	@param offset_arg row_log_t::tail::total */
	void blob_alloc(ulonglong offset_arg) {
		ut_ad(free_offset <= offset_arg);
		ut_d(old_offset = offset);
		offset = offset_arg;
	}
	/** Determine if a BLOB was freed at a given log position
	@param offset_arg row_log_t::head::total after the log record
	@return true if freed */
	bool is_freed(ulonglong offset_arg) const {
		/* This is supposed to be the offset at the end of the
		current log record. */
		ut_ad(offset_arg > 0);
		/* We should never get anywhere close the magic value. */
		ut_ad(offset_arg < BLOB_FREED);
		/* The BLOB is unsafe to access until the log has been
		applied up to "offset" (which is BLOB_FREED while the
		BLOB remains freed). */
		return(offset_arg < offset);
	}
private:
	/** Magic value for a freed BLOB */
	static const ulonglong BLOB_FREED = ~0ULL;
#ifdef UNIV_DEBUG
	/** Old offset, in case a page was freed, reused, freed, ... */
	ulonglong	old_offset;
	/** Offset of last blob_free() */
	ulonglong	free_offset;
#endif /* UNIV_DEBUG */
	/** Byte offset to the log file */
	ulonglong	offset;
};
152 
/** @brief Map of off-page column page numbers to 0 or log byte offsets.

If there is no mapping for a page number, it is safe to access.
If a page number maps to 0, it is an off-page column that has been freed.
If a page number maps to a nonzero number, the number is a byte offset
into the index->online_log, indicating that the page is safe to access
when applying log records starting from that offset.
(The mapped value is encapsulated in row_log_table_blob_t above.) */
typedef std::map<
	ulint,
	row_log_table_blob_t,
	std::less<ulint>,
	ut_allocator<std::pair<const ulint, row_log_table_blob_t> > >
	page_no_map;
166 
/** @brief Buffer for logging modifications during online index creation

All modifications to an index that is being created will be logged by
row_log_online_op() to this buffer.

All modifications to a table that is being rebuilt will be logged by
row_log_table_delete(), row_log_table_update(), row_log_table_insert()
to this buffer.

When head.blocks == tail.blocks, the reader will access tail.block
directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
	int		fd;	/*!< file descriptor; < 0 while the
				temporary file has not yet been created
				by row_log_tmpfile() */
	ib_mutex_t	mutex;	/*!< mutex protecting error,
				max_trx and tail */
	page_no_map*	blobs;	/*!< map of page numbers of off-page columns
				that have been freed during table-rebuilding
				ALTER TABLE (row_log_table_*); protected by
				index->lock X-latch only */
	dict_table_t*	table;	/*!< table that is being rebuilt,
				or NULL when this is a secondary
				index that is being created online */
	bool		same_pk;/*!< whether the definition of the PRIMARY KEY
				has remained the same */
	const dtuple_t*	add_cols;
				/*!< default values of added columns, or NULL */
	const ulint*	col_map;/*!< mapping of old column numbers to
				new ones, or NULL if !table */
	dberr_t		error;	/*!< error that occurred during online
				table rebuild */
	trx_id_t	max_trx;/*!< biggest observed trx_id in
				row_log_online_op();
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	tail;	/*!< writer context;
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	head;	/*!< reader context; protected by MDL only;
				modifiable by row_log_apply_ops() */
	ulint		n_old_col;
				/*!< number of non-virtual column in
				old table */
	ulint		n_old_vcol;
				/*!< number of virtual column in old table */
	const char*	path;	/*!< where to create temporary file during
				log operation */
};
215 
216 
217 /** Create the file or online log if it does not exist.
218 @param[in,out] log     online rebuild log
219 @return true if success, false if not */
220 static MY_ATTRIBUTE((warn_unused_result))
221 int
row_log_tmpfile(row_log_t * log)222 row_log_tmpfile(
223 	row_log_t*	log)
224 {
225 	DBUG_ENTER("row_log_tmpfile");
226 	if (log->fd < 0) {
227 		log->fd = row_merge_file_create_low(log->path);
228 		DBUG_EXECUTE_IF("row_log_tmpfile_fail",
229 				if (log->fd > 0)
230 					row_merge_file_destroy_low(log->fd);
231 				log->fd = -1;);
232 		if (log->fd >= 0) {
233 			MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES);
234 		}
235 	}
236 
237 	DBUG_RETURN(log->fd);
238 }
239 
240 /** Allocate the memory for the log buffer.
241 @param[in,out]	log_buf	Buffer used for log operation
242 @return TRUE if success, false if not */
243 static MY_ATTRIBUTE((warn_unused_result))
244 bool
row_log_block_allocate(row_log_buf_t & log_buf)245 row_log_block_allocate(
246 	row_log_buf_t&	log_buf)
247 {
248 	DBUG_ENTER("row_log_block_allocate");
249 	if (log_buf.block == NULL) {
250 		DBUG_EXECUTE_IF(
251 			"simulate_row_log_allocation_failure",
252 			DBUG_RETURN(false);
253 		);
254 
255 		log_buf.block = ut_allocator<byte>(mem_key_row_log_buf)
256 			.allocate_large(srv_sort_buf_size, &log_buf.block_pfx);
257 
258 		if (log_buf.block == NULL) {
259 			DBUG_RETURN(false);
260 		}
261 	}
262 	DBUG_RETURN(true);
263 }
264 
265 /** Free the log buffer.
266 @param[in,out]	log_buf	Buffer used for log operation */
267 static
268 void
row_log_block_free(row_log_buf_t & log_buf)269 row_log_block_free(
270 	row_log_buf_t&	log_buf)
271 {
272 	DBUG_ENTER("row_log_block_free");
273 	if (log_buf.block != NULL) {
274 		ut_allocator<byte>(mem_key_row_log_buf).deallocate_large(
275 			log_buf.block, &log_buf.block_pfx);
276 		log_buf.block = NULL;
277 	}
278 	DBUG_VOID_RETURN;
279 }
280 
/******************************************************//**
Logs an operation to a secondary index that is (or was) being created.
The record is appended to index->online_log->tail; whenever a full
srv_sort_buf_size block has been assembled it is flushed to the
temporary log file.  Failures are reported by setting log->error or
by marking the index corrupted; the caller gets no direct status. */
void
row_log_online_op(
/*==============*/
	dict_index_t*	index,	/*!< in/out: index, S or X latched */
	const dtuple_t* tuple,	/*!< in: index tuple */
	trx_id_t	trx_id)	/*!< in: transaction ID for insert,
				or 0 for delete */
{
	byte*		b;
	ulint		extra_size;
	ulint		size;
	ulint		mrec_size;
	ulint		avail_size;
	row_log_t*	log;

	ut_ad(dtuple_validate(tuple));
	ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
	      || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));

	if (dict_index_is_corrupted(index)) {
		return;
	}

	ut_ad(dict_index_is_online_ddl(index));

	/* Compute the size of the record. This differs from
	row_merge_buf_encode(), because here we do not encode
	extra_size+1 (and reserve 0 as the end-of-chunk marker). */

	size = rec_get_converted_size_temp(
		index, tuple->fields, tuple->n_fields, NULL, &extra_size);
	ut_ad(size >= extra_size);
	ut_ad(size <= sizeof log->tail.buf);

	/* 1 byte op code, 1 or 2 bytes of extra_size, the record,
	and the transaction ID (inserts only). */
	mrec_size = ROW_LOG_HEADER_SIZE
		+ (extra_size >= 0x80) + size
		+ (trx_id ? DATA_TRX_ID_LEN : 0);

	log = index->online_log;
	mutex_enter(&log->mutex);

	if (trx_id > log->max_trx) {
		log->max_trx = trx_id;
	}

	if (!row_log_block_allocate(log->tail)) {
		log->error = DB_OUT_OF_MEMORY;
		goto err_exit;
	}

	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);

	ut_ad(log->tail.bytes < srv_sort_buf_size);
	avail_size = srv_sort_buf_size - log->tail.bytes;

	if (mrec_size > avail_size) {
		/* The record does not fit in the current block:
		assemble it in tail.buf and split it across blocks
		below. */
		b = log->tail.buf;
	} else {
		b = log->tail.block + log->tail.bytes;
	}

	if (trx_id != 0) {
		*b++ = ROW_OP_INSERT;
		trx_write_trx_id(b, trx_id);
		b += DATA_TRX_ID_LEN;
	} else {
		*b++ = ROW_OP_DELETE;
	}

	/* Encode extra_size in 1 byte, or in 2 bytes with the high
	bit of the first byte set. */
	if (extra_size < 0x80) {
		*b++ = (byte) extra_size;
	} else {
		ut_ad(extra_size < 0x8000);
		*b++ = (byte) (0x80 | (extra_size >> 8));
		*b++ = (byte) extra_size;
	}

	rec_convert_dtuple_to_temp(
		b + extra_size, index, tuple->fields, tuple->n_fields, NULL);
	b += size;

	if (mrec_size >= avail_size) {
		/* The block is exactly full, or the record spilled
		into tail.buf: flush the full block to the file. */
		dberr_t			err;
		IORequest		request(IORequest::ROW_LOG | IORequest::WRITE);
		const os_offset_t	byte_offset
			= (os_offset_t) log->tail.blocks
			* srv_sort_buf_size;

		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
			goto write_failed;
		}

		if (mrec_size == avail_size) {
			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
		} else {
			ut_ad(b == log->tail.buf + mrec_size);
			memcpy(log->tail.block + log->tail.bytes,
			       log->tail.buf, avail_size);
		}

		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);

		if (row_log_tmpfile(log) < 0) {
			log->error = DB_OUT_OF_MEMORY;
			goto err_exit;
		}

		err = os_file_write_int_fd(
			request,
			"(modification log)",
			log->fd,
			log->tail.block, byte_offset, srv_sort_buf_size);

		log->tail.blocks++;
		if (err != DB_SUCCESS) {
write_failed:
			/* We set the flag directly instead of invoking
			dict_set_corrupted_index_cache_only(index) here,
			because the index is not "public" yet. */
			index->type |= DICT_CORRUPT;
		}
		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
		/* Carry the overflow part of the record into the
		start of the next block. */
		memcpy(log->tail.block, log->tail.buf + avail_size,
		       mrec_size - avail_size);
		log->tail.bytes = mrec_size - avail_size;
	} else {
		log->tail.bytes += mrec_size;
		ut_ad(b == log->tail.block + log->tail.bytes);
	}

	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
	mutex_exit(&log->mutex);
}
418 
419 /******************************************************//**
420 Gets the error status of the online index rebuild log.
421 @return DB_SUCCESS or error code */
422 dberr_t
row_log_table_get_error(const dict_index_t * index)423 row_log_table_get_error(
424 /*====================*/
425 	const dict_index_t*	index)	/*!< in: clustered index of a table
426 					that is being rebuilt online */
427 {
428 	ut_ad(dict_index_is_clust(index));
429 	ut_ad(dict_index_is_online_ddl(index));
430 	return(index->online_log->error);
431 }
432 
433 /******************************************************//**
434 Starts logging an operation to a table that is being rebuilt.
435 @return pointer to log, or NULL if no logging is necessary */
436 static MY_ATTRIBUTE((nonnull, warn_unused_result))
437 byte*
row_log_table_open(row_log_t * log,ulint size,ulint * avail)438 row_log_table_open(
439 /*===============*/
440 	row_log_t*	log,	/*!< in/out: online rebuild log */
441 	ulint		size,	/*!< in: size of log record */
442 	ulint*		avail)	/*!< out: available size for log record */
443 {
444 	mutex_enter(&log->mutex);
445 
446 	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
447 
448 	if (log->error != DB_SUCCESS) {
449 err_exit:
450 		mutex_exit(&log->mutex);
451 		return(NULL);
452 	}
453 
454 	if (!row_log_block_allocate(log->tail)) {
455 		log->error = DB_OUT_OF_MEMORY;
456 		goto err_exit;
457 	}
458 
459 	ut_ad(log->tail.bytes < srv_sort_buf_size);
460 	*avail = srv_sort_buf_size - log->tail.bytes;
461 
462 	if (size > *avail) {
463 		return(log->tail.buf);
464 	} else {
465 		return(log->tail.block + log->tail.bytes);
466 	}
467 }
468 
/******************************************************//**
Stops logging an operation to a table that is being rebuilt.
Called with log->mutex held (acquired by row_log_table_open());
releases the mutex before returning.  If a full block has been
assembled, it is flushed to the temporary log file. */
static MY_ATTRIBUTE((nonnull))
void
row_log_table_close_func(
/*=====================*/
	row_log_t*	log,	/*!< in/out: online rebuild log */
#ifdef UNIV_DEBUG
	const byte*	b,	/*!< in: end of log record */
#endif /* UNIV_DEBUG */
	ulint		size,	/*!< in: size of log record */
	ulint		avail)	/*!< in: available size for log record */
{
	ut_ad(mutex_own(&log->mutex));

	if (size >= avail) {
		/* The block is full (or the record spilled into
		tail.buf): flush it to the file. */
		dberr_t			err;
		IORequest		request(IORequest::ROW_LOG | IORequest::WRITE);

		const os_offset_t	byte_offset
			= (os_offset_t) log->tail.blocks
			* srv_sort_buf_size;

		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
			goto write_failed;
		}

		if (size == avail) {
			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
		} else {
			ut_ad(b == log->tail.buf + size);
			memcpy(log->tail.block + log->tail.bytes,
			       log->tail.buf, avail);
		}

		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);

		if (row_log_tmpfile(log) < 0) {
			log->error = DB_OUT_OF_MEMORY;
			goto err_exit;
		}

		err = os_file_write_int_fd(
			request,
			"(modification log)",
			log->fd,
			log->tail.block, byte_offset, srv_sort_buf_size);

		log->tail.blocks++;
		if (err != DB_SUCCESS) {
write_failed:
			log->error = DB_ONLINE_LOG_TOO_BIG;
		}
		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
		/* Carry the part of the record that did not fit
		into the start of the next block. */
		memcpy(log->tail.block, log->tail.buf + avail, size - avail);
		log->tail.bytes = size - avail;
	} else {
		log->tail.bytes += size;
		ut_ad(b == log->tail.block + log->tail.bytes);
	}

	log->tail.total += size;
	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
err_exit:
	mutex_exit(&log->mutex);
}

#ifdef UNIV_DEBUG
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, b, size, avail)
#else /* UNIV_DEBUG */
/* In release builds, the debug-only end-of-record pointer "b"
is dropped from the call. */
# define row_log_table_close(log, b, size, avail)	\
	row_log_table_close_func(log, size, avail)
#endif /* UNIV_DEBUG */
543 
544 /** Check whether a virtual column is indexed in the new table being
545 created during alter table
546 @param[in]	index	cluster index
547 @param[in]	v_no	virtual column number
548 @return true if it is indexed, else false */
549 bool
row_log_col_is_indexed(const dict_index_t * index,ulint v_no)550 row_log_col_is_indexed(
551 	const dict_index_t*	index,
552 	ulint			v_no)
553 {
554 	return(dict_table_get_nth_v_col(
555 		index->online_log->table, v_no)->m_col.ord_part);
556 }
557 
/******************************************************//**
Logs a delete operation to a table that is being rebuilt.
This will be merged in row_log_table_apply_delete().
The log record consists of: ROW_T_DELETE, the old PRIMARY KEY
(with DB_TRX_ID,DB_ROLL_PTR) in the new table's format, an optional
cache of off-page column prefixes, and optional virtual column data. */
void
row_log_table_delete(
/*=================*/
	const rec_t*	rec,	/*!< in: clustered index leaf page record,
				page X-latched */
	const dtuple_t*	ventry,	/*!< in: dtuple holding virtual column info */
	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
				or X-latched */
	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
	const byte*	sys)	/*!< in: DB_TRX_ID,DB_ROLL_PTR that should
				be logged, or NULL to use those in rec */
{
	ulint		old_pk_extra_size;
	ulint		old_pk_size;
	ulint		ext_size = 0;
	ulint		mrec_size;
	ulint		avail_size;
	mem_heap_t*	heap		= NULL;
	const dtuple_t*	old_pk;
	row_ext_t*	ext;

	ut_ad(dict_index_is_clust(index));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
	ut_ad(rw_lock_own_flagged(
			&index->lock,
			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

	/* Logging is a no-op once online DDL has failed or the
	index is corrupted. */
	if (dict_index_is_corrupted(index)
	    || !dict_index_is_online_ddl(index)
	    || index->online_log->error != DB_SUCCESS) {
		return;
	}

	dict_table_t* new_table = index->online_log->table;
	dict_index_t* new_index = dict_table_get_first_index(new_table);

	ut_ad(dict_index_is_clust(new_index));
	ut_ad(!dict_index_is_online_ddl(new_index));

	/* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
	if (index->online_log->same_pk) {
		dtuple_t*	tuple;
		ut_ad(new_index->n_uniq == index->n_uniq);

		/* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
		fields of the record. */
		heap = mem_heap_create(
			DATA_TRX_ID_LEN
			+ DTUPLE_EST_ALLOC(new_index->n_uniq + 2));
		old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 2);
		dict_index_copy_types(tuple, new_index, tuple->n_fields);
		dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);

		for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
			ulint		len;
			const void*	field	= rec_get_nth_field(
				rec, offsets, i, &len);
			dfield_t*	dfield	= dtuple_get_nth_field(
				tuple, i);
			ut_ad(len != UNIV_SQL_NULL);
			ut_ad(!rec_offs_nth_extern(offsets, i));
			dfield_set_data(dfield, field, len);
		}

		if (sys) {
			/* Override DB_TRX_ID,DB_ROLL_PTR with the
			caller-supplied values. */
			dfield_set_data(
				dtuple_get_nth_field(tuple,
						     new_index->n_uniq),
				sys, DATA_TRX_ID_LEN);
			dfield_set_data(
				dtuple_get_nth_field(tuple,
						     new_index->n_uniq + 1),
				sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
		}
	} else {
		/* The PRIMARY KEY has changed. Translate the tuple. */
		old_pk = row_log_table_get_pk(
			rec, index, offsets, NULL, &heap);

		if (!old_pk) {
			ut_ad(index->online_log->error != DB_SUCCESS);
			if (heap) {
				goto func_exit;
			}
			return;
		}
	}

	ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
		      old_pk, old_pk->n_fields - 2)->len);
	ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
		      old_pk, old_pk->n_fields - 1)->len);
	old_pk_size = rec_get_converted_size_temp(
		new_index, old_pk->fields, old_pk->n_fields, NULL,
		&old_pk_extra_size);
	ut_ad(old_pk_extra_size < 0x100);

	/* 1 (ROW_T_DELETE) + 1 (extra_size) + 4 (ext_size) + old PK */
	mrec_size = 6 + old_pk_size;

	/* Log enough prefix of the BLOB unless both the
	old and new table are in COMPACT or REDUNDANT format,
	which store the prefix in the clustered index record. */
	if (rec_offs_any_extern(offsets)
	    && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
		|| dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {

		/* Build a cache of those off-page column prefixes
		that are referenced by secondary indexes. It can be
		that none of the off-page columns are needed. */
		row_build(ROW_COPY_DATA, index, rec,
			  offsets, NULL, NULL, NULL, &ext, heap);
		if (ext) {
			/* Log the row_ext_t, ext->ext and ext->buf */
			ext_size = ext->n_ext * ext->max_len
				+ sizeof(*ext)
				+ ext->n_ext * sizeof(ulint)
				+ (ext->n_ext - 1) * sizeof ext->len;
			mrec_size += ext_size;
		}
	}

	/* Check if we need to log virtual column data.
	NOTE(review): this dereferences ventry without a NULL check,
	i.e. it assumes every caller passes a non-NULL ventry —
	confirm against all call sites. */
	if (ventry->n_v_fields > 0) {
		ulint	v_extra;
		mrec_size += rec_get_converted_size_temp(
			new_index, NULL, 0, ventry, &v_extra);
	}

	if (byte* b = row_log_table_open(index->online_log,
					 mrec_size, &avail_size)) {
		*b++ = ROW_T_DELETE;
		*b++ = static_cast<byte>(old_pk_extra_size);

		/* Log the size of external prefix we saved */
		mach_write_to_4(b, ext_size);
		b += 4;

		rec_convert_dtuple_to_temp(
			b + old_pk_extra_size, new_index,
			old_pk->fields, old_pk->n_fields, NULL);

		b += old_pk_size;

		if (ext_size) {
			ulint	cur_ext_size = sizeof(*ext)
				+ (ext->n_ext - 1) * sizeof ext->len;

			memcpy(b, ext, cur_ext_size);
			b += cur_ext_size;

			/* Check if we need to col_map to adjust the column
			number. If columns were added/removed/reordered,
			adjust the column number. */
			if (const ulint* col_map =
				index->online_log->col_map) {
				for (ulint i = 0; i < ext->n_ext; i++) {
					const_cast<ulint&>(ext->ext[i]) =
						col_map[ext->ext[i]];
				}
			}

			memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
			b += ext->n_ext * sizeof(*ext->ext);

			ext_size -= cur_ext_size
				 + ext->n_ext * sizeof(*ext->ext);
			memcpy(b, ext->buf, ext_size);
			b += ext_size;
		}

		/* log virtual columns */
		if (ventry->n_v_fields > 0) {
			rec_convert_dtuple_to_temp(
				b, new_index, NULL, 0, ventry);
			b += mach_read_from_2(b);
		}

		row_log_table_close(
			index->online_log, b, mrec_size, avail_size);
	}

func_exit:
	mem_heap_free(heap);
}
747 
/******************************************************//**
Logs an insert or update to a table that is being rebuilt.
ROW_FORMAT=REDUNDANT variant: the physical record is first copied
into a dtuple, because the redundant format cannot be written
verbatim into the temporary (compact) log record format. */
static
void
row_log_table_low_redundant(
/*========================*/
	const rec_t*		rec,	/*!< in: clustered index leaf
					page record in ROW_FORMAT=REDUNDANT,
					page X-latched */
	const dtuple_t*		ventry,	/*!< in: dtuple holding virtual
					column info or NULL */
	const dtuple_t*		o_ventry,/*!< in: old dtuple holding virtual
					column info or NULL */
	dict_index_t*		index,	/*!< in/out: clustered index, S-latched
					or X-latched */
	bool			insert,	/*!< in: true if insert,
					false if update */
	const dtuple_t*		old_pk,	/*!< in: old PRIMARY KEY value
					(if !insert and a PRIMARY KEY
					is being created) */
	const dict_index_t*	new_index)
					/*!< in: clustered index of the
					new table, not latched */
{
	ulint		old_pk_size;
	ulint		old_pk_extra_size;
	ulint		size;
	ulint		extra_size;
	ulint		mrec_size;
	ulint		avail_size;
	mem_heap_t*	heap		= NULL;
	dtuple_t*	tuple;
	ulint		num_v = ventry ? dtuple_get_n_v_fields(ventry) : 0;

	ut_ad(!page_is_comp(page_align(rec)));
	ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
	ut_ad(dict_tf2_is_valid(index->table->flags, index->table->flags2));
	ut_ad(!dict_table_is_comp(index->table));  /* redundant row format */
	ut_ad(dict_index_is_clust(new_index));

	heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
	tuple = dtuple_create_with_vcol(heap, index->n_fields, num_v);
	dict_index_copy_types(tuple, index, index->n_fields);

	if (num_v) {
		dict_table_copy_v_types(tuple, index->table);
	}

	dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));

	/* Copy the physical record into the tuple.  Only the 2-byte
	offsets format can flag externally stored (off-page) columns. */
	if (rec_get_1byte_offs_flag(rec)) {
		for (ulint i = 0; i < index->n_fields; i++) {
			dfield_t*	dfield;
			ulint		len;
			const void*	field;

			dfield = dtuple_get_nth_field(tuple, i);
			field = rec_get_nth_field_old(rec, i, &len);

			dfield_set_data(dfield, field, len);
		}
	} else {
		for (ulint i = 0; i < index->n_fields; i++) {
			dfield_t*	dfield;
			ulint		len;
			const void*	field;

			dfield = dtuple_get_nth_field(tuple, i);
			field = rec_get_nth_field_old(rec, i, &len);

			dfield_set_data(dfield, field, len);

			if (rec_2_is_field_extern(rec, i)) {
				dfield_set_ext(dfield);
			}
		}
	}

	size = rec_get_converted_size_temp(
		index, tuple->fields, tuple->n_fields, ventry, &extra_size);

	mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);

	if (num_v) {
		if (o_ventry) {
			ulint	v_extra = 0;
			mrec_size += rec_get_converted_size_temp(
				index, NULL, 0, o_ventry, &v_extra);
		}
	} else if (index->table->n_v_cols) {
		/* Virtual columns exist but nothing is logged for
		them: reserve room for a 2-byte length marker. */
		mrec_size += 2;
	}

	if (insert || index->online_log->same_pk) {
		ut_ad(!old_pk);
		old_pk_extra_size = old_pk_size = 0;
	} else {
		ut_ad(old_pk);
		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 2)->len);
		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
			      old_pk, old_pk->n_fields - 1)->len);

		old_pk_size = rec_get_converted_size_temp(
			new_index, old_pk->fields, old_pk->n_fields,
			NULL, &old_pk_extra_size);
		ut_ad(old_pk_extra_size < 0x100);
		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
	}

	if (byte* b = row_log_table_open(index->online_log,
					 mrec_size, &avail_size)) {
		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;

		if (old_pk_size) {
			*b++ = static_cast<byte>(old_pk_extra_size);

			rec_convert_dtuple_to_temp(
				b + old_pk_extra_size, new_index,
				old_pk->fields, old_pk->n_fields,
				ventry);
			b += old_pk_size;
		}

		/* Encode extra_size in 1 byte, or in 2 bytes with the
		high bit of the first byte set. */
		if (extra_size < 0x80) {
			*b++ = static_cast<byte>(extra_size);
		} else {
			ut_ad(extra_size < 0x8000);
			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
			*b++ = static_cast<byte>(extra_size);
		}

		rec_convert_dtuple_to_temp(
			b + extra_size, index, tuple->fields, tuple->n_fields,
			ventry);
		b += size;

		if (num_v) {
			if (o_ventry) {
				rec_convert_dtuple_to_temp(
					b, new_index, NULL, 0, o_ventry);
				b += mach_read_from_2(b);
			}
		} else if (index->table->n_v_cols) {
			/* The table contains virtual columns, but nothing
			has changed for them, so just mark a 2 bytes length
			field */
			mach_write_to_2(b, 2);
			b += 2;
		}

		row_log_table_close(
			index->online_log, b, mrec_size, avail_size);
	}

	mem_heap_free(heap);
}
906 
907 /******************************************************//**
908 Logs an insert or update to a table that is being rebuilt. */
909 static
910 void
row_log_table_low(const rec_t * rec,const dtuple_t * ventry,const dtuple_t * o_ventry,dict_index_t * index,const ulint * offsets,bool insert,const dtuple_t * old_pk)911 row_log_table_low(
912 /*==============*/
913 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
914 				page X-latched */
915 	const dtuple_t*	ventry,	/*!< in: dtuple holding virtual column info */
916 	const dtuple_t*	o_ventry,/*!< in: dtuple holding old virtual column
917 				info */
918 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
919 				or X-latched */
920 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
921 	bool		insert,	/*!< in: true if insert, false if update */
922 	const dtuple_t*	old_pk)	/*!< in: old PRIMARY KEY value (if !insert
923 				and a PRIMARY KEY is being created) */
924 {
925 	ulint			omit_size;
926 	ulint			old_pk_size;
927 	ulint			old_pk_extra_size;
928 	ulint			extra_size;
929 	ulint			mrec_size;
930 	ulint			avail_size;
931 	const dict_index_t*	new_index;
932 
933 	new_index = dict_table_get_first_index(index->online_log->table);
934 
935 	ut_ad(dict_index_is_clust(index));
936 	ut_ad(dict_index_is_clust(new_index));
937 	ut_ad(!dict_index_is_online_ddl(new_index));
938 	ut_ad(rec_offs_validate(rec, index, offsets));
939 	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
940 	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
941 	ut_ad(rw_lock_own_flagged(
942 			&index->lock,
943 			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
944 	ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
945 	ut_ad(page_is_leaf(page_align(rec)));
946 	ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
947 	/* old_pk=row_log_table_get_pk() [not needed in INSERT] is a prefix
948 	of the clustered index record (PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR),
949 	with no information on virtual columns */
950 	ut_ad(!old_pk || !insert);
951 	ut_ad(!old_pk || old_pk->n_v_fields == 0);
952 	ut_ad(!o_ventry || !insert);
953 	ut_ad(!o_ventry || ventry);
954 
955 	if (dict_index_is_corrupted(index)
956 	    || !dict_index_is_online_ddl(index)
957 	    || index->online_log->error != DB_SUCCESS) {
958 		return;
959 	}
960 
961 	if (!rec_offs_comp(offsets)) {
962 		row_log_table_low_redundant(
963 			rec, ventry, o_ventry, index, insert,
964 			old_pk, new_index);
965 		return;
966 	}
967 
968 	ut_ad(page_is_comp(page_align(rec)));
969 	ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
970 
971 	omit_size = REC_N_NEW_EXTRA_BYTES;
972 
973 	extra_size = rec_offs_extra_size(offsets) - omit_size;
974 
975 	mrec_size = ROW_LOG_HEADER_SIZE
976 		+ (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;
977 
978 	if (ventry && ventry->n_v_fields > 0) {
979 		ulint		v_extra = 0;
980 		uint64_t	rec_size = rec_get_converted_size_temp(
981 			new_index, NULL, 0, ventry, &v_extra);
982 
983 		mrec_size += rec_size;
984 
985 		/* If there is actually nothing to be logged for new entry,
986 		then there must be also nothing to do with old entry.
987 		In this case, make it same with the case below, by only keep
988 		2 bytes length marker */
989 		if (rec_size > 2 && o_ventry != NULL) {
990 			mrec_size += rec_get_converted_size_temp(
991 				new_index, NULL, 0, o_ventry, &v_extra);
992 		}
993 	} else if (index->table->n_v_cols) {
994 		/* Always leave 2 bytes length marker for virtual column
995 		data logging even if there is none of them is indexed if table
996 		has virtual columns */
997 		mrec_size += 2;
998 	}
999 
1000 	if (insert || index->online_log->same_pk) {
1001 		ut_ad(!old_pk);
1002 		old_pk_extra_size = old_pk_size = 0;
1003 	} else {
1004 		ut_ad(old_pk);
1005 		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
1006 		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
1007 			      old_pk, old_pk->n_fields - 2)->len);
1008 		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
1009 			      old_pk, old_pk->n_fields - 1)->len);
1010 
1011 		old_pk_size = rec_get_converted_size_temp(
1012 			new_index, old_pk->fields, old_pk->n_fields,
1013 			NULL, &old_pk_extra_size);
1014 		ut_ad(old_pk_extra_size < 0x100);
1015 		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
1016 	}
1017 
1018 	if (byte* b = row_log_table_open(index->online_log,
1019 					 mrec_size, &avail_size)) {
1020 		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
1021 
1022 		if (old_pk_size) {
1023 			*b++ = static_cast<byte>(old_pk_extra_size);
1024 
1025 			rec_convert_dtuple_to_temp(
1026 				b + old_pk_extra_size, new_index,
1027 				old_pk->fields, old_pk->n_fields,
1028 				NULL);
1029 			b += old_pk_size;
1030 		}
1031 
1032 		if (extra_size < 0x80) {
1033 			*b++ = static_cast<byte>(extra_size);
1034 		} else {
1035 			ut_ad(extra_size < 0x8000);
1036 			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
1037 			*b++ = static_cast<byte>(extra_size);
1038 		}
1039 
1040 		memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
1041 		b += extra_size;
1042 		memcpy(b, rec, rec_offs_data_size(offsets));
1043 		b += rec_offs_data_size(offsets);
1044 
1045 		if (ventry && ventry->n_v_fields > 0) {
1046 			uint64_t	new_v_size;
1047 
1048 			rec_convert_dtuple_to_temp(
1049 				b, new_index, NULL, 0, ventry);
1050 			new_v_size = mach_read_from_2(b);
1051 			b += new_v_size;
1052 
1053 			/* Nothing for new entry to be logged,
1054 			skip the old one too. */
1055 			if (new_v_size != 2 && o_ventry != NULL) {
1056 				rec_convert_dtuple_to_temp(
1057 					b, new_index, NULL, 0, o_ventry);
1058 				b += mach_read_from_2(b);
1059 			}
1060 		} else if (index->table->n_v_cols) {
1061 			/* The table contains virtual columns, but nothing
1062 			has changed for them, so just mark a 2 bytes length
1063 			field */
1064 			mach_write_to_2(b, 2);
1065 			b += 2;
1066 		}
1067 
1068 		row_log_table_close(
1069 			index->online_log, b, mrec_size, avail_size);
1070 	}
1071 }
1072 
1073 /******************************************************//**
1074 Logs an update to a table that is being rebuilt.
1075 This will be merged in row_log_table_apply_update(). */
1076 void
row_log_table_update(const rec_t * rec,dict_index_t * index,const ulint * offsets,const dtuple_t * old_pk,const dtuple_t * new_v_row,const dtuple_t * old_v_row)1077 row_log_table_update(
1078 /*=================*/
1079 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
1080 				page X-latched */
1081 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
1082 				or X-latched */
1083 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
1084 	const dtuple_t*	old_pk,	/*!< in: row_log_table_get_pk()
1085 				before the update */
1086 	const dtuple_t*	new_v_row,/*!< in: dtuple contains the new virtual
1087 				columns */
1088 	const dtuple_t*	old_v_row)/*!< in: dtuple contains the old virtual
1089 				columns */
1090 {
1091 	row_log_table_low(rec, new_v_row, old_v_row, index, offsets,
1092 			  false, old_pk);
1093 }
1094 
1095 /** Gets the old table column of a PRIMARY KEY column.
1096 @param table old table (before ALTER TABLE)
1097 @param col_map mapping of old column numbers to new ones
1098 @param col_no column position in the new table
1099 @return old table column, or NULL if this is an added column */
1100 static
1101 const dict_col_t*
row_log_table_get_pk_old_col(const dict_table_t * table,const ulint * col_map,ulint col_no)1102 row_log_table_get_pk_old_col(
1103 /*=========================*/
1104 	const dict_table_t*	table,
1105 	const ulint*		col_map,
1106 	ulint			col_no)
1107 {
1108 	for (ulint i = 0; i < table->n_cols; i++) {
1109 		if (col_no == col_map[i]) {
1110 			return(dict_table_get_nth_col(table, i));
1111 		}
1112 	}
1113 
1114 	return(NULL);
1115 }
1116 
1117 /** Maps an old table column of a PRIMARY KEY column.
1118 @param[in]	col		old table column (before ALTER TABLE)
1119 @param[in]	ifield		clustered index field in the new table (after
1120 ALTER TABLE)
1121 @param[in,out]	dfield		clustered index tuple field in the new table
1122 @param[in,out]	heap		memory heap for allocating dfield contents
1123 @param[in]	rec		clustered index leaf page record in the old
1124 table
1125 @param[in]	offsets		rec_get_offsets(rec)
1126 @param[in]	i		rec field corresponding to col
1127 @param[in]	page_size	page size of the old table
1128 @param[in]	max_len		maximum length of dfield
1129 @retval DB_INVALID_NULL		if a NULL value is encountered
1130 @retval DB_TOO_BIG_INDEX_COL	if the maximum prefix length is exceeded */
1131 static
1132 dberr_t
row_log_table_get_pk_col(const dict_col_t * col,const dict_field_t * ifield,dfield_t * dfield,mem_heap_t * heap,const rec_t * rec,const ulint * offsets,ulint i,const page_size_t & page_size,ulint max_len)1133 row_log_table_get_pk_col(
1134 	const dict_col_t*	col,
1135 	const dict_field_t*	ifield,
1136 	dfield_t*		dfield,
1137 	mem_heap_t*		heap,
1138 	const rec_t*		rec,
1139 	const ulint*		offsets,
1140 	ulint			i,
1141 	const page_size_t&	page_size,
1142 	ulint			max_len)
1143 {
1144 	const byte*	field;
1145 	ulint		len;
1146 
1147 	field = rec_get_nth_field(rec, offsets, i, &len);
1148 
1149 	if (len == UNIV_SQL_NULL) {
1150 		return(DB_INVALID_NULL);
1151 	}
1152 
1153 	if (rec_offs_nth_extern(offsets, i)) {
1154 		ulint	field_len = ifield->prefix_len;
1155 		byte*	blob_field;
1156 
1157 		if (!field_len) {
1158 			field_len = ifield->fixed_len;
1159 			if (!field_len) {
1160 				field_len = max_len + 1;
1161 			}
1162 		}
1163 
1164 		blob_field = static_cast<byte*>(
1165 			mem_heap_alloc(heap, field_len));
1166 
1167 		len = btr_copy_externally_stored_field_prefix(
1168 			blob_field, field_len, page_size, field, len);
1169 		if (len >= max_len + 1) {
1170 			return(DB_TOO_BIG_INDEX_COL);
1171 		}
1172 
1173 		dfield_set_data(dfield, blob_field, len);
1174 	} else {
1175 		dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
1176 	}
1177 
1178 	return(DB_SUCCESS);
1179 }
1180 
1181 /******************************************************//**
1182 Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
1183 of a table that is being rebuilt.
1184 @return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
1185 or NULL if the PRIMARY KEY definition does not change */
1186 const dtuple_t*
row_log_table_get_pk(const rec_t * rec,dict_index_t * index,const ulint * offsets,byte * sys,mem_heap_t ** heap)1187 row_log_table_get_pk(
1188 /*=================*/
1189 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
1190 				page X-latched */
1191 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
1192 				or X-latched */
1193 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
1194 	byte*		sys,	/*!< out: DB_TRX_ID,DB_ROLL_PTR for
1195 				row_log_table_delete(), or NULL */
1196 	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
1197 {
1198 	dtuple_t*	tuple	= NULL;
1199 	row_log_t*	log	= index->online_log;
1200 
1201 	ut_ad(dict_index_is_clust(index));
1202 	ut_ad(dict_index_is_online_ddl(index));
1203 	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
1204 	ut_ad(rw_lock_own_flagged(
1205 			&index->lock,
1206 			RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1207 
1208 	ut_ad(log);
1209 	ut_ad(log->table);
1210 
1211 	if (log->same_pk) {
1212 		/* The PRIMARY KEY columns are unchanged. */
1213 		if (sys) {
1214 			/* Store the DB_TRX_ID,DB_ROLL_PTR. */
1215 			ulint	trx_id_offs = index->trx_id_offset;
1216 
1217 			if (!trx_id_offs) {
1218 				ulint	pos = dict_index_get_sys_col_pos(
1219 					index, DATA_TRX_ID);
1220 				ulint	len;
1221 				ut_ad(pos > 0);
1222 
1223 				if (!offsets) {
1224 					offsets = rec_get_offsets(
1225 						rec, index, NULL, pos + 1,
1226 						heap);
1227 				}
1228 
1229 				trx_id_offs = rec_get_nth_field_offs(
1230 					offsets, pos, &len);
1231 				ut_ad(len == DATA_TRX_ID_LEN);
1232 			}
1233 
1234 			memcpy(sys, rec + trx_id_offs,
1235 			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1236 		}
1237 
1238 		return(NULL);
1239 	}
1240 
1241 	mutex_enter(&log->mutex);
1242 
1243 	/* log->error is protected by log->mutex. */
1244 	if (log->error == DB_SUCCESS) {
1245 		dict_table_t*	new_table	= log->table;
1246 		dict_index_t*	new_index
1247 			= dict_table_get_first_index(new_table);
1248 		const ulint	new_n_uniq
1249 			= dict_index_get_n_unique(new_index);
1250 
1251 		if (!*heap) {
1252 			ulint	size = 0;
1253 
1254 			if (!offsets) {
1255 				size += (1 + REC_OFFS_HEADER_SIZE
1256 					 + index->n_fields)
1257 					* sizeof *offsets;
1258 			}
1259 
1260 			for (ulint i = 0; i < new_n_uniq; i++) {
1261 				size += dict_col_get_min_size(
1262 					dict_index_get_nth_col(new_index, i));
1263 			}
1264 
1265 			*heap = mem_heap_create(
1266 				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
1267 		}
1268 
1269 		if (!offsets) {
1270 			offsets = rec_get_offsets(rec, index, NULL,
1271 						  ULINT_UNDEFINED, heap);
1272 		}
1273 
1274 		tuple = dtuple_create(*heap, new_n_uniq + 2);
1275 		dict_index_copy_types(tuple, new_index, tuple->n_fields);
1276 		dtuple_set_n_fields_cmp(tuple, new_n_uniq);
1277 
1278 		const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
1279 
1280 		const page_size_t&	page_size
1281 			= dict_table_page_size(index->table);
1282 
1283 		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
1284 			dict_field_t*	ifield;
1285 			dfield_t*	dfield;
1286 			ulint		prtype;
1287 			ulint		mbminmaxlen;
1288 
1289 			ifield = dict_index_get_nth_field(new_index, new_i);
1290 			dfield = dtuple_get_nth_field(tuple, new_i);
1291 
1292 			const ulint	col_no
1293 				= dict_field_get_col(ifield)->ind;
1294 
1295 			if (const dict_col_t* col
1296 			    = row_log_table_get_pk_old_col(
1297 				    index->table, log->col_map, col_no)) {
1298 				ulint	i = dict_col_get_clust_pos(col, index);
1299 
1300 				if (i == ULINT_UNDEFINED) {
1301 					ut_ad(0);
1302 					log->error = DB_CORRUPTION;
1303 					goto err_exit;
1304 				}
1305 
1306 				log->error = row_log_table_get_pk_col(
1307 					col, ifield, dfield, *heap,
1308 					rec, offsets, i, page_size, max_len);
1309 
1310 				if (log->error != DB_SUCCESS) {
1311 err_exit:
1312 					tuple = NULL;
1313 					goto func_exit;
1314 				}
1315 
1316 				mbminmaxlen = col->mbminmaxlen;
1317 				prtype = col->prtype;
1318 			} else {
1319 				/* No matching column was found in the old
1320 				table, so this must be an added column.
1321 				Copy the default value. */
1322 				ut_ad(log->add_cols);
1323 
1324 				dfield_copy(dfield, dtuple_get_nth_field(
1325 						    log->add_cols, col_no));
1326 				mbminmaxlen = dfield->type.mbminmaxlen;
1327 				prtype = dfield->type.prtype;
1328 			}
1329 
1330 			ut_ad(!dfield_is_ext(dfield));
1331 			ut_ad(!dfield_is_null(dfield));
1332 
1333 			if (ifield->prefix_len) {
1334 				ulint	len = dtype_get_at_most_n_mbchars(
1335 					prtype, mbminmaxlen,
1336 					ifield->prefix_len,
1337 					dfield_get_len(dfield),
1338 					static_cast<const char*>(
1339 						dfield_get_data(dfield)));
1340 
1341 				ut_ad(len <= dfield_get_len(dfield));
1342 				dfield_set_len(dfield, len);
1343 			}
1344 		}
1345 
1346 		const byte* trx_roll = rec
1347 			+ row_get_trx_id_offset(index, offsets);
1348 
1349 		/* Copy the fields, because the fields will be updated
1350 		or the record may be moved somewhere else in the B-tree
1351 		as part of the upcoming operation. */
1352 		if (sys) {
1353 			memcpy(sys, trx_roll,
1354 			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1355 			trx_roll = sys;
1356 		} else {
1357 			trx_roll = static_cast<const byte*>(
1358 				mem_heap_dup(
1359 					*heap, trx_roll,
1360 					DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
1361 		}
1362 
1363 		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
1364 				trx_roll, DATA_TRX_ID_LEN);
1365 		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
1366 				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
1367 	}
1368 
1369 func_exit:
1370 	mutex_exit(&log->mutex);
1371 	return(tuple);
1372 }
1373 
1374 /******************************************************//**
1375 Logs an insert to a table that is being rebuilt.
1376 This will be merged in row_log_table_apply_insert(). */
1377 void
row_log_table_insert(const rec_t * rec,const dtuple_t * ventry,dict_index_t * index,const ulint * offsets)1378 row_log_table_insert(
1379 /*=================*/
1380 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
1381 				page X-latched */
1382 	const dtuple_t*	ventry,	/*!< in: dtuple holding virtual column info */
1383 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
1384 				or X-latched */
1385 	const ulint*	offsets)/*!< in: rec_get_offsets(rec,index) */
1386 {
1387 	row_log_table_low(rec, ventry, NULL, index, offsets, true, NULL);
1388 }
1389 
1390 /******************************************************//**
1391 Notes that a BLOB is being freed during online ALTER TABLE. */
1392 void
row_log_table_blob_free(dict_index_t * index,ulint page_no)1393 row_log_table_blob_free(
1394 /*====================*/
1395 	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
1396 	ulint		page_no)/*!< in: starting page number of the BLOB */
1397 {
1398 	ut_ad(dict_index_is_clust(index));
1399 	ut_ad(dict_index_is_online_ddl(index));
1400 	ut_ad(rw_lock_own_flagged(
1401 			&index->lock,
1402 			RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1403 	ut_ad(page_no != FIL_NULL);
1404 
1405 	if (index->online_log->error != DB_SUCCESS) {
1406 		return;
1407 	}
1408 
1409 	page_no_map*	blobs	= index->online_log->blobs;
1410 
1411 	if (blobs == NULL) {
1412 		index->online_log->blobs = blobs = UT_NEW_NOKEY(page_no_map());
1413 	}
1414 
1415 #ifdef UNIV_DEBUG
1416 	const ulonglong	log_pos = index->online_log->tail.total;
1417 #else
1418 # define log_pos /* empty */
1419 #endif /* UNIV_DEBUG */
1420 
1421 	const page_no_map::value_type v(page_no,
1422 					row_log_table_blob_t(log_pos));
1423 
1424 	std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
1425 
1426 	if (!p.second) {
1427 		/* Update the existing mapping. */
1428 		ut_ad(p.first->first == page_no);
1429 		p.first->second.blob_free(log_pos);
1430 	}
1431 #undef log_pos
1432 }
1433 
1434 /******************************************************//**
1435 Notes that a BLOB is being allocated during online ALTER TABLE. */
1436 void
row_log_table_blob_alloc(dict_index_t * index,ulint page_no)1437 row_log_table_blob_alloc(
1438 /*=====================*/
1439 	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
1440 	ulint		page_no)/*!< in: starting page number of the BLOB */
1441 {
1442 	ut_ad(dict_index_is_clust(index));
1443 	ut_ad(dict_index_is_online_ddl(index));
1444 
1445 	ut_ad(rw_lock_own_flagged(
1446 			&index->lock,
1447 			RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1448 
1449 	ut_ad(page_no != FIL_NULL);
1450 
1451 	if (index->online_log->error != DB_SUCCESS) {
1452 		return;
1453 	}
1454 
1455 	/* Only track allocations if the same page has been freed
1456 	earlier. Double allocation without a free is not allowed. */
1457 	if (page_no_map* blobs = index->online_log->blobs) {
1458 		page_no_map::iterator p = blobs->find(page_no);
1459 
1460 		if (p != blobs->end()) {
1461 			ut_ad(p->first == page_no);
1462 			p->second.blob_alloc(index->online_log->tail.total);
1463 		}
1464 	}
1465 }
1466 
1467 /******************************************************//**
1468 Converts a log record to a table row.
1469 @return converted row, or NULL if the conversion fails */
1470 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1471 const dtuple_t*
row_log_table_apply_convert_mrec(const mrec_t * mrec,dict_index_t * index,const ulint * offsets,const row_log_t * log,mem_heap_t * heap,trx_id_t trx_id,dberr_t * error)1472 row_log_table_apply_convert_mrec(
1473 /*=============================*/
1474 	const mrec_t*		mrec,		/*!< in: merge record */
1475 	dict_index_t*		index,		/*!< in: index of mrec */
1476 	const ulint*		offsets,	/*!< in: offsets of mrec */
1477 	const row_log_t*	log,		/*!< in: rebuild context */
1478 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1479 	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec */
1480 	dberr_t*		error)		/*!< out: DB_SUCCESS or
1481 						DB_MISSING_HISTORY or
1482 						reason of failure */
1483 {
1484 	dtuple_t*	row;
1485 	ulint		num_v = dict_table_get_n_v_cols(log->table);
1486 
1487 	*error = DB_SUCCESS;
1488 
1489 	/* This is based on row_build(). */
1490 	if (log->add_cols) {
1491 		row = dtuple_copy(log->add_cols, heap);
1492 		/* dict_table_copy_types() would set the fields to NULL */
1493 		for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
1494 			dict_col_copy_type(
1495 				dict_table_get_nth_col(log->table, i),
1496 				dfield_get_type(dtuple_get_nth_field(row, i)));
1497 		}
1498 	} else {
1499 		row = dtuple_create_with_vcol(
1500 			heap, dict_table_get_n_cols(log->table), num_v);
1501 		dict_table_copy_types(row, log->table);
1502 	}
1503 
1504 	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
1505 		const dict_field_t*	ind_field
1506 			= dict_index_get_nth_field(index, i);
1507 
1508 		if (ind_field->prefix_len) {
1509 			/* Column prefixes can only occur in key
1510 			fields, which cannot be stored externally. For
1511 			a column prefix, there should also be the full
1512 			field in the clustered index tuple. The row
1513 			tuple comprises full fields, not prefixes. */
1514 			ut_ad(!rec_offs_nth_extern(offsets, i));
1515 			continue;
1516 		}
1517 
1518 		const dict_col_t*	col
1519 			= dict_field_get_col(ind_field);
1520 
1521 		ulint			col_no
1522 			= log->col_map[dict_col_get_no(col)];
1523 
1524 		if (col_no == ULINT_UNDEFINED) {
1525 			/* dropped column */
1526 			continue;
1527 		}
1528 
1529 		dfield_t*	dfield
1530 			= dtuple_get_nth_field(row, col_no);
1531 
1532 		ulint			len;
1533 		const byte*		data;
1534 
1535 		if (rec_offs_nth_extern(offsets, i)) {
1536 			ut_ad(rec_offs_any_extern(offsets));
1537 			rw_lock_x_lock(dict_index_get_lock(index));
1538 
1539 			if (const page_no_map* blobs = log->blobs) {
1540 				data = rec_get_nth_field(
1541 					mrec, offsets, i, &len);
1542 				ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
1543 
1544 				ulint	page_no = mach_read_from_4(
1545 					data + len - (BTR_EXTERN_FIELD_REF_SIZE
1546 						      - BTR_EXTERN_PAGE_NO));
1547 				page_no_map::const_iterator p = blobs->find(
1548 					page_no);
1549 				if (p != blobs->end()
1550 				    && p->second.is_freed(log->head.total)) {
1551 					/* This BLOB has been freed.
1552 					We must not access the row. */
1553 					*error = DB_MISSING_HISTORY;
1554 					dfield_set_data(dfield, data, len);
1555 					dfield_set_ext(dfield);
1556 					goto blob_done;
1557 				}
1558 			}
1559 
1560 			data = btr_rec_copy_externally_stored_field(
1561 				mrec, offsets,
1562 				dict_table_page_size(index->table),
1563 				i, &len, heap);
1564 			ut_a(data);
1565 			dfield_set_data(dfield, data, len);
1566 blob_done:
1567 			rw_lock_x_unlock(dict_index_get_lock(index));
1568 		} else {
1569 			data = rec_get_nth_field(mrec, offsets, i, &len);
1570 			dfield_set_data(dfield, data, len);
1571 		}
1572 
1573 		if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
1574 		    && col->len != len && !dict_table_is_comp(log->table)) {
1575 
1576 			ut_ad(col->len >= len);
1577 			if (dict_table_is_comp(index->table)) {
1578 				byte*	buf = (byte*) mem_heap_alloc(heap,
1579 								     col->len);
1580 				memcpy(buf, dfield->data, len);
1581 				memset(buf + len, 0x20, col->len - len);
1582 
1583 				dfield_set_data(dfield, buf, col->len);
1584 			} else {
1585 				/* field length mismatch should not happen
1586 				when rebuilding the redundant row format
1587 				table. */
1588 				ut_ad(0);
1589 				*error = DB_CORRUPTION;
1590 				return(NULL);
1591 			}
1592 		}
1593 
1594 		/* See if any columns were changed to NULL or NOT NULL. */
1595 		const dict_col_t*	new_col
1596 			= dict_table_get_nth_col(log->table, col_no);
1597 		ut_ad(new_col->mtype == col->mtype);
1598 
1599 		/* Assert that prtype matches except for nullability. */
1600 		ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
1601 		ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
1602 			& ~DATA_NOT_NULL));
1603 
1604 		if (new_col->prtype == col->prtype) {
1605 			continue;
1606 		}
1607 
1608 		if ((new_col->prtype & DATA_NOT_NULL)
1609 		    && dfield_is_null(dfield)) {
1610 			/* We got a NULL value for a NOT NULL column. */
1611 			*error = DB_INVALID_NULL;
1612 			return(NULL);
1613 		}
1614 
1615 		/* Adjust the DATA_NOT_NULL flag in the parsed row. */
1616 		dfield_get_type(dfield)->prtype = new_col->prtype;
1617 
1618 		ut_ad(dict_col_type_assert_equal(new_col,
1619 						 dfield_get_type(dfield)));
1620 	}
1621 
1622 	/* read the virtual column data if any */
1623 	if (num_v) {
1624 		byte* b = const_cast<byte*>(mrec)
1625 			  + rec_offs_data_size(offsets);
1626 		trx_undo_read_v_cols(log->table, b, row, false,
1627 				     &(log->col_map[log->n_old_col]));
1628 	}
1629 
1630 	return(row);
1631 }
1632 
1633 /******************************************************//**
1634 Replays an insert operation on a table that was rebuilt.
1635 @return DB_SUCCESS or error code */
1636 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1637 dberr_t
row_log_table_apply_insert_low(que_thr_t * thr,const dtuple_t * row,trx_id_t trx_id,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup)1638 row_log_table_apply_insert_low(
1639 /*===========================*/
1640 	que_thr_t*		thr,		/*!< in: query graph */
1641 	const dtuple_t*		row,		/*!< in: table row
1642 						in the old table definition */
1643 	trx_id_t		trx_id,		/*!< in: trx_id of the row */
1644 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1645 						that can be emptied */
1646 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1647 	row_merge_dup_t*	dup)		/*!< in/out: for reporting
1648 						duplicate key errors */
1649 {
1650 	dberr_t		error;
1651 	dtuple_t*	entry;
1652 	const row_log_t*log	= dup->index->online_log;
1653 	dict_index_t*	index	= dict_table_get_first_index(log->table);
1654 	ulint		n_index = 0;
1655 
1656 	ut_ad(dtuple_validate(row));
1657 	ut_ad(trx_id);
1658 
1659 	DBUG_PRINT("ib_alter_table",
1660 		   ("insert table " IB_ID_FMT "(index " IB_ID_FMT "): %s",
1661 		    index->table->id, index->id,
1662 		    rec_printer(row).str().c_str()));
1663 
1664 	static const ulint	flags
1665 		= (BTR_CREATE_FLAG
1666 		   | BTR_NO_LOCKING_FLAG
1667 		   | BTR_NO_UNDO_LOG_FLAG
1668 		   | BTR_KEEP_SYS_FLAG);
1669 
1670 	entry = row_build_index_entry(row, NULL, index, heap);
1671 
1672 	error = row_ins_clust_index_entry_low(
1673 		flags, BTR_MODIFY_TREE, index, index->n_uniq,
1674 		entry, 0, thr, false);
1675 
1676 	switch (error) {
1677 	case DB_SUCCESS:
1678 		break;
1679 	case DB_SUCCESS_LOCKED_REC:
1680 		/* The row had already been copied to the table. */
1681 		return(DB_SUCCESS);
1682 	default:
1683 		return(error);
1684 	}
1685 
1686 	do {
1687 		n_index++;
1688 		if (!(index = dict_table_get_next_index(index))) {
1689 			break;
1690 		}
1691 
1692 		if (index->type & DICT_FTS) {
1693 			continue;
1694 		}
1695 
1696 		entry = row_build_index_entry(row, NULL, index, heap);
1697 		error = row_ins_sec_index_entry_low(
1698 			flags, BTR_MODIFY_TREE,
1699 			index, offsets_heap, heap, entry, trx_id, thr,
1700 			false);
1701 
1702 		/* Report correct index name for duplicate key error. */
1703 		if (error == DB_DUPLICATE_KEY) {
1704 			thr_get_trx(thr)->error_key_num = n_index;
1705 		}
1706 
1707 	} while (error == DB_SUCCESS);
1708 
1709 	return(error);
1710 }
1711 
1712 /******************************************************//**
1713 Replays an insert operation on a table that was rebuilt.
1714 @return DB_SUCCESS or error code */
1715 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1716 dberr_t
row_log_table_apply_insert(que_thr_t * thr,const mrec_t * mrec,const ulint * offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup,trx_id_t trx_id)1717 row_log_table_apply_insert(
1718 /*=======================*/
1719 	que_thr_t*		thr,		/*!< in: query graph */
1720 	const mrec_t*		mrec,		/*!< in: record to insert */
1721 	const ulint*		offsets,	/*!< in: offsets of mrec */
1722 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1723 						that can be emptied */
1724 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1725 	row_merge_dup_t*	dup,		/*!< in/out: for reporting
1726 						duplicate key errors */
1727 	trx_id_t		trx_id)		/*!< in: DB_TRX_ID of mrec */
1728 {
1729 	const row_log_t*log	= dup->index->online_log;
1730 	dberr_t		error;
1731 	const dtuple_t*	row	= row_log_table_apply_convert_mrec(
1732 		mrec, dup->index, offsets, log, heap, trx_id, &error);
1733 
1734 	switch (error) {
1735 	case DB_MISSING_HISTORY:
1736 		ut_ad(log->blobs);
1737 		/* Because some BLOBs are missing, we know that the
1738 		transaction was rolled back later (a rollback of
1739 		an insert can free BLOBs).
1740 		We can simply skip the insert: the subsequent
1741 		ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
1742 		be interpreted as ROW_T_INSERT. */
1743 		return(DB_SUCCESS);
1744 	case DB_SUCCESS:
1745 		ut_ad(row != NULL);
1746 		break;
1747 	default:
1748 		ut_ad(0);
1749 	case DB_INVALID_NULL:
1750 		ut_ad(row == NULL);
1751 		return(error);
1752 	}
1753 
1754 	error = row_log_table_apply_insert_low(
1755 		thr, row, trx_id, offsets_heap, heap, dup);
1756 	if (error != DB_SUCCESS) {
1757 		/* Report the erroneous row using the new
1758 		version of the table. */
1759 		innobase_row_to_mysql(dup->table, log->table, row);
1760 	}
1761 	return(error);
1762 }
1763 
1764 /******************************************************//**
1765 Deletes a record from a table that is being rebuilt.
1766 @return DB_SUCCESS or error code */
1767 static MY_ATTRIBUTE((warn_unused_result))
1768 dberr_t
row_log_table_apply_delete_low(btr_pcur_t * pcur,const dtuple_t * ventry,const ulint * offsets,const row_ext_t * save_ext,mem_heap_t * heap,mtr_t * mtr)1769 row_log_table_apply_delete_low(
1770 /*===========================*/
1771 	btr_pcur_t*		pcur,		/*!< in/out: B-tree cursor,
1772 						will be trashed */
1773 	const dtuple_t*		ventry,		/*!< in: dtuple holding
1774 						virtual column info */
1775 	const ulint*		offsets,	/*!< in: offsets on pcur */
1776 	const row_ext_t*	save_ext,	/*!< in: saved external field
1777 						info, or NULL */
1778 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1779 	mtr_t*			mtr)		/*!< in/out: mini-transaction,
1780 						will be committed */
1781 {
1782 	dberr_t		error;
1783 	row_ext_t*	ext;
1784 	dtuple_t*	row;
1785 	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;
1786 
1787 	ut_ad(dict_index_is_clust(index));
1788 
1789 	DBUG_PRINT("ib_alter_table",
1790 		   ("delete table " IB_ID_FMT "(index " IB_ID_FMT "): %s",
1791 		    index->table->id, index->id,
1792 		    rec_printer(btr_pcur_get_rec(pcur),
1793 				offsets).str().c_str()));
1794 
1795 	if (dict_table_get_next_index(index)) {
1796 		/* Build a row template for purging secondary index entries. */
1797 		row = row_build(
1798 			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
1799 			offsets, NULL, NULL, NULL,
1800 			save_ext ? NULL : &ext, heap);
1801 		if (ventry) {
1802 			dtuple_copy_v_fields(row, ventry);
1803 		}
1804 
1805 		if (!save_ext) {
1806 			save_ext = ext;
1807 		}
1808 	} else {
1809 		row = NULL;
1810 	}
1811 
1812 	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
1813 				   BTR_CREATE_FLAG, false, mtr);
1814 	mtr_commit(mtr);
1815 
1816 	if (error != DB_SUCCESS) {
1817 		return(error);
1818 	}
1819 
1820 	while ((index = dict_table_get_next_index(index)) != NULL) {
1821 		if (index->type & DICT_FTS) {
1822 			continue;
1823 		}
1824 
1825 		const dtuple_t*	entry = row_build_index_entry(
1826 			row, save_ext, index, heap);
1827 		mtr_start(mtr);
1828 		mtr->set_named_space(index->space);
1829 		btr_pcur_open(index, entry, PAGE_CUR_LE,
1830 			      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
1831 			      pcur, mtr);
1832 #ifdef UNIV_DEBUG
1833 		switch (btr_pcur_get_btr_cur(pcur)->flag) {
1834 		case BTR_CUR_DELETE_REF:
1835 		case BTR_CUR_DEL_MARK_IBUF:
1836 		case BTR_CUR_DELETE_IBUF:
1837 		case BTR_CUR_INSERT_TO_IBUF:
1838 			/* We did not request buffering. */
1839 			break;
1840 		case BTR_CUR_HASH:
1841 		case BTR_CUR_HASH_FAIL:
1842 		case BTR_CUR_BINARY:
1843 			goto flag_ok;
1844 		}
1845 		ut_ad(0);
1846 flag_ok:
1847 #endif /* UNIV_DEBUG */
1848 
1849 		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
1850 		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
1851 			/* All secondary index entries should be
1852 			found, because new_table is being modified by
1853 			this thread only, and all indexes should be
1854 			updated in sync. */
1855 			mtr_commit(mtr);
1856 			return(DB_INDEX_CORRUPT);
1857 		}
1858 
1859 		btr_cur_pessimistic_delete(&error, FALSE,
1860 					   btr_pcur_get_btr_cur(pcur),
1861 					   BTR_CREATE_FLAG, false, mtr);
1862 		mtr_commit(mtr);
1863 	}
1864 
1865 	return(error);
1866 }
1867 
1868 /******************************************************//**
1869 Replays a delete operation on a table that was rebuilt.
1870 @return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
dberr_t
row_log_table_apply_delete(
/*=======================*/
	que_thr_t*		thr,		/*!< in: query graph */
	ulint			trx_id_col,	/*!< in: position of
						DB_TRX_ID in the new
						clustered index */
	const mrec_t*		mrec,		/*!< in: merge record */
	const ulint*		moffsets,	/*!< in: offsets of mrec */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	const row_log_t*	log,		/*!< in: online log */
	const row_ext_t*	save_ext,	/*!< in: saved external field
						info, or NULL */
	ulint			ext_size)	/*!< in: external field size */
{
	dict_table_t*	new_table = log->table;
	dict_index_t*	index = dict_table_get_first_index(new_table);
	dtuple_t*	old_pk;
	mtr_t		mtr;
	btr_pcur_t	pcur;
	ulint*		offsets;
	ulint		num_v = new_table->n_v_cols;

	/* The logged record consists of the PRIMARY KEY fields of the
	new clustered index followed by DB_TRX_ID,DB_ROLL_PTR, with no
	externally stored columns. */
	ut_ad(rec_offs_n_fields(moffsets)
	      == dict_index_get_n_unique(index) + 2);
	ut_ad(!rec_offs_any_extern(moffsets));

	/* Convert the row to a search tuple. */
	old_pk = dtuple_create_with_vcol(heap, index->n_uniq, num_v);
	dict_index_copy_types(old_pk, index, index->n_uniq);

	if (num_v) {
                dict_table_copy_v_types(old_pk, index->table);
        }

	/* Copy the PRIMARY KEY fields from mrec into the search tuple. */
	for (ulint i = 0; i < index->n_uniq; i++) {
		ulint		len;
		const void*	field;
		field = rec_get_nth_field(mrec, moffsets, i, &len);
		ut_ad(len != UNIV_SQL_NULL);
		dfield_set_data(dtuple_get_nth_field(old_pk, i),
				field, len);
	}

	/* Position a persistent cursor on the clustered index record
	that matches the logged PRIMARY KEY. */
	mtr_start(&mtr);
	mtr.set_named_space(index->space);
	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
		      BTR_MODIFY_TREE | BTR_LATCH_FOR_DELETE,
		      &pcur, &mtr);
#ifdef UNIV_DEBUG
	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
	case BTR_CUR_DELETE_REF:
	case BTR_CUR_DEL_MARK_IBUF:
	case BTR_CUR_DELETE_IBUF:
	case BTR_CUR_INSERT_TO_IBUF:
		/* We did not request buffering. */
		break;
	case BTR_CUR_HASH:
	case BTR_CUR_HASH_FAIL:
	case BTR_CUR_BINARY:
		goto flag_ok;
	}
	ut_ad(0);
flag_ok:
#endif /* UNIV_DEBUG */

	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
all_done:
		mtr_commit(&mtr);
		/* The record was not found. All done. */
		/* This should only happen when an earlier
		ROW_T_INSERT was skipped or
		ROW_T_UPDATE was interpreted as ROW_T_DELETE
		due to BLOBs having been freed by rollback. */
		return(DB_SUCCESS);
	}

	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
				  ULINT_UNDEFINED, &offsets_heap);
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */

	/* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */

	{
		ulint		len;
		const byte*	mrec_trx_id
			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);
		const byte*	rec_trx_id
			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					    trx_id_col, &len);
		ut_ad(len == DATA_TRX_ID_LEN);

		/* DB_ROLL_PTR is stored immediately after DB_TRX_ID in
		both the logged record and the index record, so a single
		memcmp below covers both system columns. */
		ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
		      == mrec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);
		ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
					trx_id_col + 1, &len)
		      == rec_trx_id + DATA_TRX_ID_LEN);
		ut_ad(len == DATA_ROLL_PTR_LEN);

		if (memcmp(mrec_trx_id, rec_trx_id,
			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
			/* The ROW_T_DELETE was logged for a different
			PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
			This is possible if a ROW_T_INSERT was skipped
			or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
			because some BLOBs were missing due to
			(1) rolling back the initial insert, or
			(2) purging the BLOB for a later ROW_T_DELETE
			(3) purging 'old values' for a later ROW_T_UPDATE
			or ROW_T_DELETE. */
			ut_ad(!log->same_pk);
			goto all_done;
		}
	}

	/* Virtual column values are logged after the PRIMARY KEY record
	and the external-field info; read them into old_pk so that
	secondary indexes on virtual columns can be maintained. */
	if (num_v) {
                byte* b = (byte*)mrec + rec_offs_data_size(moffsets)
			  + ext_size;
                trx_undo_read_v_cols(log->table, b, old_pk, false,
				     &(log->col_map[log->n_old_col]));
        }

	/* Note: the pcur is handed over with the mtr still open;
	row_log_table_apply_delete_low() will commit it. */
	return(row_log_table_apply_delete_low(&pcur, old_pk,
					      offsets, save_ext,
					      heap, &mtr));
}
2005 
2006 /******************************************************//**
2007 Replays an update operation on a table that was rebuilt.
2008 @return DB_SUCCESS or error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
row_log_table_apply_update(
/*=======================*/
	que_thr_t*		thr,		/*!< in: query graph */
	ulint			new_trx_id_col,	/*!< in: position of
						DB_TRX_ID in the new
						clustered index */
	const mrec_t*		mrec,		/*!< in: new value */
	const ulint*		offsets,	/*!< in: offsets of mrec */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	row_merge_dup_t*	dup,		/*!< in/out: for reporting
						duplicate key errors */
	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec */
	const dtuple_t*		old_pk)		/*!< in: PRIMARY KEY and
						DB_TRX_ID,DB_ROLL_PTR
						of the old value,
						or PRIMARY KEY if same_pk */
{
	const row_log_t*log	= dup->index->online_log;
	const dtuple_t*	row;
	dict_index_t*	index	= dict_table_get_first_index(log->table);
	mtr_t		mtr;
	btr_pcur_t	pcur;
	dberr_t		error;
	ulint		n_index = 0;	/* ordinal of the index being
					processed, for reporting the
					duplicate-key index to the client */

	ut_ad(dtuple_get_n_fields_cmp(old_pk)
	      == dict_index_get_n_unique(index));
	ut_ad(dtuple_get_n_fields(old_pk)
	      == dict_index_get_n_unique(index)
	      + (log->same_pk ? 0 : 2));

	/* Convert the logged record (in the old table format) into a
	row in the new table format. */
	row = row_log_table_apply_convert_mrec(
		mrec, dup->index, offsets, log, heap, trx_id, &error);

	switch (error) {
	case DB_MISSING_HISTORY:
		/* The record contained BLOBs that are now missing. */
		ut_ad(log->blobs);
		/* Whether or not we are updating the PRIMARY KEY, we
		know that there should be a subsequent
		ROW_T_DELETE for rolling back a preceding ROW_T_INSERT,
		overriding this ROW_T_UPDATE record. (*1)

		This allows us to interpret this ROW_T_UPDATE
		as ROW_T_DELETE.

		When applying the subsequent ROW_T_DELETE, no matching
		record will be found. */
		/* Fall through. */
	case DB_SUCCESS:
		ut_ad(row != NULL);
		break;
	default:
		ut_ad(0);
	case DB_INVALID_NULL:
		ut_ad(row == NULL);
		return(error);
	}

	/* Position the cursor on the clustered index record matching
	the old PRIMARY KEY value. */
	mtr_start(&mtr);
	mtr.set_named_space(index->space);
	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
		      BTR_MODIFY_TREE, &pcur, &mtr);
#ifdef UNIV_DEBUG
	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
	case BTR_CUR_DELETE_REF:
	case BTR_CUR_DEL_MARK_IBUF:
	case BTR_CUR_DELETE_IBUF:
	case BTR_CUR_INSERT_TO_IBUF:
		ut_ad(0);/* We did not request buffering. */
	case BTR_CUR_HASH:
	case BTR_CUR_HASH_FAIL:
	case BTR_CUR_BINARY:
		break;
	}
#endif /* UNIV_DEBUG */

	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
		/* The record was not found. This should only happen
		when an earlier ROW_T_INSERT or ROW_T_UPDATE was
		diverted because BLOBs were freed when the insert was
		later rolled back. */

		ut_ad(log->blobs);

		if (error == DB_SUCCESS) {
			/* An earlier ROW_T_INSERT could have been
			skipped because of a missing BLOB, like this:

			BEGIN;
			INSERT INTO t SET blob_col='blob value';
			UPDATE t SET blob_col='';
			ROLLBACK;

			This would generate the following records:
			ROW_T_INSERT (referring to 'blob value')
			ROW_T_UPDATE
			ROW_T_UPDATE (referring to 'blob value')
			ROW_T_DELETE
			[ROLLBACK removes the 'blob value']

			The ROW_T_INSERT would have been skipped
			because of a missing BLOB. Now we are
			executing the first ROW_T_UPDATE.
			The second ROW_T_UPDATE (for the ROLLBACK)
			would be interpreted as ROW_T_DELETE, because
			the BLOB would be missing.

			We could probably assume that the transaction
			has been rolled back and simply skip the
			'insert' part of this ROW_T_UPDATE record.
			However, there might be some complex scenario
			that could interfere with such a shortcut.
			So, we will insert the row (and risk
			introducing a bogus duplicate key error
			for the ALTER TABLE), and a subsequent
			ROW_T_UPDATE or ROW_T_DELETE will delete it. */
			mtr_commit(&mtr);
			error = row_log_table_apply_insert_low(
				thr, row, trx_id, offsets_heap, heap, dup);
		} else {
			/* Some BLOBs are missing, so we are interpreting
			this ROW_T_UPDATE as ROW_T_DELETE (see *1).
			Because the record was not found, we do nothing. */
			ut_ad(error == DB_MISSING_HISTORY);
			error = DB_SUCCESS;
func_exit:
			mtr_commit(&mtr);
		}
		/* Common exit for paths that have already committed
		the mini-transaction. */
func_exit_committed:
		ut_ad(mtr.has_committed());

		if (error != DB_SUCCESS) {
			/* Report the erroneous row using the new
			version of the table. */
			innobase_row_to_mysql(dup->table, log->table, row);
		}

		return(error);
	}

	/* Prepare to update (or delete) the record. */
	ulint*		cur_offsets	= rec_get_offsets(
		btr_pcur_get_rec(&pcur),
		index, NULL, ULINT_UNDEFINED, &offsets_heap);

	if (!log->same_pk) {
		/* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
		was buffered. */
		ulint		len;
		const void*	rec_trx_id
			= rec_get_nth_field(btr_pcur_get_rec(&pcur),
					    cur_offsets, index->n_uniq, &len);
		ut_ad(len == DATA_TRX_ID_LEN);
		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len
		      == DATA_TRX_ID_LEN);
		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len
		      == DATA_ROLL_PTR_LEN);
		/* DB_TRX_ID and DB_ROLL_PTR are contiguous in old_pk,
		so one memcmp covers both system columns. */
		ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>(
			      dtuple_get_nth_field(old_pk,
						   index->n_uniq)->data)
		      == dtuple_get_nth_field(old_pk,
					      index->n_uniq + 1)->data);
		if (memcmp(rec_trx_id,
			   dtuple_get_nth_field(old_pk, index->n_uniq)->data,
			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
			/* The ROW_T_UPDATE was logged for a different
			DB_TRX_ID,DB_ROLL_PTR. This is possible if an
			earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
			because some BLOBs were missing due to rolling
			back the initial insert or due to purging
			the old BLOB values of an update. */
			ut_ad(log->blobs);
			if (error != DB_SUCCESS) {
				ut_ad(error == DB_MISSING_HISTORY);
				/* Some BLOBs are missing, so we are
				interpreting this ROW_T_UPDATE as
				ROW_T_DELETE (see *1).
				Because this is a different row,
				we will do nothing. */
				error = DB_SUCCESS;
			} else {
				/* Because the user record is missing due to
				BLOBs that were missing when processing
				an earlier log record, we should
				interpret the ROW_T_UPDATE as ROW_T_INSERT.
				However, there is a different user record
				with the same PRIMARY KEY value already. */
				error = DB_DUPLICATE_KEY;
			}

			goto func_exit;
		}
	}

	if (error != DB_SUCCESS) {
		ut_ad(error == DB_MISSING_HISTORY);
		ut_ad(log->blobs);
		/* Some BLOBs are missing, so we are interpreting
		this ROW_T_UPDATE as ROW_T_DELETE (see *1). */
		error = row_log_table_apply_delete_low(
			&pcur, old_pk, cur_offsets, NULL, heap, &mtr);
		goto func_exit_committed;
	}

	/** It allows to create tuple with virtual column information. */
	dtuple_t*	entry	= row_build_index_entry_low(
		row, NULL, index, heap, ROW_BUILD_FOR_INSERT);
	upd_t*		update	= row_upd_build_difference_binary(
		index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
		false, NULL, heap, dup->table, &error);
	if (error != DB_SUCCESS) {
			goto func_exit;
	}

	if (!update->n_fields) {
		/* Nothing to do. */
		goto func_exit;
	}

	/* An ordering (PRIMARY KEY) column was changed if the first
	updated field precedes DB_TRX_ID in the new clustered index. */
	const bool	pk_updated
		= upd_get_nth_field(update, 0)->field_no < new_trx_id_col;

	if (pk_updated || rec_offs_any_extern(cur_offsets)) {
		/* If the record contains any externally stored
		columns, perform the update by delete and insert,
		because we will not write any undo log that would
		allow purge to free any orphaned externally stored
		columns. */

		if (pk_updated && log->same_pk) {
			/* The ROW_T_UPDATE log record should only be
			written when the PRIMARY KEY fields of the
			record did not change in the old table.  We
			can only get a change of PRIMARY KEY columns
			in the rebuilt table if the PRIMARY KEY was
			redefined (!same_pk). */
			ut_ad(0);
			error = DB_CORRUPTION;
			goto func_exit;
		}

		error = row_log_table_apply_delete_low(
			&pcur, old_pk, cur_offsets, NULL, heap, &mtr);
		ut_ad(mtr.has_committed());

		if (error == DB_SUCCESS) {
			error = row_log_table_apply_insert_low(
				thr, row, trx_id, offsets_heap, heap, dup);
		}

		goto func_exit_committed;
	}

	dtuple_t*	old_row;
	row_ext_t*	old_ext;

	if (dict_table_get_next_index(index) != NULL) {
		/* Construct the row corresponding to the old value of
		the record. */
		old_row = row_build(
                        ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
                        cur_offsets, NULL, NULL, NULL, &old_ext, heap);
		ut_ad(old_row);
		DBUG_PRINT("ib_alter_table",
			   ("update table " IB_ID_FMT
			    "(index " IB_ID_FMT "): %s to %s",
			    index->table->id, index->id,
			    rec_printer(old_row).str().c_str(),
			    rec_printer(row).str().c_str()));
	} else {
		/* No secondary indexes; the old row is not needed. */
		old_row = NULL;
		old_ext = NULL;
	}

	big_rec_t*	big_rec;

	/* Update the clustered index record in place. */
	error = btr_cur_pessimistic_update(
		BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
		| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
		| BTR_KEEP_POS_FLAG,
		btr_pcur_get_btr_cur(&pcur),
		&cur_offsets, &offsets_heap, heap, &big_rec,
		update, 0, thr, 0, &mtr);

	if (big_rec) {
		/* Some columns did not fit on the page; store them
		externally before releasing the big_rec. */
		if (error == DB_SUCCESS) {
			error = btr_store_big_rec_extern_fields(
				&pcur, update, cur_offsets, big_rec, &mtr,
				BTR_STORE_UPDATE);
		}

		dtuple_big_rec_free(big_rec);
	}

	/* Apply the update to each affected secondary index by
	deleting the old entry and inserting the new one. */
	bool vfields_copied = false;
	while ((index = dict_table_get_next_index(index)) != NULL) {

		n_index++;
		if (error != DB_SUCCESS) {
			break;
		}

		if (index->type & DICT_FTS) {
			continue;
		}

		if (!vfields_copied && dict_index_has_virtual(index)) {
			/* Virtual column values were read into old_pk;
			copy them once into old_row for index-entry
			construction. */
			dtuple_copy_v_fields(old_row, old_pk);
			vfields_copied = true;
		}

		if (!row_upd_changes_ord_field_binary(
			    index, update, thr, old_row, NULL)) {
			continue;
		}

		mtr_commit(&mtr);

		entry = row_build_index_entry(old_row, old_ext, index, heap);
		if (!entry) {
			ut_ad(0);
			return(DB_CORRUPTION);
		}

		mtr_start(&mtr);
		mtr.set_named_space(index->space);

		if (ROW_FOUND != row_search_index_entry(
			    index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
			ut_ad(0);
			error = DB_CORRUPTION;
			break;
		}

		btr_cur_pessimistic_delete(
			&error, FALSE, btr_pcur_get_btr_cur(&pcur),
			BTR_CREATE_FLAG, false, &mtr);

		if (error != DB_SUCCESS) {
			break;
		}

		mtr_commit(&mtr);

		entry = row_build_index_entry(row, NULL, index, heap);
		error = row_ins_sec_index_entry_low(
			BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
			BTR_MODIFY_TREE, index, offsets_heap, heap,
			entry, trx_id, thr, false);

		/* Report correct index name for duplicate key error. */
		if (error == DB_DUPLICATE_KEY) {
			thr_get_trx(thr)->error_key_num = n_index;
		}

		/* Restart the mini-transaction for the next iteration
		(or for the final mtr_commit() in func_exit). */
		mtr_start(&mtr);
		mtr.set_named_space(index->space);
	}

	goto func_exit;
}
2377 
2378 /******************************************************//**
2379 Applies an operation to a table that was rebuilt.
2380 @return NULL on failure (mrec corruption) or when out of data;
2381 pointer to next record on success */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const mrec_t*
row_log_table_apply_op(
/*===================*/
	que_thr_t*		thr,		/*!< in: query graph */
	ulint			trx_id_col,	/*!< in: position of
						DB_TRX_ID in old index */
	ulint			new_trx_id_col,	/*!< in: position of
						DB_TRX_ID in new index */
	row_merge_dup_t*	dup,		/*!< in/out: for reporting
						duplicate key errors */
	dberr_t*		error,		/*!< out: DB_SUCCESS
						or error code */
	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
						that can be emptied */
	mem_heap_t*		heap,		/*!< in/out: memory heap */
	const mrec_t*		mrec,		/*!< in: merge record */
	const mrec_t*		mrec_end,	/*!< in: end of buffer */
	ulint*			offsets)	/*!< in/out: work area
						for parsing mrec */
{
	row_log_t*	log	= dup->index->online_log;
	dict_index_t*	new_index = dict_table_get_first_index(log->table);
	ulint		extra_size;
	const mrec_t*	next_mrec;
	dtuple_t*	old_pk;
	row_ext_t*	ext;
	ulint		ext_size;

	ut_ad(dict_index_is_clust(dup->index));
	ut_ad(dup->index->table != log->table);
	ut_ad(log->head.total <= log->tail.total);

	*error = DB_SUCCESS;

	/* 3 = 1 (op type) + 1 (ext_size) + at least 1 byte payload */
	if (mrec + 3 >= mrec_end) {
		return(NULL);
	}

	const mrec_t* const mrec_start = mrec;

	/* The first byte of the log record is the operation type.
	Whenever the record turns out to extend past mrec_end, return
	NULL so that the caller will fetch the rest of the record. */
	switch (*mrec++) {
	default:
		ut_ad(0);
		*error = DB_CORRUPTION;
		return(NULL);
	case ROW_T_INSERT:
		/* extra_size is 1 or 2 bytes: values >= 0x80 flag a
		second byte holding the low-order bits. */
		extra_size = *mrec++;

		if (extra_size >= 0x80) {
			/* Read another byte of extra_size. */

			extra_size = (extra_size & 0x7f) << 8;
			extra_size |= *mrec++;
		}

		mrec += extra_size;

		if (mrec > mrec_end) {
			return(NULL);
		}

		rec_offs_set_n_fields(offsets, dup->index->n_fields);
		rec_init_offsets_temp(mrec, dup->index, offsets);

		next_mrec = mrec + rec_offs_data_size(offsets);

		/* If the table has virtual columns, the record is
		followed by a 2-byte-prefixed block of virtual column
		values. */
		if (log->table->n_v_cols) {
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}
			next_mrec += mach_read_from_2(next_mrec);
		}

		if (next_mrec > mrec_end) {
			return(NULL);
		} else {
			log->head.total += next_mrec - mrec_start;

			ulint		len;
			const byte*	db_trx_id
				= rec_get_nth_field(
					mrec, offsets, trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);
			*error = row_log_table_apply_insert(
				thr, mrec, offsets, offsets_heap,
				heap, dup, trx_read_trx_id(db_trx_id));
		}
		break;

	case ROW_T_DELETE:
		/* 1 (extra_size) + 4 (ext_size) + at least 1 (payload) */
		if (mrec + 6 >= mrec_end) {
			return(NULL);
		}

		extra_size = *mrec++;
		ext_size = mach_read_from_4(mrec);
		mrec += 4;
		ut_ad(mrec < mrec_end);

		/* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
		For fixed-length PRIMARY key columns, it is 0. */
		mrec += extra_size;

		/* The record consists of the PRIMARY KEY fields of the
		new index plus DB_TRX_ID,DB_ROLL_PTR. */
		rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
		rec_init_offsets_temp(mrec, new_index, offsets);
		next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
		if (log->table->n_v_cols) {
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			next_mrec += mach_read_from_2(next_mrec);
		}

		if (next_mrec > mrec_end) {
			return(NULL);
		}

		log->head.total += next_mrec - mrec_start;

		/* If there are external fields, retrieve those logged
		prefix info and reconstruct the row_ext_t */
		if (ext_size) {
			/* We use memcpy to avoid unaligned
			access on some non-x86 platforms.*/
			ext = static_cast<row_ext_t*>(
				mem_heap_dup(heap,
					     mrec + rec_offs_data_size(offsets),
					     ext_size));

			/* The duplicated blob is a flattened row_ext_t:
			fix up its internal ext and buf pointers, which
			point right after the header/len array. */
			byte*	ext_start = reinterpret_cast<byte*>(ext);

			ulint	ext_len = sizeof(*ext)
				+ (ext->n_ext - 1) * sizeof ext->len;

			ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
			ext_len += ext->n_ext * sizeof(*ext->ext);

			ext->buf = static_cast<byte*>(ext_start + ext_len);
		} else {
			ext = NULL;
		}

		*error = row_log_table_apply_delete(
			thr, new_trx_id_col,
			mrec, offsets, offsets_heap, heap,
			log, ext, ext_size);
		break;

	case ROW_T_UPDATE:
		/* Logically, the log entry consists of the
		(PRIMARY KEY,DB_TRX_ID) of the old value (converted
		to the new primary key definition) followed by
		the new value in the old table definition. If the
		definition of the columns belonging to PRIMARY KEY
		is not changed, the log will only contain
		DB_TRX_ID,new_row. */
		ulint           num_v = new_index->table->n_v_cols;

		if (dup->index->online_log->same_pk) {
			ut_ad(new_index->n_uniq == dup->index->n_uniq);

			extra_size = *mrec++;

			if (extra_size >= 0x80) {
				/* Read another byte of extra_size. */

				extra_size = (extra_size & 0x7f) << 8;
				extra_size |= *mrec++;
			}

			mrec += extra_size;

			if (mrec > mrec_end) {
				return(NULL);
			}

			rec_offs_set_n_fields(offsets, dup->index->n_fields);
			rec_init_offsets_temp(mrec, dup->index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);

			if (next_mrec > mrec_end) {
				return(NULL);
			}

			/* With an unchanged PRIMARY KEY, old_pk is built
			from the new record itself (no separate old-PK
			image was logged). */
			old_pk = dtuple_create_with_vcol(
				heap, new_index->n_uniq, num_v);
			dict_index_copy_types(
				old_pk, new_index, old_pk->n_fields);
			if (num_v) {
		                dict_table_copy_v_types(
					old_pk, new_index->table);
			}

			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
			for (ulint i = 0; i < new_index->n_uniq; i++) {
				const void*	field;
				ulint		len;
				dfield_t*	dfield;

				ut_ad(!rec_offs_nth_extern(offsets, i));

				field = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len != UNIV_SQL_NULL);

				dfield = dtuple_get_nth_field(old_pk, i);
				dfield_set_data(dfield, field, len);
			}
		} else {
			/* We assume extra_size < 0x100
			for the PRIMARY KEY prefix. */
			mrec += *mrec + 1;

			if (mrec > mrec_end) {
				return(NULL);
			}

			/* Get offsets for PRIMARY KEY,
			DB_TRX_ID, DB_ROLL_PTR. */
			rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
			rec_init_offsets_temp(mrec, new_index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);
			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			/* Copy the PRIMARY KEY fields and
			DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
			old_pk = dtuple_create_with_vcol(
				heap, new_index->n_uniq + 2, num_v);
			dict_index_copy_types(old_pk, new_index,
					      old_pk->n_fields);

			if (num_v) {
		                dict_table_copy_v_types(
					old_pk, new_index->table);
			}

			for (ulint i = 0;
			     i < dict_index_get_n_unique(new_index) + 2;
			     i++) {
				const void*	field;
				ulint		len;
				dfield_t*	dfield;

				ut_ad(!rec_offs_nth_extern(offsets, i));

				field = rec_get_nth_field(
					mrec, offsets, i, &len);
				ut_ad(len != UNIV_SQL_NULL);

				dfield = dtuple_get_nth_field(old_pk, i);
				dfield_set_data(dfield, field, len);
			}

			mrec = next_mrec;

			/* Fetch the new value of the row as it was
			in the old table definition. */
			extra_size = *mrec++;

			if (extra_size >= 0x80) {
				/* Read another byte of extra_size. */

				extra_size = (extra_size & 0x7f) << 8;
				extra_size |= *mrec++;
			}

			mrec += extra_size;

			if (mrec > mrec_end) {
				return(NULL);
			}

			rec_offs_set_n_fields(offsets, dup->index->n_fields);
			rec_init_offsets_temp(mrec, dup->index, offsets);

			next_mrec = mrec + rec_offs_data_size(offsets);

			if (next_mrec > mrec_end) {
				return(NULL);
			}
		}

		/* Read virtual column info from log */
		if (num_v) {
			ulint		o_v_size = 0;
			ulint		n_v_size = 0;

			if (next_mrec + 2 > mrec_end) {
				return(NULL);
			}

			/* The new virtual column values come first,
			prefixed by a 2-byte total size. */
			n_v_size = mach_read_from_2(next_mrec);
			next_mrec += n_v_size;
			if (next_mrec > mrec_end) {
				return(NULL);
			}

			/* if there is more than 2 bytes length info */
			if (n_v_size > 2) {
				trx_undo_read_v_cols(
					log->table, const_cast<byte*>(
					next_mrec), old_pk, false,
					&(log->col_map[log->n_old_col]));
				o_v_size = mach_read_from_2(next_mrec);
			}

			next_mrec += o_v_size;
			if (next_mrec > mrec_end) {
				return(NULL);
			}
		}

		ut_ad(next_mrec <= mrec_end);
		log->head.total += next_mrec - mrec_start;
		dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);

		{
			ulint		len;
			const byte*	db_trx_id
				= rec_get_nth_field(
					mrec, offsets, trx_id_col, &len);
			ut_ad(len == DATA_TRX_ID_LEN);
			*error = row_log_table_apply_update(
				thr, new_trx_id_col,
				mrec, offsets, offsets_heap,
				heap, dup, trx_read_trx_id(db_trx_id), old_pk);
		}

		break;
	}

	ut_ad(log->head.total <= log->tail.total);
	mem_heap_empty(offsets_heap);
	mem_heap_empty(heap);
	return(next_mrec);
}
2726 
2727 #ifdef HAVE_PSI_STAGE_INTERFACE
2728 /** Estimate how much an ALTER TABLE progress should be incremented per
2729 one block of log applied.
2730 For the other phases of ALTER TABLE we increment the progress with 1 per
2731 page processed.
2732 @return amount of abstract units to add to work_completed when one block
2733 of log is applied.
2734 */
2735 inline
2736 ulint
row_log_progress_inc_per_block()2737 row_log_progress_inc_per_block()
2738 {
2739 	/* We must increment the progress once per page (as in
2740 	univ_page_size, usually 16KiB). One block here is srv_sort_buf_size
2741 	(usually 1MiB). */
2742 	const ulint	pages_per_block = std::max(
2743 		static_cast<unsigned long>(
2744 			srv_sort_buf_size / univ_page_size.physical()),
2745 		1UL);
2746 
2747 	/* Multiply by an artificial factor of 6 to even the pace with
2748 	the rest of the ALTER TABLE phases, they process page_size amount
2749 	of data faster. */
2750 	return(pages_per_block * 6);
2751 }
2752 
2753 /** Estimate how much work is to be done by the log apply phase
2754 of an ALTER TABLE for this index.
2755 @param[in]	index	index whose log to assess
2756 @return work to be done by log-apply in abstract units
2757 */
2758 ulint
row_log_estimate_work(const dict_index_t * index)2759 row_log_estimate_work(
2760 	const dict_index_t*	index)
2761 {
2762 	if (index == NULL || index->online_log == NULL) {
2763 		return(0);
2764 	}
2765 
2766 	const row_log_t*	l = index->online_log;
2767 	const ulint		bytes_left =
2768 		static_cast<ulint>(l->tail.total - l->head.total);
2769 	const ulint		blocks_left = bytes_left / srv_sort_buf_size;
2770 
2771 	return(blocks_left * row_log_progress_inc_per_block());
2772 }
2773 #else /* HAVE_PSI_STAGE_INTERFACE */
inline
ulint
row_log_progress_inc_per_block()
{
	/* Without the performance schema stage interface there is no
	progress reporting, so applying a block of log contributes
	nothing to the (unused) work counter. */
	return(0);
}
2780 #endif /* HAVE_PSI_STAGE_INTERFACE */
2781 
/** Applies operations to a table that is being rebuilt online.
@param[in]	thr	query graph
@param[in,out]	dup	for reporting duplicate key errors
@param[in,out]	stage	performance schema accounting object, used by
ALTER TABLE. If not NULL, then stage->inc() will be called for each block
of log that is applied.
@return DB_SUCCESS, or error code on failure */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_log_table_apply_ops(
	que_thr_t*		thr,
	row_merge_dup_t*	dup,
	ut_stage_alter_t*	stage)
{
	dberr_t		error;
	const mrec_t*	mrec		= NULL;
	const mrec_t*	next_mrec;
	const mrec_t*	mrec_end	= NULL; /* silence bogus warning */
	const mrec_t*	next_mrec_end;
	mem_heap_t*	heap;
	mem_heap_t*	offsets_heap;
	ulint*		offsets;
	bool		has_index_lock;
	dict_index_t*	index		= const_cast<dict_index_t*>(
		dup->index);
	dict_table_t*	new_table	= index->online_log->table;
	dict_index_t*	new_index	= dict_table_get_first_index(
		new_table);
	/* Size of the offsets[] work area: large enough for a record
	of either the old clustered index or the new one. */
	const ulint	i		= 1 + REC_OFFS_HEADER_SIZE
		+ ut_max(dict_index_get_n_fields(index),
			 dict_index_get_n_unique(new_index) + 2);
	/* Positions of DB_TRX_ID in the old and new clustered index. */
	const ulint	trx_id_col	= dict_col_get_clust_pos(
		dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
	const ulint	new_trx_id_col	= dict_col_get_clust_pos(
		dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
	trx_t*		trx		= thr_get_trx(thr);
	dberr_t		err;

	ut_ad(dict_index_is_clust(index));
	ut_ad(dict_index_is_online_ddl(index));
	ut_ad(trx->mysql_thd);
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(!dict_index_is_online_ddl(new_index));
	ut_ad(trx_id_col > 0);
	ut_ad(trx_id_col != ULINT_UNDEFINED);
	ut_ad(new_trx_id_col > 0);
	ut_ad(new_trx_id_col != ULINT_UNDEFINED);

	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);

	/* offsets[] is self-describing: slot 0 holds the allocated
	size and slot 1 the number of fields. */
	offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
	offsets[0] = i;
	offsets[1] = dict_index_get_n_fields(index);

	heap = mem_heap_create(UNIV_PAGE_SIZE);
	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
	has_index_lock = true;

next_block:
	ut_ad(has_index_lock);
	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
	ut_ad(index->online_log->head.bytes == 0);

	/* NOTE(review): stage is dereferenced unconditionally here,
	so despite the "If not NULL" wording in the function comment,
	callers must pass a valid object. */
	stage->inc(row_log_progress_inc_per_block());

	if (trx_is_interrupted(trx)) {
		goto interrupted;
	}

	if (dict_index_is_corrupted(index)) {
		error = DB_INDEX_CORRUPT;
		goto func_exit;
	}

	ut_ad(dict_index_is_online_ddl(index));

	error = index->online_log->error;

	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (UNIV_UNLIKELY(index->online_log->head.blocks
			  > index->online_log->tail.blocks)) {
unexpected_eof:
		ib::error() << "Unexpected end of temporary file for table "
			<< index->table->name;
corruption:
		error = DB_CORRUPTION;
		goto func_exit;
	}

	if (index->online_log->head.blocks
	    == index->online_log->tail.blocks) {
		/* Reader has caught up with the writer: the remaining
		log (if any) is in the in-memory tail block. */
		if (index->online_log->head.blocks) {
#ifdef HAVE_FTRUNCATE
			/* Truncate the file in order to save space. */
			if (index->online_log->fd > 0
			    && ftruncate(index->online_log->fd, 0) == -1) {
				perror("ftruncate");
			}
#endif /* HAVE_FTRUNCATE */
			index->online_log->head.blocks
				= index->online_log->tail.blocks = 0;
		}

		next_mrec = index->online_log->tail.block;
		next_mrec_end = next_mrec + index->online_log->tail.bytes;

		if (next_mrec_end == next_mrec) {
			/* End of log reached. */
all_done:
			ut_ad(has_index_lock);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			index->online_log->head.bytes = 0;
			index->online_log->tail.bytes = 0;
			error = DB_SUCCESS;
			goto func_exit;
		}
	} else {
		/* More full blocks are on disk: read the next one,
		releasing index->lock so that DML can keep buffering. */
		os_offset_t	ofs;

		ofs = (os_offset_t) index->online_log->head.blocks
			* srv_sort_buf_size;

		ut_ad(has_index_lock);
		has_index_lock = false;
		rw_lock_x_unlock(dict_index_get_lock(index));

		log_free_check();

		ut_ad(dict_index_is_online_ddl(index));

		if (!row_log_block_allocate(index->online_log->head)) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		IORequest request(IORequest::READ | IORequest::ROW_LOG);
		err = os_file_read_no_error_handling_int_fd(
			request,
			index->online_log->fd,
			index->online_log->head.block, ofs,
			srv_sort_buf_size,
			NULL);

		if (err != DB_SUCCESS) {
			ib::error()
				<< "Unable to read temporary file"
				" for table " << index->table_name;
			goto corruption;
		}

#ifdef POSIX_FADV_DONTNEED
		/* Each block is read exactly once.  Free up the file cache. */
		posix_fadvise(index->online_log->fd,
			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
#endif /* POSIX_FADV_DONTNEED */

		next_mrec = index->online_log->head.block;
		next_mrec_end = next_mrec + srv_sort_buf_size;
	}

	/* This read is not protected by index->online_log->mutex for
	performance reasons. We will eventually notice any error that
	was flagged by a DML thread. */
	error = index->online_log->error;

	if (error != DB_SUCCESS) {
		goto func_exit;
	}

	if (mrec) {
		/* A partial record was read from the previous block.
		Copy the temporary buffer full, as we do not know the
		length of the record. Parse subsequent records from
		the bigger buffer index->online_log->head.block
		or index->online_log->tail.block. */

		ut_ad(mrec == index->online_log->head.buf);
		ut_ad(mrec_end > mrec);
		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);

		memcpy((mrec_t*) mrec_end, next_mrec,
		       (&index->online_log->head.buf)[1] - mrec_end);
		mrec = row_log_table_apply_op(
			thr, trx_id_col, new_trx_id_col,
			dup, &error, offsets_heap, heap,
			index->online_log->head.buf,
			(&index->online_log->head.buf)[1], offsets);
		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (UNIV_UNLIKELY(mrec == NULL)) {
			/* The record was not reassembled properly. */
			goto corruption;
		}
		/* The record was previously found out to be
		truncated. Now that the parse buffer was extended,
		it should proceed beyond the old end of the buffer. */
		ut_a(mrec > mrec_end);

		index->online_log->head.bytes = mrec - mrec_end;
		next_mrec += index->online_log->head.bytes;
	}

	ut_ad(next_mrec <= next_mrec_end);
	/* The following loop must not be parsing the temporary
	buffer, but head.block or tail.block. */

	/* mrec!=NULL means that the next record starts from the
	middle of the block */
	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));

#ifdef UNIV_DEBUG
	if (next_mrec_end == index->online_log->head.block
	    + srv_sort_buf_size) {
		/* If tail.bytes == 0, next_mrec_end can also be at
		the end of tail.block. */
		if (index->online_log->tail.bytes == 0) {
			ut_ad(next_mrec == next_mrec_end);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->head.bytes == 0);
		} else {
			ut_ad(next_mrec == index->online_log->head.block
			      + index->online_log->head.bytes);
			ut_ad(index->online_log->tail.blocks
			      > index->online_log->head.blocks);
		}
	} else if (next_mrec_end == index->online_log->tail.block
		   + index->online_log->tail.bytes) {
		ut_ad(next_mrec == index->online_log->tail.block
		      + index->online_log->head.bytes);
		ut_ad(index->online_log->tail.blocks == 0);
		ut_ad(index->online_log->head.blocks == 0);
		ut_ad(index->online_log->head.bytes
		      <= index->online_log->tail.bytes);
	} else {
		ut_error;
	}
#endif /* UNIV_DEBUG */

	mrec_end = next_mrec_end;

	while (!trx_is_interrupted(trx)) {
		mrec = next_mrec;
		ut_ad(mrec <= mrec_end);

		if (mrec == mrec_end) {
			/* We are at the end of the log.
			   Mark the replay all_done. */
			if (has_index_lock) {
				goto all_done;
			}
		}

		if (!has_index_lock) {
			/* We are applying operations from a different
			block than the one that is being written to.
			We do not hold index->lock in order to
			allow other threads to concurrently buffer
			modifications. */
			ut_ad(mrec >= index->online_log->head.block);
			ut_ad(mrec_end == index->online_log->head.block
			      + srv_sort_buf_size);
			ut_ad(index->online_log->head.bytes
			      < srv_sort_buf_size);

			/* Take the opportunity to do a redo log
			checkpoint if needed. */
			log_free_check();
		} else {
			/* We are applying operations from the last block.
			Do not allow other threads to buffer anything,
			so that we can finally catch up and synchronize. */
			ut_ad(index->online_log->head.blocks == 0);
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(mrec >= index->online_log->tail.block);
		}

		/* This read is not protected by index->online_log->mutex
		for performance reasons. We will eventually notice any
		error that was flagged by a DML thread. */
		error = index->online_log->error;

		if (error != DB_SUCCESS) {
			goto func_exit;
		}

		next_mrec = row_log_table_apply_op(
			thr, trx_id_col, new_trx_id_col,
			dup, &error, offsets_heap, heap,
			mrec, mrec_end, offsets);

		if (error != DB_SUCCESS) {
			goto func_exit;
		} else if (next_mrec == next_mrec_end) {
			/* The record happened to end on a block boundary.
			Do we have more blocks left? */
			if (has_index_lock) {
				/* The index will be locked while
				applying the last block. */
				goto all_done;
			}

			mrec = NULL;
process_next_block:
			rw_lock_x_lock(dict_index_get_lock(index));
			has_index_lock = true;

			index->online_log->head.bytes = 0;
			index->online_log->head.blocks++;
			goto next_block;
		} else if (next_mrec != NULL) {
			ut_ad(next_mrec < next_mrec_end);
			index->online_log->head.bytes += next_mrec - mrec;
		} else if (has_index_lock) {
			/* When mrec is within tail.block, it should
			be a complete record, because we are holding
			index->lock and thus excluding the writer. */
			ut_ad(index->online_log->tail.blocks == 0);
			ut_ad(mrec_end == index->online_log->tail.block
			      + index->online_log->tail.bytes);
			ut_ad(0);
			goto unexpected_eof;
		} else {
			/* The record is truncated at the end of this
			block. Stash the prefix in head.buf; the code
			after next_block will complete and apply it. */
			memcpy(index->online_log->head.buf, mrec,
			       mrec_end - mrec);
			mrec_end += index->online_log->head.buf - mrec;
			mrec = index->online_log->head.buf;
			goto process_next_block;
		}
	}

interrupted:
	error = DB_INTERRUPTED;
func_exit:
	if (!has_index_lock) {
		rw_lock_x_lock(dict_index_get_lock(index));
	}

	mem_heap_free(offsets_heap);
	mem_heap_free(heap);
	row_log_block_free(index->online_log->head);
	ut_free(offsets);
	return(error);
}
3132 
3133 /** Apply the row_log_table log to a table upon completing rebuild.
3134 @param[in]	thr		query graph
3135 @param[in]	old_table	old table
3136 @param[in,out]	table		MySQL table (for reporting duplicates)
3137 @param[in,out]	stage		performance schema accounting object, used by
3138 ALTER TABLE. stage->begin_phase_log_table() will be called initially and then
3139 stage->inc() will be called for each block of log that is applied.
3140 @return DB_SUCCESS, or error code on failure */
3141 dberr_t
row_log_table_apply(que_thr_t * thr,dict_table_t * old_table,struct TABLE * table,ut_stage_alter_t * stage)3142 row_log_table_apply(
3143 	que_thr_t*		thr,
3144 	dict_table_t*		old_table,
3145 	struct TABLE*		table,
3146 	ut_stage_alter_t*	stage)
3147 {
3148 	dberr_t		error;
3149 	dict_index_t*	clust_index;
3150 
3151 	thr_get_trx(thr)->error_key_num = 0;
3152 	DBUG_EXECUTE_IF("innodb_trx_duplicates",
3153 			thr_get_trx(thr)->duplicates = TRX_DUP_REPLACE;);
3154 
3155 	stage->begin_phase_log_table();
3156 
3157 	ut_ad(!rw_lock_own(dict_operation_lock, RW_LOCK_S));
3158 	clust_index = dict_table_get_first_index(old_table);
3159 
3160 	rw_lock_x_lock(dict_index_get_lock(clust_index));
3161 
3162 	if (!clust_index->online_log) {
3163 		ut_ad(dict_index_get_online_status(clust_index)
3164 		      == ONLINE_INDEX_COMPLETE);
3165 		/* This function should not be called unless
3166 		rebuilding a table online. Build in some fault
3167 		tolerance. */
3168 		ut_ad(0);
3169 		error = DB_ERROR;
3170 	} else {
3171 		row_merge_dup_t	dup = {
3172 			clust_index, table,
3173 			clust_index->online_log->col_map, 0
3174 		};
3175 
3176 		error = row_log_table_apply_ops(thr, &dup, stage);
3177 
3178 		ut_ad(error != DB_SUCCESS
3179 		      || clust_index->online_log->head.total
3180 		      == clust_index->online_log->tail.total);
3181 	}
3182 
3183 	rw_lock_x_unlock(dict_index_get_lock(clust_index));
3184 	DBUG_EXECUTE_IF("innodb_trx_duplicates",
3185 			thr_get_trx(thr)->duplicates = 0;);
3186 
3187 	return(error);
3188 }
3189 
3190 /******************************************************//**
3191 Allocate the row log for an index and flag the index
3192 for online creation.
3193 @retval true if success, false if not */
3194 bool
row_log_allocate(dict_index_t * index,dict_table_t * table,bool same_pk,const dtuple_t * add_cols,const ulint * col_map,const char * path)3195 row_log_allocate(
3196 /*=============*/
3197 	dict_index_t*	index,	/*!< in/out: index */
3198 	dict_table_t*	table,	/*!< in/out: new table being rebuilt,
3199 				or NULL when creating a secondary index */
3200 	bool		same_pk,/*!< in: whether the definition of the
3201 				PRIMARY KEY has remained the same */
3202 	const dtuple_t*	add_cols,
3203 				/*!< in: default values of
3204 				added columns, or NULL */
3205 	const ulint*	col_map,/*!< in: mapping of old column
3206 				numbers to new ones, or NULL if !table */
3207 	const char*	path)	/*!< in: where to create temporary file */
3208 {
3209 	row_log_t*	log;
3210 	DBUG_ENTER("row_log_allocate");
3211 
3212 	ut_ad(!dict_index_is_online_ddl(index));
3213 	ut_ad(dict_index_is_clust(index) == !!table);
3214 	ut_ad(!table || index->table != table);
3215 	ut_ad(same_pk || table);
3216 	ut_ad(!table || col_map);
3217 	ut_ad(!add_cols || col_map);
3218 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3219 
3220 	log = static_cast<row_log_t*>(ut_malloc_nokey(sizeof *log));
3221 
3222 	if (log == NULL) {
3223 		DBUG_RETURN(false);
3224 	}
3225 
3226 	log->fd = -1;
3227 	mutex_create(LATCH_ID_INDEX_ONLINE_LOG, &log->mutex);
3228 
3229 	log->blobs = NULL;
3230 	log->table = table;
3231 	log->same_pk = same_pk;
3232 	log->add_cols = add_cols;
3233 	log->col_map = col_map;
3234 	log->error = DB_SUCCESS;
3235 	log->max_trx = 0;
3236 	log->tail.blocks = log->tail.bytes = 0;
3237 	log->tail.total = 0;
3238 	log->tail.block = log->head.block = NULL;
3239 	log->head.blocks = log->head.bytes = 0;
3240 	log->head.total = 0;
3241 	log->path = path;
3242 	log->n_old_col = index->table->n_cols;
3243 	log->n_old_vcol = index->table->n_v_cols;
3244 
3245 	dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
3246 	index->online_log = log;
3247 
3248 	/* While we might be holding an exclusive data dictionary lock
3249 	here, in row_log_abort_sec() we will not always be holding it. Use
3250 	atomic operations in both cases. */
3251 	MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
3252 
3253 	DBUG_RETURN(true);
3254 }
3255 
3256 /******************************************************//**
3257 Free the row log for an index that was being created online. */
3258 void
row_log_free(row_log_t * & log)3259 row_log_free(
3260 /*=========*/
3261 	row_log_t*&	log)	/*!< in,own: row log */
3262 {
3263 	MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
3264 
3265 	UT_DELETE(log->blobs);
3266 	row_log_block_free(log->tail);
3267 	row_log_block_free(log->head);
3268 	row_merge_file_destroy_low(log->fd);
3269 	mutex_free(&log->mutex);
3270 	ut_free(log);
3271 	log = NULL;
3272 }
3273 
3274 /******************************************************//**
3275 Get the latest transaction ID that has invoked row_log_online_op()
3276 during online creation.
3277 @return latest transaction ID, or 0 if nothing was logged */
3278 trx_id_t
row_log_get_max_trx(dict_index_t * index)3279 row_log_get_max_trx(
3280 /*================*/
3281 	dict_index_t*	index)	/*!< in: index, must be locked */
3282 {
3283 	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
3284 
3285 	ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_S)
3286 	       && mutex_own(&index->online_log->mutex))
3287 	      || rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3288 
3289 	return(index->online_log->max_trx);
3290 }
3291 
/******************************************************//**
Applies an operation to a secondary index that was being created.
The entry is looked up in the index; depending on op and on whether an
exact match exists, the record is deleted, inserted, skipped, or a
duplicate key error is reported through dup. */
static MY_ATTRIBUTE((nonnull))
void
row_log_apply_op_low(
/*=================*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	enum row_op	op,		/*!< in: operation being applied */
	trx_id_t	trx_id,		/*!< in: transaction identifier */
	const dtuple_t*	entry)		/*!< in: row */
{
	mtr_t		mtr;
	btr_cur_t	cursor;
	ulint*		offsets = NULL;

	ut_ad(!dict_index_is_clust(index));

	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
	      == has_index_lock);

	ut_ad(!dict_index_is_corrupted(index));
	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);

	DBUG_PRINT("ib_create_index",
		   ("%s %s index " IB_ID_FMT "," TRX_ID_FMT ": %s",
		    op == ROW_OP_INSERT ? "insert" : "delete",
		    has_index_lock ? "locked" : "unlocked",
		    index->id, trx_id,
		    rec_printer(entry).str().c_str()));

	mtr_start(&mtr);
	mtr.set_named_space(index->space);

	/* We perform the pessimistic variant of the operations if we
	already hold index->lock exclusively. First, search the
	record. The operation may already have been performed,
	depending on when the row in the clustered index was
	scanned. */
	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
				    has_index_lock
				    ? BTR_MODIFY_TREE
				    : BTR_MODIFY_LEAF,
				    &cursor, 0, __FILE__, __LINE__,
				    &mtr);

	ut_ad(dict_index_get_n_unique(index) > 0);
	/* This test is somewhat similar to row_ins_must_modify_rec(),
	but not identical for unique secondary indexes. */
	if (cursor.low_match >= dict_index_get_n_unique(index)
	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
		/* We have a matching record. exists means that all
		fields matched, i.e. this exact record (including the
		PRIMARY KEY suffix) is in the index. */
		bool	exists	= (cursor.low_match
				   == dict_index_get_n_fields(index));
#ifdef UNIV_DEBUG
		rec_t*	rec	= btr_cur_get_rec(&cursor);
		ut_ad(page_rec_is_user_rec(rec));
		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
#endif /* UNIV_DEBUG */

		ut_ad(exists || dict_index_is_unique(index));

		switch (op) {
		case ROW_OP_DELETE:
			if (!exists) {
				/* The existing record matches the
				unique secondary index key, but the
				PRIMARY KEY columns differ. So, this
				exact record does not exist. For
				example, we could detect a duplicate
				key error in some old index before
				logging an ROW_OP_INSERT for our
				index. This ROW_OP_DELETE could have
				been logged for rolling back
				TRX_UNDO_INSERT_REC. */
				goto func_exit;
			}

			if (btr_cur_optimistic_delete(
				    &cursor, BTR_CREATE_FLAG, &mtr)) {
				*error = DB_SUCCESS;
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				mtr.set_named_space(index->space);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);

				/* No other thread than the current one
				is allowed to modify the index tree.
				Thus, the record should still exist. */
				ut_ad(cursor.low_match
				      >= dict_index_get_n_fields(index));
				ut_ad(page_rec_is_user_rec(
					      btr_cur_get_rec(&cursor)));
			}

			/* As there are no externally stored fields in
			a secondary index record, the parameter
			rollback=false will be ignored. */

			btr_cur_pessimistic_delete(
				error, FALSE, &cursor,
				BTR_CREATE_FLAG, false, &mtr);
			break;
		case ROW_OP_INSERT:
			if (exists) {
				/* The record already exists. There
				is nothing to be inserted.
				This could happen when processing
				TRX_UNDO_DEL_MARK_REC in statement
				rollback:

				UPDATE of PRIMARY KEY can lead to
				statement rollback if the updated
				value of the PRIMARY KEY already
				exists. In this case, the UPDATE would
				be mapped to DELETE;INSERT, and we
				only wrote undo log for the DELETE
				part. The duplicate key error would be
				triggered before logging the INSERT
				part.

				Theoretically, we could also get a
				similar situation when a DELETE operation
				is blocked by a FOREIGN KEY constraint. */
				goto func_exit;
			}

			if (dtuple_contains_null(entry)) {
				/* The UNIQUE KEY columns match, but
				there is a NULL value in the key, and
				NULL!=NULL. */
				goto insert_the_rec;
			}

			goto duplicate;
		}
	} else {
		switch (op) {
			rec_t*		rec;
			big_rec_t*	big_rec;
		case ROW_OP_DELETE:
			/* The record does not exist. For example, we
			could detect a duplicate key error in some old
			index before logging an ROW_OP_INSERT for our
			index. This ROW_OP_DELETE could be logged for
			rolling back TRX_UNDO_INSERT_REC. */
			goto func_exit;
		case ROW_OP_INSERT:
			if (dict_index_is_unique(index)
			    && (cursor.up_match
				>= dict_index_get_n_unique(index)
				|| cursor.low_match
				>= dict_index_get_n_unique(index))
			    && (!index->n_nullable
				|| !dtuple_contains_null(entry))) {
duplicate:
				/* Duplicate key */
				ut_ad(dict_index_is_unique(index));
				row_merge_dup_report(dup, entry->fields);
				*error = DB_DUPLICATE_KEY;
				goto func_exit;
			}
insert_the_rec:
			/* Insert the record. As we are inserting into
			a secondary index, there cannot be externally
			stored columns (!big_rec). */
			*error = btr_cur_optimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec, 0, NULL, &mtr);
			ut_ad(!big_rec);
			if (*error != DB_FAIL) {
				break;
			}

			if (!has_index_lock) {
				/* This needs a pessimistic operation.
				Lock the index tree exclusively. */
				mtr_commit(&mtr);
				mtr_start(&mtr);
				mtr.set_named_space(index->space);
				btr_cur_search_to_nth_level(
					index, 0, entry, PAGE_CUR_LE,
					BTR_MODIFY_TREE, &cursor, 0,
					__FILE__, __LINE__, &mtr);
			}

			/* We already determined that the
			record did not exist. No other thread
			than the current one is allowed to
			modify the index tree. Thus, the
			record should still not exist. */

			*error = btr_cur_pessimistic_insert(
				BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_CREATE_FLAG,
				&cursor, &offsets, &offsets_heap,
				const_cast<dtuple_t*>(entry),
				&rec, &big_rec,
				0, NULL, &mtr);
			ut_ad(!big_rec);
			break;
		}
		/* Reclaim the memory used for offsets by the insert
		(the heap "can be emptied", see parameter comment). */
		mem_heap_empty(offsets_heap);
	}

	if (*error == DB_SUCCESS && trx_id) {
		/* Update the page's maximum transaction ID field. */
		page_update_max_trx_id(btr_cur_get_block(&cursor),
				       btr_cur_get_page_zip(&cursor),
				       trx_id, &mtr);
	}

func_exit:
	mtr_commit(&mtr);
}
3526 
/******************************************************//**
Applies an operation to a secondary index that was being created.
Parses one merge record (operation byte, optional DB_TRX_ID,
extra_size header, record payload) out of [mrec, mrec_end) and
applies it via row_log_apply_op_low().
@return NULL on failure (mrec corruption) or when out of data;
pointer to next record on success */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
const mrec_t*
row_log_apply_op(
/*=============*/
	dict_index_t*	index,		/*!< in/out: index */
	row_merge_dup_t*dup,		/*!< in/out: for reporting
					duplicate key errors */
	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
					allocating offsets; can be emptied */
	mem_heap_t*	heap,		/*!< in/out: memory heap for
					allocating data tuples */
	bool		has_index_lock, /*!< in: true if holding index->lock
					in exclusive mode */
	const mrec_t*	mrec,		/*!< in: merge record */
	const mrec_t*	mrec_end,	/*!< in: end of buffer */
	ulint*		offsets)	/*!< in/out: work area for
					rec_init_offsets_temp() */

{
	enum row_op	op;
	ulint		extra_size;
	ulint		data_size;
	ulint		n_ext;
	dtuple_t*	entry;
	trx_id_t	trx_id;

	/* Online index creation is only used for secondary indexes. */
	ut_ad(!dict_index_is_clust(index));

	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X)
	      == has_index_lock);

	if (dict_index_is_corrupted(index)) {
		*error = DB_INDEX_CORRUPT;
		return(NULL);
	}

	*error = DB_SUCCESS;

	/* Not enough data for even the record header: return NULL with
	*error == DB_SUCCESS, so that the caller can reassemble the
	truncated record in a bigger buffer. */
	if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
		return(NULL);
	}

	switch (*mrec) {
	case ROW_OP_INSERT:
		/* An insert additionally carries DB_TRX_ID after the
		operation byte. */
		if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
			return(NULL);
		}

		op = static_cast<enum row_op>(*mrec++);
		trx_id = trx_read_trx_id(mrec);
		mrec += DATA_TRX_ID_LEN;
		break;
	case ROW_OP_DELETE:
		op = static_cast<enum row_op>(*mrec++);
		trx_id = 0;
		break;
	default:
corrupted:
		ut_ad(0);
		*error = DB_CORRUPTION;
		return(NULL);
	}

	extra_size = *mrec++;

	ut_ad(mrec < mrec_end);

	if (extra_size >= 0x80) {
		/* Read another byte of extra_size. */

		extra_size = (extra_size & 0x7f) << 8;
		extra_size |= *mrec++;
	}

	/* Skip the record header; mrec now points at the record data. */
	mrec += extra_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	rec_init_offsets_temp(mrec, index, offsets);

	if (rec_offs_any_extern(offsets)) {
		/* There should never be any externally stored fields
		in a secondary index, which is what online index
		creation is used for. Therefore, the log file must be
		corrupted. */
		goto corrupted;
	}

	data_size = rec_offs_data_size(offsets);

	mrec += data_size;

	if (mrec > mrec_end) {
		return(NULL);
	}

	entry = row_rec_to_index_entry_low(
		mrec - data_size, index, offsets, &n_ext, heap);
	/* Online index creation is only implemented for secondary
	indexes, which never contain off-page columns. */
	ut_ad(n_ext == 0);

	row_log_apply_op_low(index, dup, error, offsets_heap,
			     has_index_lock, op, trx_id, entry);
	return(mrec);
}
3641 
3642 /** Applies operations to a secondary index that was being created.
3643 @param[in]	trx	transaction (for checking if the operation was
3644 interrupted)
3645 @param[in,out]	index	index
3646 @param[in,out]	dup	for reporting duplicate key errors
3647 @param[in,out]	stage	performance schema accounting object, used by
3648 ALTER TABLE. If not NULL, then stage->inc() will be called for each block
3649 of log that is applied.
3650 @return DB_SUCCESS, or error code on failure */
3651 static
3652 dberr_t
row_log_apply_ops(const trx_t * trx,dict_index_t * index,row_merge_dup_t * dup,ut_stage_alter_t * stage)3653 row_log_apply_ops(
3654 	const trx_t*		trx,
3655 	dict_index_t*		index,
3656 	row_merge_dup_t*	dup,
3657 	ut_stage_alter_t*	stage)
3658 {
3659 	dberr_t		error;
3660 	const mrec_t*	mrec	= NULL;
3661 	const mrec_t*	next_mrec;
3662 	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
3663 	const mrec_t*	next_mrec_end;
3664 	mem_heap_t*	offsets_heap;
3665 	mem_heap_t*	heap;
3666 	ulint*		offsets;
3667 	bool		has_index_lock;
3668 	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
3669 		+ dict_index_get_n_fields(index);
3670 
3671 	ut_ad(dict_index_is_online_ddl(index));
3672 	ut_ad(!index->is_committed());
3673 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3674 	ut_ad(index->online_log);
3675 	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
3676 
3677 	offsets = static_cast<ulint*>(ut_malloc_nokey(i * sizeof *offsets));
3678 	offsets[0] = i;
3679 	offsets[1] = dict_index_get_n_fields(index);
3680 
3681 	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
3682 	heap = mem_heap_create(UNIV_PAGE_SIZE);
3683 	has_index_lock = true;
3684 
3685 next_block:
3686 	ut_ad(has_index_lock);
3687 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
3688 	ut_ad(index->online_log->head.bytes == 0);
3689 
3690 	stage->inc(row_log_progress_inc_per_block());
3691 
3692 	if (trx_is_interrupted(trx)) {
3693 		goto interrupted;
3694 	}
3695 
3696 	error = index->online_log->error;
3697 	if (error != DB_SUCCESS) {
3698 		goto func_exit;
3699 	}
3700 
3701 	if (dict_index_is_corrupted(index)) {
3702 		error = DB_INDEX_CORRUPT;
3703 		goto func_exit;
3704 	}
3705 
3706 	if (UNIV_UNLIKELY(index->online_log->head.blocks
3707 			  > index->online_log->tail.blocks)) {
3708 unexpected_eof:
3709 		ib::error() << "Unexpected end of temporary file for index "
3710 			<< index->name;
3711 corruption:
3712 		error = DB_CORRUPTION;
3713 		goto func_exit;
3714 	}
3715 
3716 	if (index->online_log->head.blocks
3717 	    == index->online_log->tail.blocks) {
3718 		if (index->online_log->head.blocks) {
3719 #ifdef HAVE_FTRUNCATE
3720 			/* Truncate the file in order to save space. */
3721 			if (index->online_log->fd > 0
3722 			    && ftruncate(index->online_log->fd, 0) == -1) {
3723 				perror("ftruncate");
3724 			}
3725 #endif /* HAVE_FTRUNCATE */
3726 			index->online_log->head.blocks
3727 				= index->online_log->tail.blocks = 0;
3728 		}
3729 
3730 		next_mrec = index->online_log->tail.block;
3731 		next_mrec_end = next_mrec + index->online_log->tail.bytes;
3732 
3733 		if (next_mrec_end == next_mrec) {
3734 			/* End of log reached. */
3735 all_done:
3736 			ut_ad(has_index_lock);
3737 			ut_ad(index->online_log->head.blocks == 0);
3738 			ut_ad(index->online_log->tail.blocks == 0);
3739 			error = DB_SUCCESS;
3740 			goto func_exit;
3741 		}
3742 	} else {
3743 		os_offset_t	ofs;
3744 
3745 		ofs = (os_offset_t) index->online_log->head.blocks
3746 			* srv_sort_buf_size;
3747 
3748 		ut_ad(has_index_lock);
3749 		has_index_lock = false;
3750 		rw_lock_x_unlock(dict_index_get_lock(index));
3751 
3752 		log_free_check();
3753 
3754 		if (!row_log_block_allocate(index->online_log->head)) {
3755 			error = DB_OUT_OF_MEMORY;
3756 			goto func_exit;
3757 		}
3758 
3759 		IORequest	request(IORequest::READ | IORequest::ROW_LOG);
3760 		dberr_t	err = os_file_read_no_error_handling_int_fd(
3761 			request,
3762 				index->online_log->fd,
3763 			index->online_log->head.block, ofs,
3764 			srv_sort_buf_size,
3765 			NULL);
3766 
3767 		if (err != DB_SUCCESS) {
3768 			ib::error()
3769 				<< "Unable to read temporary file"
3770 				" for index " << index->name;
3771 			goto corruption;
3772 		}
3773 
3774 #ifdef POSIX_FADV_DONTNEED
3775 		/* Each block is read exactly once.  Free up the file cache. */
3776 		posix_fadvise(index->online_log->fd,
3777 			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
3778 #endif /* POSIX_FADV_DONTNEED */
3779 
3780 		next_mrec = index->online_log->head.block;
3781 		next_mrec_end = next_mrec + srv_sort_buf_size;
3782 	}
3783 
3784 	if (mrec) {
3785 		/* A partial record was read from the previous block.
3786 		Copy the temporary buffer full, as we do not know the
3787 		length of the record. Parse subsequent records from
3788 		the bigger buffer index->online_log->head.block
3789 		or index->online_log->tail.block. */
3790 
3791 		ut_ad(mrec == index->online_log->head.buf);
3792 		ut_ad(mrec_end > mrec);
3793 		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
3794 
3795 		memcpy((mrec_t*) mrec_end, next_mrec,
3796 		       (&index->online_log->head.buf)[1] - mrec_end);
3797 		mrec = row_log_apply_op(
3798 			index, dup, &error, offsets_heap, heap,
3799 			has_index_lock, index->online_log->head.buf,
3800 			(&index->online_log->head.buf)[1], offsets);
3801 		if (error != DB_SUCCESS) {
3802 			goto func_exit;
3803 		} else if (UNIV_UNLIKELY(mrec == NULL)) {
3804 			/* The record was not reassembled properly. */
3805 			goto corruption;
3806 		}
3807 		/* The record was previously found out to be
3808 		truncated. Now that the parse buffer was extended,
3809 		it should proceed beyond the old end of the buffer. */
3810 		ut_a(mrec > mrec_end);
3811 
3812 		index->online_log->head.bytes = mrec - mrec_end;
3813 		next_mrec += index->online_log->head.bytes;
3814 	}
3815 
3816 	ut_ad(next_mrec <= next_mrec_end);
3817 	/* The following loop must not be parsing the temporary
3818 	buffer, but head.block or tail.block. */
3819 
3820 	/* mrec!=NULL means that the next record starts from the
3821 	middle of the block */
3822 	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
3823 
3824 #ifdef UNIV_DEBUG
3825 	if (next_mrec_end == index->online_log->head.block
3826 	    + srv_sort_buf_size) {
3827 		/* If tail.bytes == 0, next_mrec_end can also be at
3828 		the end of tail.block. */
3829 		if (index->online_log->tail.bytes == 0) {
3830 			ut_ad(next_mrec == next_mrec_end);
3831 			ut_ad(index->online_log->tail.blocks == 0);
3832 			ut_ad(index->online_log->head.blocks == 0);
3833 			ut_ad(index->online_log->head.bytes == 0);
3834 		} else {
3835 			ut_ad(next_mrec == index->online_log->head.block
3836 			      + index->online_log->head.bytes);
3837 			ut_ad(index->online_log->tail.blocks
3838 			      > index->online_log->head.blocks);
3839 		}
3840 	} else if (next_mrec_end == index->online_log->tail.block
3841 		   + index->online_log->tail.bytes) {
3842 		ut_ad(next_mrec == index->online_log->tail.block
3843 		      + index->online_log->head.bytes);
3844 		ut_ad(index->online_log->tail.blocks == 0);
3845 		ut_ad(index->online_log->head.blocks == 0);
3846 		ut_ad(index->online_log->head.bytes
3847 		      <= index->online_log->tail.bytes);
3848 	} else {
3849 		ut_error;
3850 	}
3851 #endif /* UNIV_DEBUG */
3852 
3853 	mrec_end = next_mrec_end;
3854 
3855 	while (!trx_is_interrupted(trx)) {
3856 		mrec = next_mrec;
3857 		ut_ad(mrec < mrec_end);
3858 
3859 		if (!has_index_lock) {
3860 			/* We are applying operations from a different
3861 			block than the one that is being written to.
3862 			We do not hold index->lock in order to
3863 			allow other threads to concurrently buffer
3864 			modifications. */
3865 			ut_ad(mrec >= index->online_log->head.block);
3866 			ut_ad(mrec_end == index->online_log->head.block
3867 			      + srv_sort_buf_size);
3868 			ut_ad(index->online_log->head.bytes
3869 			      < srv_sort_buf_size);
3870 
3871 			/* Take the opportunity to do a redo log
3872 			checkpoint if needed. */
3873 			log_free_check();
3874 		} else {
3875 			/* We are applying operations from the last block.
3876 			Do not allow other threads to buffer anything,
3877 			so that we can finally catch up and synchronize. */
3878 			ut_ad(index->online_log->head.blocks == 0);
3879 			ut_ad(index->online_log->tail.blocks == 0);
3880 			ut_ad(mrec_end == index->online_log->tail.block
3881 			      + index->online_log->tail.bytes);
3882 			ut_ad(mrec >= index->online_log->tail.block);
3883 		}
3884 
3885 		next_mrec = row_log_apply_op(
3886 			index, dup, &error, offsets_heap, heap,
3887 			has_index_lock, mrec, mrec_end, offsets);
3888 
3889 		if (error != DB_SUCCESS) {
3890 			goto func_exit;
3891 		} else if (next_mrec == next_mrec_end) {
3892 			/* The record happened to end on a block boundary.
3893 			Do we have more blocks left? */
3894 			if (has_index_lock) {
3895 				/* The index will be locked while
3896 				applying the last block. */
3897 				goto all_done;
3898 			}
3899 
3900 			mrec = NULL;
3901 process_next_block:
3902 			rw_lock_x_lock(dict_index_get_lock(index));
3903 			has_index_lock = true;
3904 
3905 			index->online_log->head.bytes = 0;
3906 			index->online_log->head.blocks++;
3907 			goto next_block;
3908 		} else if (next_mrec != NULL) {
3909 			ut_ad(next_mrec < next_mrec_end);
3910 			index->online_log->head.bytes += next_mrec - mrec;
3911 		} else if (has_index_lock) {
3912 			/* When mrec is within tail.block, it should
3913 			be a complete record, because we are holding
3914 			index->lock and thus excluding the writer. */
3915 			ut_ad(index->online_log->tail.blocks == 0);
3916 			ut_ad(mrec_end == index->online_log->tail.block
3917 			      + index->online_log->tail.bytes);
3918 			ut_ad(0);
3919 			goto unexpected_eof;
3920 		} else {
3921 			memcpy(index->online_log->head.buf, mrec,
3922 			       mrec_end - mrec);
3923 			mrec_end += index->online_log->head.buf - mrec;
3924 			mrec = index->online_log->head.buf;
3925 			goto process_next_block;
3926 		}
3927 	}
3928 
3929 interrupted:
3930 	error = DB_INTERRUPTED;
3931 func_exit:
3932 	if (!has_index_lock) {
3933 		rw_lock_x_lock(dict_index_get_lock(index));
3934 	}
3935 
3936 	switch (error) {
3937 	case DB_SUCCESS:
3938 		break;
3939 	case DB_INDEX_CORRUPT:
3940 		if (((os_offset_t) index->online_log->tail.blocks + 1)
3941 		    * srv_sort_buf_size >= srv_online_max_size) {
3942 			/* The log file grew too big. */
3943 			error = DB_ONLINE_LOG_TOO_BIG;
3944 		}
3945 		/* fall through */
3946 	default:
3947 		/* We set the flag directly instead of invoking
3948 		dict_set_corrupted_index_cache_only(index) here,
3949 		because the index is not "public" yet. */
3950 		index->type |= DICT_CORRUPT;
3951 	}
3952 
3953 	mem_heap_free(heap);
3954 	mem_heap_free(offsets_heap);
3955 	row_log_block_free(index->online_log->head);
3956 	ut_free(offsets);
3957 	return(error);
3958 }
3959 
3960 /** Apply the row log to the index upon completing index creation.
3961 @param[in]	trx	transaction (for checking if the operation was
3962 interrupted)
3963 @param[in,out]	index	secondary index
3964 @param[in,out]	table	MySQL table (for reporting duplicates)
3965 @param[in,out]	stage	performance schema accounting object, used by
3966 ALTER TABLE. stage->begin_phase_log_index() will be called initially and then
3967 stage->inc() will be called for each block of log that is applied.
3968 @return DB_SUCCESS, or error code on failure */
3969 dberr_t
row_log_apply(const trx_t * trx,dict_index_t * index,struct TABLE * table,ut_stage_alter_t * stage)3970 row_log_apply(
3971 	const trx_t*		trx,
3972 	dict_index_t*		index,
3973 	struct TABLE*		table,
3974 	ut_stage_alter_t*	stage)
3975 {
3976 	dberr_t		error;
3977 	row_log_t*	log;
3978 	row_merge_dup_t	dup = { index, table, NULL, 0 };
3979 	DBUG_ENTER("row_log_apply");
3980 
3981 	ut_ad(dict_index_is_online_ddl(index));
3982 	ut_ad(!dict_index_is_clust(index));
3983 
3984 	stage->begin_phase_log_index();
3985 
3986 	log_free_check();
3987 
3988 	rw_lock_x_lock(dict_index_get_lock(index));
3989 
3990 	if (!dict_table_is_corrupted(index->table)) {
3991 		error = row_log_apply_ops(trx, index, &dup, stage);
3992 	} else {
3993 		error = DB_SUCCESS;
3994 	}
3995 
3996 	if (error != DB_SUCCESS) {
3997 		ut_a(!dict_table_is_discarded(index->table));
3998 		/* We set the flag directly instead of invoking
3999 		dict_set_corrupted_index_cache_only(index) here,
4000 		because the index is not "public" yet. */
4001 		index->type |= DICT_CORRUPT;
4002 		index->table->drop_aborted = TRUE;
4003 
4004 		dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
4005 	} else {
4006 		ut_ad(dup.n_dup == 0);
4007 		dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
4008 	}
4009 
4010 	log = index->online_log;
4011 	index->online_log = NULL;
4012 	rw_lock_x_unlock(dict_index_get_lock(index));
4013 
4014 	row_log_free(log);
4015 
4016 	DBUG_RETURN(error);
4017 }
4018