1 /*****************************************************************************
2 
3 Copyright (c) 2011, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0log.cc
29 Modification log for online index creation and online table rebuild
30 
31 Created 2011-05-26 Marko Makela
32 *******************************************************/
33 
34 #include "row0log.h"
35 
36 #ifdef UNIV_NONINL
37 #include "row0log.ic"
38 #endif
39 
40 #include "row0row.h"
41 #include "row0ins.h"
42 #include "row0upd.h"
43 #include "row0merge.h"
44 #include "row0ext.h"
45 #include "data0data.h"
46 #include "que0que.h"
47 #include "handler0alter.h"
48 
49 #include<map>
50 
/** Table row modification operations during online table rebuild.
Delete-marked records are not copied to the rebuilt table.
The opcodes start at 0x41 ('A'), so that a zero byte can serve as the
end-of-chunk marker in the log and so that the values are disjoint
from the row_op codes used for secondary index creation. */
enum row_tab_op {
	/** Insert a record */
	ROW_T_INSERT = 0x41,
	/** Update a record in place */
	ROW_T_UPDATE,
	/** Delete (purge) a record */
	ROW_T_DELETE
};
61 
/** Index record modification operations during online index creation.
The opcodes start at 0x61 ('a'), disjoint from the row_tab_op range,
and nonzero so that 0 can mark the end of a log chunk. */
enum row_op {
	/** Insert a record */
	ROW_OP_INSERT = 0x61,
	/** Delete a record */
	ROW_OP_DELETE
};
69 
70 #ifdef UNIV_DEBUG
71 /** Write information about the applied record to the error log */
72 # define ROW_LOG_APPLY_PRINT
73 #endif /* UNIV_DEBUG */
74 
75 #ifdef ROW_LOG_APPLY_PRINT
76 /** When set, write information about the applied record to the error log */
77 static bool row_log_apply_print;
78 #endif /* ROW_LOG_APPLY_PRINT */
79 
80 /** Size of the modification log entry header, in bytes */
81 #define ROW_LOG_HEADER_SIZE 2/*op, extra_size*/
82 
/** Log block for modifications during online ALTER TABLE.
Two instances exist per log (row_log_t::head for the reader and
row_log_t::tail for the writer); completed blocks are spilled to a
temporary file, while the current block lives in memory. */
struct row_log_buf_t {
	byte*		block;	/*!< file block buffer; NULL until
				row_log_block_allocate() is called */
	mrec_buf_t	buf;	/*!< buffer for accessing a record
				that spans two blocks */
	ulint		blocks; /*!< current position in blocks
				(number of blocks already flushed
				to the temporary file) */
	ulint		bytes;	/*!< current position within block */
	ulonglong	total;	/*!< logical position, in bytes from
				the start of the row_log_table log;
				0 for row_log_online_op() and
				row_log_apply(). */
	ulint		size;	/*!< allocated size of block */
};
96 
/** Tracks BLOB allocation during online ALTER TABLE.
An instance records, per BLOB page, whether the page is currently
freed and (if reused) the log offset from which the page contents
are again safe to read while applying the table-rebuild log. */
class row_log_table_blob_t {
public:
	/** Constructor (declaring a BLOB freed)
	@param offset_arg	row_log_t::tail::total */
#ifdef UNIV_DEBUG
	row_log_table_blob_t(ulonglong offset_arg) :
		old_offset (0), free_offset (offset_arg),
		offset (BLOB_FREED) {}
#else /* UNIV_DEBUG */
	row_log_table_blob_t() :
		offset (BLOB_FREED) {}
#endif /* UNIV_DEBUG */

	/** Declare a BLOB freed again.
	@param offset_arg	row_log_t::tail::total */
#ifdef UNIV_DEBUG
	void blob_free(ulonglong offset_arg)
#else /* UNIV_DEBUG */
	void blob_free()
#endif /* UNIV_DEBUG */
	{
		/* A page can only be freed after the log position at
		which it was last allocated, and not twice in a row. */
		ut_ad(offset < offset_arg);
		ut_ad(offset != BLOB_FREED);
		ut_d(old_offset = offset);
		ut_d(free_offset = offset_arg);
		offset = BLOB_FREED;
	}
	/** Declare a freed BLOB reused.
	@param offset_arg	row_log_t::tail::total */
	void blob_alloc(ulonglong offset_arg) {
		ut_ad(free_offset <= offset_arg);
		ut_d(old_offset = offset);
		offset = offset_arg;
	}
	/** Determine if a BLOB was freed at a given log position
	@param offset_arg	row_log_t::head::total after the log record
	@return true if freed */
	bool is_freed(ulonglong offset_arg) const {
		/* This is supposed to be the offset at the end of the
		current log record. */
		ut_ad(offset_arg > 0);
		/* We should never get anywhere close the magic value. */
		ut_ad(offset_arg < BLOB_FREED);
		/* Records logged before the (re)allocation offset must
		not access the page; BLOB_FREED (~0) means always freed. */
		return(offset_arg < offset);
	}
private:
	/** Magic value for a freed BLOB */
	static const ulonglong BLOB_FREED = ~0ULL;
#ifdef UNIV_DEBUG
	/** Old offset, in case a page was freed, reused, freed, ... */
	ulonglong	old_offset;
	/** Offset of last blob_free() */
	ulonglong	free_offset;
#endif /* UNIV_DEBUG */
	/** Byte offset to the log file */
	ulonglong	offset;
};
155 
156 /** @brief Map of off-page column page numbers to 0 or log byte offsets.
157 
158 If there is no mapping for a page number, it is safe to access.
159 If a page number maps to 0, it is an off-page column that has been freed.
160 If a page number maps to a nonzero number, the number is a byte offset
161 into the index->online_log, indicating that the page is safe to access
162 when applying log records starting from that offset. */
163 typedef std::map<ulint, row_log_table_blob_t> page_no_map;
164 
165 /** @brief Buffer for logging modifications during online index creation
166 
167 All modifications to an index that is being created will be logged by
168 row_log_online_op() to this buffer.
169 
170 All modifications to a table that is being rebuilt will be logged by
171 row_log_table_delete(), row_log_table_update(), row_log_table_insert()
172 to this buffer.
173 
174 When head.blocks == tail.blocks, the reader will access tail.block
175 directly. When also head.bytes == tail.bytes, both counts will be
176 reset to 0 and the file will be truncated. */
/** Online ALTER TABLE modification log.
See the block comment above for the head/tail reader-writer
protocol; the temporary file backing completed blocks is created
lazily by row_log_tmpfile(). */
struct row_log_t {
	int		fd;	/*!< file descriptor; -1 until the
				temporary file is created on demand */
	ib_mutex_t	mutex;	/*!< mutex protecting error,
				max_trx and tail */
	page_no_map*	blobs;	/*!< map of page numbers of off-page columns
				that have been freed during table-rebuilding
				ALTER TABLE (row_log_table_*); protected by
				index->lock X-latch only */
	dict_table_t*	table;	/*!< table that is being rebuilt,
				or NULL when this is a secondary
				index that is being created online */
	bool		same_pk;/*!< whether the definition of the PRIMARY KEY
				has remained the same */
	const dtuple_t*	add_cols;
				/*!< default values of added columns, or NULL */
	const ulint*	col_map;/*!< mapping of old column numbers to
				new ones, or NULL if !table */
	dberr_t		error;	/*!< error that occurred during online
				table rebuild; once set, further logging
				is suppressed */
	trx_id_t	max_trx;/*!< biggest observed trx_id in
				row_log_online_op();
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	tail;	/*!< writer context;
				protected by mutex and index->lock S-latch,
				or by index->lock X-latch only */
	row_log_buf_t	head;	/*!< reader context; protected by MDL only;
				modifiable by row_log_apply_ops() */
	const char*	path;	/*!< where to create temporary file during
				log operation */
};
208 
209 /** Create the file or online log if it does not exist.
210 @param[in,out]	log	online rebuild log
211 @return file descriptor. */
212 static MY_ATTRIBUTE((warn_unused_result))
213 int
row_log_tmpfile(row_log_t * log)214 row_log_tmpfile(
215 	row_log_t*	log)
216 {
217 	DBUG_ENTER("row_log_tmpfile");
218 	if (log->fd < 0) {
219 		log->fd = row_merge_file_create_low(log->path);
220 	}
221 
222 	DBUG_RETURN(log->fd);
223 }
224 
225 /** Allocate the memory for the log buffer.
226 @param[in,out]	log_buf	Buffer used for log operation
227 @return TRUE if success, false if not */
228 static MY_ATTRIBUTE((warn_unused_result))
229 bool
row_log_block_allocate(row_log_buf_t & log_buf)230 row_log_block_allocate(
231 	row_log_buf_t&	log_buf)
232 {
233 	DBUG_ENTER("row_log_block_allocate");
234 	if (log_buf.block == NULL) {
235 		log_buf.size = srv_sort_buf_size;
236 		log_buf.block = (byte*) os_mem_alloc_large(&log_buf.size, false);
237 		DBUG_EXECUTE_IF("simulate_row_log_allocation_failure",
238 			if (log_buf.block)
239 				os_mem_free_large(log_buf.block, log_buf.size);
240 			log_buf.block = NULL;);
241 		if (!log_buf.block) {
242 			DBUG_RETURN(false);
243 		}
244 	}
245 	DBUG_RETURN(true);
246 }
247 
248 /** Free the log buffer.
249 @param[in,out]	log_buf	Buffer used for log operation */
250 static
251 void
row_log_block_free(row_log_buf_t & log_buf)252 row_log_block_free(
253 	row_log_buf_t&	log_buf)
254 {
255 	DBUG_ENTER("row_log_block_free");
256 	if (log_buf.block != NULL) {
257 		os_mem_free_large(log_buf.block, log_buf.size);
258 		log_buf.block = NULL;
259 	}
260 	DBUG_VOID_RETURN;
261 }
262 
263 /******************************************************//**
264 Logs an operation to a secondary index that is (or was) being created. */
265 UNIV_INTERN
266 void
row_log_online_op(dict_index_t * index,const dtuple_t * tuple,trx_id_t trx_id)267 row_log_online_op(
268 /*==============*/
269 	dict_index_t*	index,	/*!< in/out: index, S or X latched */
270 	const dtuple_t* tuple,	/*!< in: index tuple */
271 	trx_id_t	trx_id)	/*!< in: transaction ID for insert,
272 				or 0 for delete */
273 {
274 	byte*		b;
275 	ulint		extra_size;
276 	ulint		size;
277 	ulint		mrec_size;
278 	ulint		avail_size;
279 	row_log_t*	log;
280 
281 	ut_ad(dtuple_validate(tuple));
282 	ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
283 #ifdef UNIV_SYNC_DEBUG
284 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
285 	      || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
286 #endif /* UNIV_SYNC_DEBUG */
287 
288 	if (dict_index_is_corrupted(index)) {
289 		return;
290 	}
291 
292 	ut_ad(dict_index_is_online_ddl(index));
293 
294 	/* Compute the size of the record. This differs from
295 	row_merge_buf_encode(), because here we do not encode
296 	extra_size+1 (and reserve 0 as the end-of-chunk marker). */
297 
298 	size = rec_get_converted_size_temp(
299 		index, tuple->fields, tuple->n_fields, &extra_size);
300 	ut_ad(size >= extra_size);
301 	ut_ad(size <= sizeof log->tail.buf);
302 
303 	mrec_size = ROW_LOG_HEADER_SIZE
304 		+ (extra_size >= 0x80) + size
305 		+ (trx_id ? DATA_TRX_ID_LEN : 0);
306 
307 	log = index->online_log;
308 	mutex_enter(&log->mutex);
309 
310 	if (trx_id > log->max_trx) {
311 		log->max_trx = trx_id;
312 	}
313 
314 	if (!row_log_block_allocate(log->tail)) {
315 		log->error = DB_OUT_OF_MEMORY;
316 		goto err_exit;
317 	}
318 
319 	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
320 
321 	ut_ad(log->tail.bytes < srv_sort_buf_size);
322 	avail_size = srv_sort_buf_size - log->tail.bytes;
323 
324 	if (mrec_size > avail_size) {
325 		b = log->tail.buf;
326 	} else {
327 		b = log->tail.block + log->tail.bytes;
328 	}
329 
330 	if (trx_id != 0) {
331 		*b++ = ROW_OP_INSERT;
332 		trx_write_trx_id(b, trx_id);
333 		b += DATA_TRX_ID_LEN;
334 	} else {
335 		*b++ = ROW_OP_DELETE;
336 	}
337 
338 	if (extra_size < 0x80) {
339 		*b++ = (byte) extra_size;
340 	} else {
341 		ut_ad(extra_size < 0x8000);
342 		*b++ = (byte) (0x80 | (extra_size >> 8));
343 		*b++ = (byte) extra_size;
344 	}
345 
346 	rec_convert_dtuple_to_temp(
347 		b + extra_size, index, tuple->fields, tuple->n_fields);
348 	b += size;
349 
350 	if (mrec_size >= avail_size) {
351 		const os_offset_t	byte_offset
352 			= (os_offset_t) log->tail.blocks
353 			* srv_sort_buf_size;
354 		ibool			ret;
355 
356 		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
357 			goto write_failed;
358 		}
359 
360 		if (mrec_size == avail_size) {
361 			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
362 		} else {
363 			ut_ad(b == log->tail.buf + mrec_size);
364 			memcpy(log->tail.block + log->tail.bytes,
365 			       log->tail.buf, avail_size);
366 		}
367 		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
368 
369 		if (row_log_tmpfile(log) < 0) {
370 			log->error = DB_OUT_OF_MEMORY;
371 			goto err_exit;
372 		}
373 
374 		ret = os_file_write_int_fd(
375 			"(modification log)",
376 			log->fd,
377 			log->tail.block, byte_offset, srv_sort_buf_size);
378 		log->tail.blocks++;
379 		if (!ret) {
380 write_failed:
381 			/* We set the flag directly instead of invoking
382 			dict_set_corrupted_index_cache_only(index) here,
383 			because the index is not "public" yet. */
384 			index->type |= DICT_CORRUPT;
385 		}
386 		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
387 		memcpy(log->tail.block, log->tail.buf + avail_size,
388 		       mrec_size - avail_size);
389 		log->tail.bytes = mrec_size - avail_size;
390 	} else {
391 		log->tail.bytes += mrec_size;
392 		ut_ad(b == log->tail.block + log->tail.bytes);
393 	}
394 
395 	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
396 err_exit:
397 	mutex_exit(&log->mutex);
398 }
399 
400 /******************************************************//**
401 Gets the error status of the online index rebuild log.
402 @return DB_SUCCESS or error code */
403 UNIV_INTERN
404 dberr_t
row_log_table_get_error(const dict_index_t * index)405 row_log_table_get_error(
406 /*====================*/
407 	const dict_index_t*	index)	/*!< in: clustered index of a table
408 					that is being rebuilt online */
409 {
410 	ut_ad(dict_index_is_clust(index));
411 	ut_ad(dict_index_is_online_ddl(index));
412 	return(index->online_log->error);
413 }
414 
415 /******************************************************//**
416 Starts logging an operation to a table that is being rebuilt.
417 @return pointer to log, or NULL if no logging is necessary */
418 static MY_ATTRIBUTE((nonnull, warn_unused_result))
419 byte*
row_log_table_open(row_log_t * log,ulint size,ulint * avail)420 row_log_table_open(
421 /*===============*/
422 	row_log_t*	log,	/*!< in/out: online rebuild log */
423 	ulint		size,	/*!< in: size of log record */
424 	ulint*		avail)	/*!< out: available size for log record */
425 {
426 	mutex_enter(&log->mutex);
427 
428 	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
429 
430 	if (log->error != DB_SUCCESS) {
431 err_exit:
432 		mutex_exit(&log->mutex);
433 		return(NULL);
434 	}
435 
436 	if (!row_log_block_allocate(log->tail)) {
437 		log->error = DB_OUT_OF_MEMORY;
438 		goto err_exit;
439 	}
440 
441 	ut_ad(log->tail.bytes < srv_sort_buf_size);
442 	*avail = srv_sort_buf_size - log->tail.bytes;
443 
444 	if (size > *avail) {
445 		return(log->tail.buf);
446 	} else {
447 		return(log->tail.block + log->tail.bytes);
448 	}
449 }
450 
451 /******************************************************//**
452 Stops logging an operation to a table that is being rebuilt. */
453 static MY_ATTRIBUTE((nonnull))
454 void
row_log_table_close_func(row_log_t * log,const byte * b,ulint size,ulint avail)455 row_log_table_close_func(
456 /*=====================*/
457 	row_log_t*	log,	/*!< in/out: online rebuild log */
458 #ifdef UNIV_DEBUG
459 	const byte*	b,	/*!< in: end of log record */
460 #endif /* UNIV_DEBUG */
461 	ulint		size,	/*!< in: size of log record */
462 	ulint		avail)	/*!< in: available size for log record */
463 {
464 	ut_ad(mutex_own(&log->mutex));
465 
466 	if (size >= avail) {
467 		const os_offset_t	byte_offset
468 			= (os_offset_t) log->tail.blocks
469 			* srv_sort_buf_size;
470 		ibool			ret;
471 
472 		if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
473 			goto write_failed;
474 		}
475 
476 		if (size == avail) {
477 			ut_ad(b == &log->tail.block[srv_sort_buf_size]);
478 		} else {
479 			ut_ad(b == log->tail.buf + size);
480 			memcpy(log->tail.block + log->tail.bytes,
481 			       log->tail.buf, avail);
482 		}
483 		UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
484 
485 		if (row_log_tmpfile(log) < 0) {
486 			log->error = DB_OUT_OF_MEMORY;
487 			goto err_exit;
488 		}
489 
490 		ret = os_file_write_int_fd(
491 			"(modification log)",
492 			log->fd,
493 			log->tail.block, byte_offset, srv_sort_buf_size);
494 		log->tail.blocks++;
495 		if (!ret) {
496 write_failed:
497 			log->error = DB_ONLINE_LOG_TOO_BIG;
498 		}
499 		UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
500 		memcpy(log->tail.block, log->tail.buf + avail, size - avail);
501 		log->tail.bytes = size - avail;
502 	} else {
503 		log->tail.bytes += size;
504 		ut_ad(b == log->tail.block + log->tail.bytes);
505 	}
506 
507 	log->tail.total += size;
508 	UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
509 err_exit:
510 	mutex_exit(&log->mutex);
511 }
512 
513 #ifdef UNIV_DEBUG
514 # define row_log_table_close(log, b, size, avail)	\
515 	row_log_table_close_func(log, b, size, avail)
516 #else /* UNIV_DEBUG */
517 # define row_log_table_close(log, b, size, avail)	\
518 	row_log_table_close_func(log, size, avail)
519 #endif /* UNIV_DEBUG */
520 
521 /******************************************************//**
522 Logs a delete operation to a table that is being rebuilt.
523 This will be merged in row_log_table_apply_delete(). */
524 UNIV_INTERN
525 void
row_log_table_delete(const rec_t * rec,dict_index_t * index,const ulint * offsets,const byte * sys)526 row_log_table_delete(
527 /*=================*/
528 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
529 				page X-latched */
530 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
531 				or X-latched */
532 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
533 	const byte*	sys)	/*!< in: DB_TRX_ID,DB_ROLL_PTR that should
534 				be logged, or NULL to use those in rec */
535 {
536 	ulint		old_pk_extra_size;
537 	ulint		old_pk_size;
538 	ulint		ext_size = 0;
539 	ulint		mrec_size;
540 	ulint		avail_size;
541 	mem_heap_t*	heap		= NULL;
542 	const dtuple_t*	old_pk;
543 	row_ext_t*	ext;
544 
545 	ut_ad(dict_index_is_clust(index));
546 	ut_ad(rec_offs_validate(rec, index, offsets));
547 	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
548 	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
549 #ifdef UNIV_SYNC_DEBUG
550 	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
551 	      || rw_lock_own(&index->lock, RW_LOCK_EX));
552 #endif /* UNIV_SYNC_DEBUG */
553 
554 	if (dict_index_is_corrupted(index)
555 	    || !dict_index_is_online_ddl(index)
556 	    || index->online_log->error != DB_SUCCESS) {
557 		return;
558 	}
559 
560 	dict_table_t* new_table = index->online_log->table;
561 	dict_index_t* new_index = dict_table_get_first_index(new_table);
562 
563 	ut_ad(dict_index_is_clust(new_index));
564 	ut_ad(!dict_index_is_online_ddl(new_index));
565 
566 	/* Create the tuple PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in new_table. */
567 	if (index->online_log->same_pk) {
568 		dtuple_t*	tuple;
569 		ut_ad(new_index->n_uniq == index->n_uniq);
570 
571 		/* The PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR are in the first
572 		fields of the record. */
573 		heap = mem_heap_create(
574 			DATA_TRX_ID_LEN
575 			+ DTUPLE_EST_ALLOC(new_index->n_uniq + 2));
576 		old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 2);
577 		dict_index_copy_types(tuple, new_index, tuple->n_fields);
578 		dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);
579 
580 		for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
581 			ulint		len;
582 			const void*	field	= rec_get_nth_field(
583 				rec, offsets, i, &len);
584 			dfield_t*	dfield	= dtuple_get_nth_field(
585 				tuple, i);
586 			ut_ad(len != UNIV_SQL_NULL);
587 			ut_ad(!rec_offs_nth_extern(offsets, i));
588 			dfield_set_data(dfield, field, len);
589 		}
590 
591 		if (sys) {
592 			dfield_set_data(
593 				dtuple_get_nth_field(tuple,
594 						     new_index->n_uniq),
595 				sys, DATA_TRX_ID_LEN);
596 			dfield_set_data(
597 				dtuple_get_nth_field(tuple,
598 						     new_index->n_uniq + 1),
599 				sys + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
600 		}
601 	} else {
602 		/* The PRIMARY KEY has changed. Translate the tuple. */
603 		old_pk = row_log_table_get_pk(
604 			rec, index, offsets, NULL, &heap);
605 
606 		if (!old_pk) {
607 			ut_ad(index->online_log->error != DB_SUCCESS);
608 			if (heap) {
609 				goto func_exit;
610 			}
611 			return;
612 		}
613 	}
614 
615 	ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
616 		      old_pk, old_pk->n_fields - 2)->len);
617 	ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
618 		      old_pk, old_pk->n_fields - 1)->len);
619 	old_pk_size = rec_get_converted_size_temp(
620 		new_index, old_pk->fields, old_pk->n_fields,
621 		&old_pk_extra_size);
622 	ut_ad(old_pk_extra_size < 0x100);
623 
624 	mrec_size = 6 + old_pk_size;
625 
626 	/* Log enough prefix of the BLOB unless both the
627 	old and new table are in COMPACT or REDUNDANT format,
628 	which store the prefix in the clustered index record. */
629 	if (rec_offs_any_extern(offsets)
630 	    && (dict_table_get_format(index->table) >= UNIV_FORMAT_B
631 		|| dict_table_get_format(new_table) >= UNIV_FORMAT_B)) {
632 
633 		/* Build a cache of those off-page column prefixes
634 		that are referenced by secondary indexes. It can be
635 		that none of the off-page columns are needed. */
636 		row_build(ROW_COPY_DATA, index, rec,
637 			  offsets, NULL, NULL, NULL, &ext, heap);
638 		if (ext) {
639 			/* Log the row_ext_t, ext->ext and ext->buf */
640 			ext_size = ext->n_ext * ext->max_len
641 				+ sizeof(*ext)
642 				+ ext->n_ext * sizeof(ulint)
643 				+ (ext->n_ext - 1) * sizeof ext->len;
644 			mrec_size += ext_size;
645 		}
646 	}
647 
648 	if (byte* b = row_log_table_open(index->online_log,
649 					 mrec_size, &avail_size)) {
650 		*b++ = ROW_T_DELETE;
651 		*b++ = static_cast<byte>(old_pk_extra_size);
652 
653 		/* Log the size of external prefix we saved */
654 		mach_write_to_4(b, ext_size);
655 		b += 4;
656 
657 		rec_convert_dtuple_to_temp(
658 			b + old_pk_extra_size, new_index,
659 			old_pk->fields, old_pk->n_fields);
660 
661 		b += old_pk_size;
662 
663 		if (ext_size) {
664 			ulint	cur_ext_size = sizeof(*ext)
665 				+ (ext->n_ext - 1) * sizeof ext->len;
666 
667 			memcpy(b, ext, cur_ext_size);
668 			b += cur_ext_size;
669 
670 			/* Check if we need to col_map to adjust the column
671 			number. If columns were added/removed/reordered,
672 			adjust the column number. */
673 			if (const ulint* col_map =
674 				index->online_log->col_map) {
675 				for (ulint i = 0; i < ext->n_ext; i++) {
676 					const_cast<ulint&>(ext->ext[i]) =
677 						col_map[ext->ext[i]];
678 				}
679 			}
680 
681 			memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
682 			b += ext->n_ext * sizeof(*ext->ext);
683 
684 			ext_size -= cur_ext_size
685 				 + ext->n_ext * sizeof(*ext->ext);
686 			memcpy(b, ext->buf, ext_size);
687 			b += ext_size;
688 		}
689 
690 		row_log_table_close(
691 			index->online_log, b, mrec_size, avail_size);
692 	}
693 
694 func_exit:
695 	mem_heap_free(heap);
696 }
697 
698 /******************************************************//**
699 Logs an insert or update to a table that is being rebuilt. */
700 static
701 void
row_log_table_low_redundant(const rec_t * rec,dict_index_t * index,bool insert,const dtuple_t * old_pk,const dict_index_t * new_index)702 row_log_table_low_redundant(
703 /*========================*/
704 	const rec_t*		rec,	/*!< in: clustered index leaf
705 					page record in ROW_FORMAT=REDUNDANT,
706 					page X-latched */
707 	dict_index_t*		index,	/*!< in/out: clustered index, S-latched
708 					or X-latched */
709 	bool			insert,	/*!< in: true if insert,
710 					false if update */
711 	const dtuple_t*		old_pk,	/*!< in: old PRIMARY KEY value
712 					(if !insert and a PRIMARY KEY
713 					is being created) */
714 	const dict_index_t*	new_index)
715 					/*!< in: clustered index of the
716 					new table, not latched */
717 {
718 	ulint		old_pk_size;
719 	ulint		old_pk_extra_size;
720 	ulint		size;
721 	ulint		extra_size;
722 	ulint		mrec_size;
723 	ulint		avail_size;
724 	mem_heap_t*	heap		= NULL;
725 	dtuple_t*	tuple;
726 
727 	ut_ad(!page_is_comp(page_align(rec)));
728 	ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
729 	ut_ad(dict_tf_is_valid(index->table->flags));
730 	ut_ad(!dict_table_is_comp(index->table));  /* redundant row format */
731 	ut_ad(dict_index_is_clust(new_index));
732 
733 	heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
734 	tuple = dtuple_create(heap, index->n_fields);
735 	dict_index_copy_types(tuple, index, index->n_fields);
736 	dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));
737 
738 	if (rec_get_1byte_offs_flag(rec)) {
739 		for (ulint i = 0; i < index->n_fields; i++) {
740 			dfield_t*	dfield;
741 			ulint		len;
742 			const void*	field;
743 
744 			dfield = dtuple_get_nth_field(tuple, i);
745 			field = rec_get_nth_field_old(rec, i, &len);
746 
747 			dfield_set_data(dfield, field, len);
748 		}
749 	} else {
750 		for (ulint i = 0; i < index->n_fields; i++) {
751 			dfield_t*	dfield;
752 			ulint		len;
753 			const void*	field;
754 
755 			dfield = dtuple_get_nth_field(tuple, i);
756 			field = rec_get_nth_field_old(rec, i, &len);
757 
758 			dfield_set_data(dfield, field, len);
759 
760 			if (rec_2_is_field_extern(rec, i)) {
761 				dfield_set_ext(dfield);
762 			}
763 		}
764 	}
765 
766 	size = rec_get_converted_size_temp(
767 		index, tuple->fields, tuple->n_fields, &extra_size);
768 
769 	mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
770 
771 	if (insert || index->online_log->same_pk) {
772 		ut_ad(!old_pk);
773 		old_pk_extra_size = old_pk_size = 0;
774 	} else {
775 		ut_ad(old_pk);
776 		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
777 		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
778 			      old_pk, old_pk->n_fields - 2)->len);
779 		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
780 			      old_pk, old_pk->n_fields - 1)->len);
781 
782 		old_pk_size = rec_get_converted_size_temp(
783 			new_index, old_pk->fields, old_pk->n_fields,
784 			&old_pk_extra_size);
785 		ut_ad(old_pk_extra_size < 0x100);
786 		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
787 	}
788 
789 	if (byte* b = row_log_table_open(index->online_log,
790 					 mrec_size, &avail_size)) {
791 		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
792 
793 		if (old_pk_size) {
794 			*b++ = static_cast<byte>(old_pk_extra_size);
795 
796 			rec_convert_dtuple_to_temp(
797 				b + old_pk_extra_size, new_index,
798 				old_pk->fields, old_pk->n_fields);
799 			b += old_pk_size;
800 		}
801 
802 		if (extra_size < 0x80) {
803 			*b++ = static_cast<byte>(extra_size);
804 		} else {
805 			ut_ad(extra_size < 0x8000);
806 			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
807 			*b++ = static_cast<byte>(extra_size);
808 		}
809 
810 		rec_convert_dtuple_to_temp(
811 			b + extra_size, index, tuple->fields, tuple->n_fields);
812 		b += size;
813 
814 		row_log_table_close(
815 			index->online_log, b, mrec_size, avail_size);
816 	}
817 
818 	mem_heap_free(heap);
819 }
820 
821 /******************************************************//**
822 Logs an insert or update to a table that is being rebuilt. */
823 static MY_ATTRIBUTE((nonnull(1,2,3)))
824 void
row_log_table_low(const rec_t * rec,dict_index_t * index,const ulint * offsets,bool insert,const dtuple_t * old_pk)825 row_log_table_low(
826 /*==============*/
827 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
828 				page X-latched */
829 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
830 				or X-latched */
831 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
832 	bool		insert,	/*!< in: true if insert, false if update */
833 	const dtuple_t*	old_pk)	/*!< in: old PRIMARY KEY value (if !insert
834 				and a PRIMARY KEY is being created) */
835 {
836 	ulint			omit_size;
837 	ulint			old_pk_size;
838 	ulint			old_pk_extra_size;
839 	ulint			extra_size;
840 	ulint			mrec_size;
841 	ulint			avail_size;
842 	const dict_index_t*	new_index = dict_table_get_first_index(
843 		index->online_log->table);
844 	ut_ad(dict_index_is_clust(index));
845 	ut_ad(dict_index_is_clust(new_index));
846 	ut_ad(!dict_index_is_online_ddl(new_index));
847 	ut_ad(rec_offs_validate(rec, index, offsets));
848 	ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
849 	ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
850 #ifdef UNIV_SYNC_DEBUG
851 	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
852 	      || rw_lock_own(&index->lock, RW_LOCK_EX));
853 #endif /* UNIV_SYNC_DEBUG */
854 	ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
855 	ut_ad(page_is_leaf(page_align(rec)));
856 	ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
857 
858 	if (dict_index_is_corrupted(index)
859 	    || !dict_index_is_online_ddl(index)
860 	    || index->online_log->error != DB_SUCCESS) {
861 		return;
862 	}
863 
864 	if (!rec_offs_comp(offsets)) {
865 		row_log_table_low_redundant(
866 			rec, index, insert, old_pk, new_index);
867 		return;
868 	}
869 
870 	ut_ad(page_is_comp(page_align(rec)));
871 	ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
872 
873 	omit_size = REC_N_NEW_EXTRA_BYTES;
874 
875 	extra_size = rec_offs_extra_size(offsets) - omit_size;
876 
877 	mrec_size = ROW_LOG_HEADER_SIZE
878 		+ (extra_size >= 0x80) + rec_offs_size(offsets) - omit_size;
879 
880 	if (insert || index->online_log->same_pk) {
881 		ut_ad(!old_pk);
882 		old_pk_extra_size = old_pk_size = 0;
883 	} else {
884 		ut_ad(old_pk);
885 		ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
886 		ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
887 			      old_pk, old_pk->n_fields - 2)->len);
888 		ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
889 			      old_pk, old_pk->n_fields - 1)->len);
890 
891 		old_pk_size = rec_get_converted_size_temp(
892 			new_index, old_pk->fields, old_pk->n_fields,
893 			&old_pk_extra_size);
894 		ut_ad(old_pk_extra_size < 0x100);
895 		mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
896 	}
897 
898 	if (byte* b = row_log_table_open(index->online_log,
899 					 mrec_size, &avail_size)) {
900 		*b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
901 
902 		if (old_pk_size) {
903 			*b++ = static_cast<byte>(old_pk_extra_size);
904 
905 			rec_convert_dtuple_to_temp(
906 				b + old_pk_extra_size, new_index,
907 				old_pk->fields, old_pk->n_fields);
908 			b += old_pk_size;
909 		}
910 
911 		if (extra_size < 0x80) {
912 			*b++ = static_cast<byte>(extra_size);
913 		} else {
914 			ut_ad(extra_size < 0x8000);
915 			*b++ = static_cast<byte>(0x80 | (extra_size >> 8));
916 			*b++ = static_cast<byte>(extra_size);
917 		}
918 
919 		memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
920 		b += extra_size;
921 		memcpy(b, rec, rec_offs_data_size(offsets));
922 		b += rec_offs_data_size(offsets);
923 
924 		row_log_table_close(
925 			index->online_log, b, mrec_size, avail_size);
926 	}
927 }
928 
929 /******************************************************//**
930 Logs an update to a table that is being rebuilt.
931 This will be merged in row_log_table_apply_update(). */
932 UNIV_INTERN
933 void
row_log_table_update(const rec_t * rec,dict_index_t * index,const ulint * offsets,const dtuple_t * old_pk)934 row_log_table_update(
935 /*=================*/
936 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
937 				page X-latched */
938 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
939 				or X-latched */
940 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
941 	const dtuple_t*	old_pk)	/*!< in: row_log_table_get_pk()
942 				before the update */
943 {
944 	row_log_table_low(rec, index, offsets, false, old_pk);
945 }
946 
947 /** Gets the old table column of a PRIMARY KEY column.
948 @param table	old table (before ALTER TABLE)
949 @param col_map	mapping of old column numbers to new ones
950 @param col_no	column position in the new table
951 @return old table column, or NULL if this is an added column */
952 static
953 const dict_col_t*
row_log_table_get_pk_old_col(const dict_table_t * table,const ulint * col_map,ulint col_no)954 row_log_table_get_pk_old_col(
955 /*=========================*/
956 	const dict_table_t*	table,
957 	const ulint*		col_map,
958 	ulint			col_no)
959 {
960 	for (ulint i = 0; i < table->n_cols; i++) {
961 		if (col_no == col_map[i]) {
962 			return(dict_table_get_nth_col(table, i));
963 		}
964 	}
965 
966 	return(NULL);
967 }
968 
969 /** Maps an old table column of a PRIMARY KEY column.
970 @param col	old table column (before ALTER TABLE)
971 @param ifield	clustered index field in the new table (after ALTER TABLE)
972 @param dfield	clustered index tuple field in the new table
973 @param heap	memory heap for allocating dfield contents
974 @param rec	clustered index leaf page record in the old table
975 @param offsets	rec_get_offsets(rec)
976 @param i	rec field corresponding to col
977 @param zip_size	compressed page size of the old table, or 0 for uncompressed
978 @param max_len	maximum length of dfield
979 @retval DB_INVALID_NULL if a NULL value is encountered
980 @retval DB_TOO_BIG_INDEX_COL if the maximum prefix length is exceeded */
981 static
982 dberr_t
row_log_table_get_pk_col(const dict_col_t * col,const dict_field_t * ifield,dfield_t * dfield,mem_heap_t * heap,const rec_t * rec,const ulint * offsets,ulint i,ulint zip_size,ulint max_len)983 row_log_table_get_pk_col(
984 /*=====================*/
985 	const dict_col_t*	col,
986 	const dict_field_t*	ifield,
987 	dfield_t*		dfield,
988 	mem_heap_t*		heap,
989 	const rec_t*		rec,
990 	const ulint*		offsets,
991 	ulint			i,
992 	ulint			zip_size,
993 	ulint			max_len)
994 {
995 	const byte*	field;
996 	ulint		len;
997 
998 	ut_ad(ut_is_2pow(zip_size));
999 
1000 	field = rec_get_nth_field(rec, offsets, i, &len);
1001 
1002 	if (len == UNIV_SQL_NULL) {
1003 		return(DB_INVALID_NULL);
1004 	}
1005 
1006 	if (rec_offs_nth_extern(offsets, i)) {
1007 		ulint	field_len = ifield->prefix_len;
1008 		byte*	blob_field;
1009 
1010 		if (!field_len) {
1011 			field_len = ifield->fixed_len;
1012 			if (!field_len) {
1013 				field_len = max_len + 1;
1014 			}
1015 		}
1016 
1017 		blob_field = static_cast<byte*>(
1018 			mem_heap_alloc(heap, field_len));
1019 
1020 		len = btr_copy_externally_stored_field_prefix(
1021 			blob_field, field_len, zip_size, field, len);
1022 		if (len >= max_len + 1) {
1023 			return(DB_TOO_BIG_INDEX_COL);
1024 		}
1025 
1026 		dfield_set_data(dfield, blob_field, len);
1027 	} else {
1028 		dfield_set_data(dfield, mem_heap_dup(heap, field, len), len);
1029 	}
1030 
1031 	return(DB_SUCCESS);
1032 }
1033 
1034 /******************************************************//**
1035 Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
1036 of a table that is being rebuilt.
1037 @return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
1038 or NULL if the PRIMARY KEY definition does not change */
1039 UNIV_INTERN
1040 const dtuple_t*
row_log_table_get_pk(const rec_t * rec,dict_index_t * index,const ulint * offsets,byte * sys,mem_heap_t ** heap)1041 row_log_table_get_pk(
1042 /*=================*/
1043 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
1044 				page X-latched */
1045 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
1046 				or X-latched */
1047 	const ulint*	offsets,/*!< in: rec_get_offsets(rec,index) */
1048 	byte*		sys,	/*!< out: DB_TRX_ID,DB_ROLL_PTR for
1049 				row_log_table_delete(), or NULL */
1050 	mem_heap_t**	heap)	/*!< in/out: memory heap where allocated */
1051 {
1052 	dtuple_t*	tuple	= NULL;
1053 	row_log_t*	log	= index->online_log;
1054 
1055 	ut_ad(dict_index_is_clust(index));
1056 	ut_ad(dict_index_is_online_ddl(index));
1057 	ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
1058 #ifdef UNIV_SYNC_DEBUG
1059 	ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
1060 	      || rw_lock_own(&index->lock, RW_LOCK_EX));
1061 #endif /* UNIV_SYNC_DEBUG */
1062 
1063 	ut_ad(log);
1064 	ut_ad(log->table);
1065 
1066 	if (log->same_pk) {
1067 		/* The PRIMARY KEY columns are unchanged. */
1068 		if (sys) {
1069 			/* Store the DB_TRX_ID,DB_ROLL_PTR. */
1070 			ulint	trx_id_offs = index->trx_id_offset;
1071 
1072 			if (!trx_id_offs) {
1073 				ulint	pos = dict_index_get_sys_col_pos(
1074 					index, DATA_TRX_ID);
1075 				ulint	len;
1076 				ut_ad(pos > 0);
1077 
1078 				if (!offsets) {
1079 					offsets = rec_get_offsets(
1080 						rec, index, NULL, pos + 1,
1081 						heap);
1082 				}
1083 
1084 				trx_id_offs = rec_get_nth_field_offs(
1085 					offsets, pos, &len);
1086 				ut_ad(len == DATA_TRX_ID_LEN);
1087 			}
1088 
1089 			memcpy(sys, rec + trx_id_offs,
1090 			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1091 		}
1092 
1093 		return(NULL);
1094 	}
1095 
1096 	mutex_enter(&log->mutex);
1097 
1098 	/* log->error is protected by log->mutex. */
1099 	if (log->error == DB_SUCCESS) {
1100 		dict_table_t*	new_table	= log->table;
1101 		dict_index_t*	new_index
1102 			= dict_table_get_first_index(new_table);
1103 		const ulint	new_n_uniq
1104 			= dict_index_get_n_unique(new_index);
1105 
1106 		if (!*heap) {
1107 			ulint	size = 0;
1108 
1109 			if (!offsets) {
1110 				size += (1 + REC_OFFS_HEADER_SIZE
1111 					 + index->n_fields)
1112 					* sizeof *offsets;
1113 			}
1114 
1115 			for (ulint i = 0; i < new_n_uniq; i++) {
1116 				size += dict_col_get_min_size(
1117 					dict_index_get_nth_col(new_index, i));
1118 			}
1119 
1120 			*heap = mem_heap_create(
1121 				DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
1122 		}
1123 
1124 		if (!offsets) {
1125 			offsets = rec_get_offsets(rec, index, NULL,
1126 						  ULINT_UNDEFINED, heap);
1127 		}
1128 
1129 		tuple = dtuple_create(*heap, new_n_uniq + 2);
1130 		dict_index_copy_types(tuple, new_index, tuple->n_fields);
1131 		dtuple_set_n_fields_cmp(tuple, new_n_uniq);
1132 
1133 		const ulint max_len = DICT_MAX_FIELD_LEN_BY_FORMAT(new_table);
1134 		const ulint zip_size = dict_table_zip_size(index->table);
1135 
1136 		for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
1137 			dict_field_t*	ifield;
1138 			dfield_t*	dfield;
1139 			ulint		prtype;
1140 			ulint		mbminmaxlen;
1141 
1142 			ifield = dict_index_get_nth_field(new_index, new_i);
1143 			dfield = dtuple_get_nth_field(tuple, new_i);
1144 
1145 			const ulint	col_no
1146 				= dict_field_get_col(ifield)->ind;
1147 
1148 			if (const dict_col_t* col
1149 			    = row_log_table_get_pk_old_col(
1150 				    index->table, log->col_map, col_no)) {
1151 				ulint	i = dict_col_get_clust_pos(col, index);
1152 
1153 				if (i == ULINT_UNDEFINED) {
1154 					ut_ad(0);
1155 					log->error = DB_CORRUPTION;
1156 					goto err_exit;
1157 				}
1158 
1159 				log->error = row_log_table_get_pk_col(
1160 					col, ifield, dfield, *heap,
1161 					rec, offsets, i, zip_size, max_len);
1162 
1163 				if (log->error != DB_SUCCESS) {
1164 err_exit:
1165 					tuple = NULL;
1166 					goto func_exit;
1167 				}
1168 
1169 				mbminmaxlen = col->mbminmaxlen;
1170 				prtype = col->prtype;
1171 			} else {
1172 				/* No matching column was found in the old
1173 				table, so this must be an added column.
1174 				Copy the default value. */
1175 				ut_ad(log->add_cols);
1176 
1177 				dfield_copy(dfield, dtuple_get_nth_field(
1178 						    log->add_cols, col_no));
1179 				mbminmaxlen = dfield->type.mbminmaxlen;
1180 				prtype = dfield->type.prtype;
1181 			}
1182 
1183 			ut_ad(!dfield_is_ext(dfield));
1184 			ut_ad(!dfield_is_null(dfield));
1185 
1186 			if (ifield->prefix_len) {
1187 				ulint	len = dtype_get_at_most_n_mbchars(
1188 					prtype, mbminmaxlen,
1189 					ifield->prefix_len,
1190 					dfield_get_len(dfield),
1191 					static_cast<const char*>(
1192 						dfield_get_data(dfield)));
1193 
1194 				ut_ad(len <= dfield_get_len(dfield));
1195 				dfield_set_len(dfield, len);
1196 			}
1197 		}
1198 
1199 		const byte* trx_roll = rec
1200 			+ row_get_trx_id_offset(index, offsets);
1201 
1202 		/* Copy the fields, because the fields will be updated
1203 		or the record may be moved somewhere else in the B-tree
1204 		as part of the upcoming operation. */
1205 		if (sys) {
1206 			memcpy(sys, trx_roll,
1207 			       DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1208 			trx_roll = sys;
1209 		} else {
1210 			trx_roll = static_cast<const byte*>(
1211 				mem_heap_dup(
1212 					*heap, trx_roll,
1213 					DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
1214 		}
1215 
1216 		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
1217 				trx_roll, DATA_TRX_ID_LEN);
1218 		dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
1219 				trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
1220 	}
1221 
1222 func_exit:
1223 	mutex_exit(&log->mutex);
1224 	return(tuple);
1225 }
1226 
1227 /******************************************************//**
1228 Logs an insert to a table that is being rebuilt.
1229 This will be merged in row_log_table_apply_insert(). */
1230 UNIV_INTERN
1231 void
row_log_table_insert(const rec_t * rec,dict_index_t * index,const ulint * offsets)1232 row_log_table_insert(
1233 /*=================*/
1234 	const rec_t*	rec,	/*!< in: clustered index leaf page record,
1235 				page X-latched */
1236 	dict_index_t*	index,	/*!< in/out: clustered index, S-latched
1237 				or X-latched */
1238 	const ulint*	offsets)/*!< in: rec_get_offsets(rec,index) */
1239 {
1240 	row_log_table_low(rec, index, offsets, true, NULL);
1241 }
1242 
1243 /******************************************************//**
1244 Notes that a BLOB is being freed during online ALTER TABLE. */
1245 UNIV_INTERN
1246 void
row_log_table_blob_free(dict_index_t * index,ulint page_no)1247 row_log_table_blob_free(
1248 /*====================*/
1249 	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
1250 	ulint		page_no)/*!< in: starting page number of the BLOB */
1251 {
1252 	ut_ad(dict_index_is_clust(index));
1253 	ut_ad(dict_index_is_online_ddl(index));
1254 #ifdef UNIV_SYNC_DEBUG
1255 	ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
1256 #endif /* UNIV_SYNC_DEBUG */
1257 	ut_ad(page_no != FIL_NULL);
1258 
1259 	if (index->online_log->error != DB_SUCCESS) {
1260 		return;
1261 	}
1262 
1263 	page_no_map*	blobs	= index->online_log->blobs;
1264 
1265 	if (!blobs) {
1266 		index->online_log->blobs = blobs = new page_no_map();
1267 	}
1268 
1269 #ifdef UNIV_DEBUG
1270 	const ulonglong	log_pos = index->online_log->tail.total;
1271 #else
1272 # define log_pos /* empty */
1273 #endif /* UNIV_DEBUG */
1274 
1275 	const page_no_map::value_type v(page_no,
1276 					row_log_table_blob_t(log_pos));
1277 
1278 	std::pair<page_no_map::iterator,bool> p = blobs->insert(v);
1279 
1280 	if (!p.second) {
1281 		/* Update the existing mapping. */
1282 		ut_ad(p.first->first == page_no);
1283 		p.first->second.blob_free(log_pos);
1284 	}
1285 #undef log_pos
1286 }
1287 
1288 /******************************************************//**
1289 Notes that a BLOB is being allocated during online ALTER TABLE. */
1290 UNIV_INTERN
1291 void
row_log_table_blob_alloc(dict_index_t * index,ulint page_no)1292 row_log_table_blob_alloc(
1293 /*=====================*/
1294 	dict_index_t*	index,	/*!< in/out: clustered index, X-latched */
1295 	ulint		page_no)/*!< in: starting page number of the BLOB */
1296 {
1297 	ut_ad(dict_index_is_clust(index));
1298 	ut_ad(dict_index_is_online_ddl(index));
1299 #ifdef UNIV_SYNC_DEBUG
1300 	ut_ad(rw_lock_own(&index->lock, RW_LOCK_EX));
1301 #endif /* UNIV_SYNC_DEBUG */
1302 	ut_ad(page_no != FIL_NULL);
1303 
1304 	if (index->online_log->error != DB_SUCCESS) {
1305 		return;
1306 	}
1307 
1308 	/* Only track allocations if the same page has been freed
1309 	earlier. Double allocation without a free is not allowed. */
1310 	if (page_no_map* blobs = index->online_log->blobs) {
1311 		page_no_map::iterator p = blobs->find(page_no);
1312 
1313 		if (p != blobs->end()) {
1314 			ut_ad(p->first == page_no);
1315 			p->second.blob_alloc(index->online_log->tail.total);
1316 		}
1317 	}
1318 }
1319 
1320 /******************************************************//**
1321 Converts a log record to a table row.
1322 @return converted row, or NULL if the conversion fails */
1323 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1324 const dtuple_t*
row_log_table_apply_convert_mrec(const mrec_t * mrec,dict_index_t * index,const ulint * offsets,const row_log_t * log,mem_heap_t * heap,trx_id_t trx_id,dberr_t * error)1325 row_log_table_apply_convert_mrec(
1326 /*=============================*/
1327 	const mrec_t*		mrec,		/*!< in: merge record */
1328 	dict_index_t*		index,		/*!< in: index of mrec */
1329 	const ulint*		offsets,	/*!< in: offsets of mrec */
1330 	const row_log_t*	log,		/*!< in: rebuild context */
1331 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1332 	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec */
1333 	dberr_t*		error)		/*!< out: DB_SUCCESS or
1334 						DB_MISSING_HISTORY or
1335 						reason of failure */
1336 {
1337 	dtuple_t*	row;
1338 
1339 	*error = DB_SUCCESS;
1340 
1341 	/* This is based on row_build(). */
1342 	if (log->add_cols) {
1343 		row = dtuple_copy(log->add_cols, heap);
1344 		/* dict_table_copy_types() would set the fields to NULL */
1345 		for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
1346 			dict_col_copy_type(
1347 				dict_table_get_nth_col(log->table, i),
1348 				dfield_get_type(dtuple_get_nth_field(row, i)));
1349 		}
1350 	} else {
1351 		row = dtuple_create(heap, dict_table_get_n_cols(log->table));
1352 		dict_table_copy_types(row, log->table);
1353 	}
1354 
1355 	for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
1356 		const dict_field_t*	ind_field
1357 			= dict_index_get_nth_field(index, i);
1358 
1359 		if (ind_field->prefix_len) {
1360 			/* Column prefixes can only occur in key
1361 			fields, which cannot be stored externally. For
1362 			a column prefix, there should also be the full
1363 			field in the clustered index tuple. The row
1364 			tuple comprises full fields, not prefixes. */
1365 			ut_ad(!rec_offs_nth_extern(offsets, i));
1366 			continue;
1367 		}
1368 
1369 		const dict_col_t*	col
1370 			= dict_field_get_col(ind_field);
1371 		ulint			col_no
1372 			= log->col_map[dict_col_get_no(col)];
1373 
1374 		if (col_no == ULINT_UNDEFINED) {
1375 			/* dropped column */
1376 			continue;
1377 		}
1378 
1379 		dfield_t*		dfield
1380 			= dtuple_get_nth_field(row, col_no);
1381 		ulint			len;
1382 		const byte*		data;
1383 
1384 		if (rec_offs_nth_extern(offsets, i)) {
1385 			ut_ad(rec_offs_any_extern(offsets));
1386 			rw_lock_x_lock(dict_index_get_lock(index));
1387 
1388 			if (const page_no_map* blobs = log->blobs) {
1389 				data = rec_get_nth_field(
1390 					mrec, offsets, i, &len);
1391 				ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
1392 
1393 				ulint	page_no = mach_read_from_4(
1394 					data + len - (BTR_EXTERN_FIELD_REF_SIZE
1395 						      - BTR_EXTERN_PAGE_NO));
1396 				page_no_map::const_iterator p = blobs->find(
1397 					page_no);
1398 				if (p != blobs->end()
1399 				    && p->second.is_freed(log->head.total)) {
1400 					/* This BLOB has been freed.
1401 					We must not access the row. */
1402 					*error = DB_MISSING_HISTORY;
1403 					dfield_set_data(dfield, data, len);
1404 					dfield_set_ext(dfield);
1405 					goto blob_done;
1406 				}
1407 			}
1408 
1409 			data = btr_rec_copy_externally_stored_field(
1410 				mrec, offsets,
1411 				dict_table_zip_size(index->table),
1412 				i, &len, heap);
1413 			ut_a(data);
1414 			dfield_set_data(dfield, data, len);
1415 blob_done:
1416 			rw_lock_x_unlock(dict_index_get_lock(index));
1417 		} else {
1418 			data = rec_get_nth_field(mrec, offsets, i, &len);
1419 			dfield_set_data(dfield, data, len);
1420 		}
1421 
1422 		if (len != UNIV_SQL_NULL && col->mtype == DATA_MYSQL
1423 		    && col->len != len && !dict_table_is_comp(log->table)) {
1424 
1425 			ut_ad(col->len >= len);
1426 			if (dict_table_is_comp(index->table)) {
1427 				byte*	buf = (byte*) mem_heap_alloc(heap,
1428 								     col->len);
1429 				memcpy(buf, dfield->data, len);
1430 				memset(buf + len, 0x20, col->len - len);
1431 
1432 				dfield_set_data(dfield, buf, col->len);
1433 			} else {
1434 				/* field length mismatch should not happen
1435 				when rebuilding the redundant row format
1436 				table. */
1437 				ut_ad(0);
1438 				*error = DB_CORRUPTION;
1439 				return(NULL);
1440 			}
1441 		}
1442 
1443 		/* See if any columns were changed to NULL or NOT NULL. */
1444 		const dict_col_t*	new_col
1445 			= dict_table_get_nth_col(log->table, col_no);
1446 		ut_ad(new_col->mtype == col->mtype);
1447 
1448 		/* Assert that prtype matches except for nullability. */
1449 		ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
1450 		ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
1451 			& ~DATA_NOT_NULL));
1452 
1453 		if (new_col->prtype == col->prtype) {
1454 			continue;
1455 		}
1456 
1457 		if ((new_col->prtype & DATA_NOT_NULL)
1458 		    && dfield_is_null(dfield)) {
1459 			/* We got a NULL value for a NOT NULL column. */
1460 			*error = DB_INVALID_NULL;
1461 			return(NULL);
1462 		}
1463 
1464 		/* Adjust the DATA_NOT_NULL flag in the parsed row. */
1465 		dfield_get_type(dfield)->prtype = new_col->prtype;
1466 
1467 		ut_ad(dict_col_type_assert_equal(new_col,
1468 						 dfield_get_type(dfield)));
1469 	}
1470 
1471 	return(row);
1472 }
1473 
1474 /******************************************************//**
1475 Replays an insert operation on a table that was rebuilt.
1476 @return DB_SUCCESS or error code */
1477 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1478 dberr_t
row_log_table_apply_insert_low(que_thr_t * thr,const dtuple_t * row,trx_id_t trx_id,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup)1479 row_log_table_apply_insert_low(
1480 /*===========================*/
1481 	que_thr_t*		thr,		/*!< in: query graph */
1482 	const dtuple_t*		row,		/*!< in: table row
1483 						in the old table definition */
1484 	trx_id_t		trx_id,		/*!< in: trx_id of the row */
1485 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1486 						that can be emptied */
1487 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1488 	row_merge_dup_t*	dup)		/*!< in/out: for reporting
1489 						duplicate key errors */
1490 {
1491 	dberr_t		error;
1492 	dtuple_t*	entry;
1493 	const row_log_t*log	= dup->index->online_log;
1494 	dict_index_t*	index	= dict_table_get_first_index(log->table);
1495 	ulint		n_index = 0;
1496 
1497 	ut_ad(dtuple_validate(row));
1498 	ut_ad(trx_id);
1499 
1500 #ifdef ROW_LOG_APPLY_PRINT
1501 	if (row_log_apply_print) {
1502 		fprintf(stderr, "table apply insert "
1503 			IB_ID_FMT " " IB_ID_FMT "\n",
1504 			index->table->id, index->id);
1505 		dtuple_print(stderr, row);
1506 	}
1507 #endif /* ROW_LOG_APPLY_PRINT */
1508 
1509 	static const ulint	flags
1510 		= (BTR_CREATE_FLAG
1511 		   | BTR_NO_LOCKING_FLAG
1512 		   | BTR_NO_UNDO_LOG_FLAG
1513 		   | BTR_KEEP_SYS_FLAG);
1514 
1515 	entry = row_build_index_entry(row, NULL, index, heap);
1516 
1517 	error = row_ins_clust_index_entry_low(
1518 		flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr);
1519 
1520 	switch (error) {
1521 	case DB_SUCCESS:
1522 		break;
1523 	case DB_SUCCESS_LOCKED_REC:
1524 		/* The row had already been copied to the table. */
1525 		return(DB_SUCCESS);
1526 	default:
1527 		return(error);
1528 	}
1529 
1530 	do {
1531 		n_index++;
1532 
1533 		if (!(index = dict_table_get_next_index(index))) {
1534 			break;
1535 		}
1536 
1537 		if (index->type & DICT_FTS) {
1538 			continue;
1539 		}
1540 
1541 		entry = row_build_index_entry(row, NULL, index, heap);
1542 		error = row_ins_sec_index_entry_low(
1543 			flags, BTR_MODIFY_TREE,
1544 			index, offsets_heap, heap, entry, trx_id, thr);
1545 
1546 		/* Report correct index name for duplicate key error. */
1547 		if (error == DB_DUPLICATE_KEY) {
1548 			thr_get_trx(thr)->error_key_num = n_index;
1549 		}
1550 
1551 	} while (error == DB_SUCCESS);
1552 
1553 	return(error);
1554 }
1555 
1556 /******************************************************//**
1557 Replays an insert operation on a table that was rebuilt.
1558 @return DB_SUCCESS or error code */
1559 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1560 dberr_t
row_log_table_apply_insert(que_thr_t * thr,const mrec_t * mrec,const ulint * offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup,trx_id_t trx_id)1561 row_log_table_apply_insert(
1562 /*=======================*/
1563 	que_thr_t*		thr,		/*!< in: query graph */
1564 	const mrec_t*		mrec,		/*!< in: record to insert */
1565 	const ulint*		offsets,	/*!< in: offsets of mrec */
1566 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1567 						that can be emptied */
1568 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1569 	row_merge_dup_t*	dup,		/*!< in/out: for reporting
1570 						duplicate key errors */
1571 	trx_id_t		trx_id)		/*!< in: DB_TRX_ID of mrec */
1572 {
1573 	const row_log_t*log	= dup->index->online_log;
1574 	dberr_t		error;
1575 	const dtuple_t*	row	= row_log_table_apply_convert_mrec(
1576 		mrec, dup->index, offsets, log, heap, trx_id, &error);
1577 
1578 	switch (error) {
1579 	case DB_MISSING_HISTORY:
1580 		ut_ad(log->blobs);
1581 		/* Because some BLOBs are missing, we know that the
1582 		transaction was rolled back later (a rollback of
1583 		an insert can free BLOBs).
1584 		We can simply skip the insert: the subsequent
1585 		ROW_T_DELETE will be ignored, or a ROW_T_UPDATE will
1586 		be interpreted as ROW_T_INSERT. */
1587 		return(DB_SUCCESS);
1588 	case DB_SUCCESS:
1589 		ut_ad(row != NULL);
1590 		break;
1591 	default:
1592 		ut_ad(0);
1593 	case DB_INVALID_NULL:
1594 		ut_ad(row == NULL);
1595 		return(error);
1596 	}
1597 
1598 	error = row_log_table_apply_insert_low(
1599 		thr, row, trx_id, offsets_heap, heap, dup);
1600 	if (error != DB_SUCCESS) {
1601 		/* Report the erroneous row using the new
1602 		version of the table. */
1603 		innobase_row_to_mysql(dup->table, log->table, row);
1604 	}
1605 	return(error);
1606 }
1607 
1608 /******************************************************//**
1609 Deletes a record from a table that is being rebuilt.
1610 @return DB_SUCCESS or error code */
1611 static MY_ATTRIBUTE((nonnull(1, 2, 4, 5), warn_unused_result))
1612 dberr_t
row_log_table_apply_delete_low(btr_pcur_t * pcur,const ulint * offsets,const row_ext_t * save_ext,mem_heap_t * heap,mtr_t * mtr)1613 row_log_table_apply_delete_low(
1614 /*===========================*/
1615 	btr_pcur_t*		pcur,		/*!< in/out: B-tree cursor,
1616 						will be trashed */
1617 	const ulint*		offsets,	/*!< in: offsets on pcur */
1618 	const row_ext_t*	save_ext,	/*!< in: saved external field
1619 						info, or NULL */
1620 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1621 	mtr_t*			mtr)		/*!< in/out: mini-transaction,
1622 						will be committed */
1623 {
1624 	dberr_t		error;
1625 	row_ext_t*	ext;
1626 	dtuple_t*	row;
1627 	dict_index_t*	index	= btr_pcur_get_btr_cur(pcur)->index;
1628 
1629 	ut_ad(dict_index_is_clust(index));
1630 
1631 #ifdef ROW_LOG_APPLY_PRINT
1632 	if (row_log_apply_print) {
1633 		fprintf(stderr, "table apply delete "
1634 			IB_ID_FMT " " IB_ID_FMT "\n",
1635 			index->table->id, index->id);
1636 		rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets);
1637 	}
1638 #endif /* ROW_LOG_APPLY_PRINT */
1639 	if (dict_table_get_next_index(index)) {
1640 		/* Build a row template for purging secondary index entries. */
1641 		row = row_build(
1642 			ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
1643 			offsets, NULL, NULL, NULL,
1644 			save_ext ? NULL : &ext, heap);
1645 		if (!save_ext) {
1646 			save_ext = ext;
1647 		}
1648 	} else {
1649 		row = NULL;
1650 	}
1651 
1652 	btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
1653 				   BTR_CREATE_FLAG, RB_NONE, mtr);
1654 	mtr_commit(mtr);
1655 
1656 	if (error != DB_SUCCESS) {
1657 		return(error);
1658 	}
1659 
1660 	while ((index = dict_table_get_next_index(index)) != NULL) {
1661 		if (index->type & DICT_FTS) {
1662 			continue;
1663 		}
1664 
1665 		const dtuple_t*	entry = row_build_index_entry(
1666 			row, save_ext, index, heap);
1667 		mtr_start(mtr);
1668 		btr_pcur_open(index, entry, PAGE_CUR_LE,
1669 			      BTR_MODIFY_TREE, pcur, mtr);
1670 #ifdef UNIV_DEBUG
1671 		switch (btr_pcur_get_btr_cur(pcur)->flag) {
1672 		case BTR_CUR_DELETE_REF:
1673 		case BTR_CUR_DEL_MARK_IBUF:
1674 		case BTR_CUR_DELETE_IBUF:
1675 		case BTR_CUR_INSERT_TO_IBUF:
1676 			/* We did not request buffering. */
1677 			break;
1678 		case BTR_CUR_HASH:
1679 		case BTR_CUR_HASH_FAIL:
1680 		case BTR_CUR_BINARY:
1681 			goto flag_ok;
1682 		}
1683 		ut_ad(0);
1684 flag_ok:
1685 #endif /* UNIV_DEBUG */
1686 
1687 		if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
1688 		    || btr_pcur_get_low_match(pcur) < index->n_uniq) {
1689 			/* All secondary index entries should be
1690 			found, because new_table is being modified by
1691 			this thread only, and all indexes should be
1692 			updated in sync. */
1693 			mtr_commit(mtr);
1694 			return(DB_INDEX_CORRUPT);
1695 		}
1696 
1697 		btr_cur_pessimistic_delete(&error, FALSE,
1698 					   btr_pcur_get_btr_cur(pcur),
1699 					   BTR_CREATE_FLAG, RB_NONE, mtr);
1700 		mtr_commit(mtr);
1701 	}
1702 
1703 	return(error);
1704 }
1705 
1706 /******************************************************//**
1707 Replays a delete operation on a table that was rebuilt.
1708 @return DB_SUCCESS or error code */
1709 static MY_ATTRIBUTE((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
1710 dberr_t
row_log_table_apply_delete(que_thr_t * thr,ulint trx_id_col,const mrec_t * mrec,const ulint * moffsets,mem_heap_t * offsets_heap,mem_heap_t * heap,const row_log_t * log,const row_ext_t * save_ext)1711 row_log_table_apply_delete(
1712 /*=======================*/
1713 	que_thr_t*		thr,		/*!< in: query graph */
1714 	ulint			trx_id_col,	/*!< in: position of
1715 						DB_TRX_ID in the new
1716 						clustered index */
1717 	const mrec_t*		mrec,		/*!< in: merge record */
1718 	const ulint*		moffsets,	/*!< in: offsets of mrec */
1719 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1720 						that can be emptied */
1721 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1722 	const row_log_t*	log,		/*!< in: online log */
1723 	const row_ext_t*	save_ext)	/*!< in: saved external field
1724 						info, or NULL */
1725 {
1726 	dict_table_t*	new_table = log->table;
1727 	dict_index_t*	index = dict_table_get_first_index(new_table);
1728 	dtuple_t*	old_pk;
1729 	mtr_t		mtr;
1730 	btr_pcur_t	pcur;
1731 	ulint*		offsets;
1732 
1733 	ut_ad(rec_offs_n_fields(moffsets)
1734 	      == dict_index_get_n_unique(index) + 2);
1735 	ut_ad(!rec_offs_any_extern(moffsets));
1736 
1737 	/* Convert the row to a search tuple. */
1738 	old_pk = dtuple_create(heap, index->n_uniq);
1739 	dict_index_copy_types(old_pk, index, index->n_uniq);
1740 
1741 	for (ulint i = 0; i < index->n_uniq; i++) {
1742 		ulint		len;
1743 		const void*	field;
1744 		field = rec_get_nth_field(mrec, moffsets, i, &len);
1745 		ut_ad(len != UNIV_SQL_NULL);
1746 		dfield_set_data(dtuple_get_nth_field(old_pk, i),
1747 				field, len);
1748 	}
1749 
1750 	mtr_start(&mtr);
1751 	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1752 		      BTR_MODIFY_TREE, &pcur, &mtr);
1753 #ifdef UNIV_DEBUG
1754 	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1755 	case BTR_CUR_DELETE_REF:
1756 	case BTR_CUR_DEL_MARK_IBUF:
1757 	case BTR_CUR_DELETE_IBUF:
1758 	case BTR_CUR_INSERT_TO_IBUF:
1759 		/* We did not request buffering. */
1760 		break;
1761 	case BTR_CUR_HASH:
1762 	case BTR_CUR_HASH_FAIL:
1763 	case BTR_CUR_BINARY:
1764 		goto flag_ok;
1765 	}
1766 	ut_ad(0);
1767 flag_ok:
1768 #endif /* UNIV_DEBUG */
1769 
1770 	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1771 	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1772 all_done:
1773 		mtr_commit(&mtr);
1774 		/* The record was not found. All done. */
1775 		/* This should only happen when an earlier
1776 		ROW_T_INSERT was skipped or
1777 		ROW_T_UPDATE was interpreted as ROW_T_DELETE
1778 		due to BLOBs having been freed by rollback. */
1779 		return(DB_SUCCESS);
1780 	}
1781 
1782 	offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
1783 				  ULINT_UNDEFINED, &offsets_heap);
1784 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
1785 	ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
1786 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
1787 
1788 	/* Only remove the record if DB_TRX_ID,DB_ROLL_PTR match. */
1789 
1790 	{
1791 		ulint		len;
1792 		const byte*	mrec_trx_id
1793 			= rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
1794 		ut_ad(len == DATA_TRX_ID_LEN);
1795 		const byte*	rec_trx_id
1796 			= rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
1797 					    trx_id_col, &len);
1798 		ut_ad(len == DATA_TRX_ID_LEN);
1799 
1800 		ut_ad(rec_get_nth_field(mrec, moffsets, trx_id_col + 1, &len)
1801 		      == mrec_trx_id + DATA_TRX_ID_LEN);
1802 		ut_ad(len == DATA_ROLL_PTR_LEN);
1803 		ut_ad(rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
1804 					trx_id_col + 1, &len)
1805 		      == rec_trx_id + DATA_TRX_ID_LEN);
1806 		ut_ad(len == DATA_ROLL_PTR_LEN);
1807 
1808 		if (memcmp(mrec_trx_id, rec_trx_id,
1809 			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
1810 			/* The ROW_T_DELETE was logged for a different
1811 			PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR.
1812 			This is possible if a ROW_T_INSERT was skipped
1813 			or a ROW_T_UPDATE was interpreted as ROW_T_DELETE
1814 			because some BLOBs were missing due to
1815 			(1) rolling back the initial insert, or
1816 			(2) purging the BLOB for a later ROW_T_DELETE
1817 			(3) purging 'old values' for a later ROW_T_UPDATE
1818 			or ROW_T_DELETE. */
1819 			ut_ad(!log->same_pk);
1820 			goto all_done;
1821 		}
1822 	}
1823 
1824 	return(row_log_table_apply_delete_low(&pcur, offsets, save_ext,
1825 					      heap, &mtr));
1826 }
1827 
1828 /******************************************************//**
1829 Replays an update operation on a table that was rebuilt.
1830 @return DB_SUCCESS or error code */
1831 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1832 dberr_t
row_log_table_apply_update(que_thr_t * thr,ulint new_trx_id_col,const mrec_t * mrec,const ulint * offsets,mem_heap_t * offsets_heap,mem_heap_t * heap,row_merge_dup_t * dup,trx_id_t trx_id,const dtuple_t * old_pk)1833 row_log_table_apply_update(
1834 /*=======================*/
1835 	que_thr_t*		thr,		/*!< in: query graph */
1836 	ulint			new_trx_id_col,	/*!< in: position of
1837 						DB_TRX_ID in the new
1838 						clustered index */
1839 	const mrec_t*		mrec,		/*!< in: new value */
1840 	const ulint*		offsets,	/*!< in: offsets of mrec */
1841 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
1842 						that can be emptied */
1843 	mem_heap_t*		heap,		/*!< in/out: memory heap */
1844 	row_merge_dup_t*	dup,		/*!< in/out: for reporting
1845 						duplicate key errors */
1846 	trx_id_t		trx_id,		/*!< in: DB_TRX_ID of mrec */
1847 	const dtuple_t*		old_pk)		/*!< in: PRIMARY KEY and
1848 						DB_TRX_ID,DB_ROLL_PTR
1849 						of the old value,
1850 						or PRIMARY KEY if same_pk */
1851 {
1852 	const row_log_t*log	= dup->index->online_log;
1853 	const dtuple_t*	row;
1854 	dict_index_t*	index	= dict_table_get_first_index(log->table);
1855 	mtr_t		mtr;
1856 	btr_pcur_t	pcur;
1857 	dberr_t		error;
1858 	ulint		n_index = 0;
1859 
1860 	ut_ad(dtuple_get_n_fields_cmp(old_pk)
1861 	      == dict_index_get_n_unique(index));
1862 	ut_ad(dtuple_get_n_fields(old_pk)
1863 	      == dict_index_get_n_unique(index)
1864 	      + (log->same_pk ? 0 : 2));
1865 
1866 	row = row_log_table_apply_convert_mrec(
1867 		mrec, dup->index, offsets, log, heap, trx_id, &error);
1868 
1869 	switch (error) {
1870 	case DB_MISSING_HISTORY:
1871 		/* The record contained BLOBs that are now missing. */
1872 		ut_ad(log->blobs);
1873 		/* Whether or not we are updating the PRIMARY KEY, we
1874 		know that there should be a subsequent
1875 		ROW_T_DELETE for rolling back a preceding ROW_T_INSERT,
1876 		overriding this ROW_T_UPDATE record. (*1)
1877 
1878 		This allows us to interpret this ROW_T_UPDATE
1879 		as ROW_T_DELETE.
1880 
1881 		When applying the subsequent ROW_T_DELETE, no matching
1882 		record will be found. */
1883 		return(DB_SUCCESS);
1884 	case DB_SUCCESS:
1885 		ut_ad(row != NULL);
1886 		break;
1887 	default:
1888 		ut_ad(0);
1889 	case DB_INVALID_NULL:
1890 		ut_ad(row == NULL);
1891 		return(error);
1892 	}
1893 
1894 	mtr_start(&mtr);
1895 	btr_pcur_open(index, old_pk, PAGE_CUR_LE,
1896 		      BTR_MODIFY_TREE, &pcur, &mtr);
1897 #ifdef UNIV_DEBUG
1898 	switch (btr_pcur_get_btr_cur(&pcur)->flag) {
1899 	case BTR_CUR_DELETE_REF:
1900 	case BTR_CUR_DEL_MARK_IBUF:
1901 	case BTR_CUR_DELETE_IBUF:
1902 	case BTR_CUR_INSERT_TO_IBUF:
1903 		ut_ad(0);/* We did not request buffering. */
1904 	case BTR_CUR_HASH:
1905 	case BTR_CUR_HASH_FAIL:
1906 	case BTR_CUR_BINARY:
1907 		break;
1908 	}
1909 #endif /* UNIV_DEBUG */
1910 
1911 	if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
1912 	    || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
1913 		/* The record was not found. This should only happen
1914 		when an earlier ROW_T_INSERT or ROW_T_UPDATE was
1915 		diverted because BLOBs were freed when the insert was
1916 		later rolled back. */
1917 
1918 		ut_ad(log->blobs);
1919 
1920 		if (error == DB_SUCCESS) {
1921 			/* An earlier ROW_T_INSERT could have been
1922 			skipped because of a missing BLOB, like this:
1923 
1924 			BEGIN;
1925 			INSERT INTO t SET blob_col='blob value';
1926 			UPDATE t SET blob_col='';
1927 			ROLLBACK;
1928 
1929 			This would generate the following records:
1930 			ROW_T_INSERT (referring to 'blob value')
1931 			ROW_T_UPDATE
1932 			ROW_T_UPDATE (referring to 'blob value')
1933 			ROW_T_DELETE
1934 			[ROLLBACK removes the 'blob value']
1935 
1936 			The ROW_T_INSERT would have been skipped
1937 			because of a missing BLOB. Now we are
1938 			executing the first ROW_T_UPDATE.
1939 			The second ROW_T_UPDATE (for the ROLLBACK)
1940 			would be interpreted as ROW_T_DELETE, because
1941 			the BLOB would be missing.
1942 
1943 			We could probably assume that the transaction
1944 			has been rolled back and simply skip the
1945 			'insert' part of this ROW_T_UPDATE record.
1946 			However, there might be some complex scenario
1947 			that could interfere with such a shortcut.
1948 			So, we will insert the row (and risk
1949 			introducing a bogus duplicate key error
1950 			for the ALTER TABLE), and a subsequent
1951 			ROW_T_UPDATE or ROW_T_DELETE will delete it. */
1952 			mtr_commit(&mtr);
1953 			error = row_log_table_apply_insert_low(
1954 				thr, row, trx_id, offsets_heap, heap, dup);
1955 		} else {
1956 			/* Some BLOBs are missing, so we are interpreting
1957 			this ROW_T_UPDATE as ROW_T_DELETE (see *1).
1958 			Because the record was not found, we do nothing. */
1959 			ut_ad(error == DB_MISSING_HISTORY);
1960 			error = DB_SUCCESS;
1961 func_exit:
1962 			mtr_commit(&mtr);
1963 		}
1964 func_exit_committed:
1965 		ut_ad(mtr.state == MTR_COMMITTED);
1966 
1967 		if (error != DB_SUCCESS) {
1968 			/* Report the erroneous row using the new
1969 			version of the table. */
1970 			innobase_row_to_mysql(dup->table, log->table, row);
1971 		}
1972 
1973 		return(error);
1974 	}
1975 
1976 	/* Prepare to update (or delete) the record. */
1977 	ulint*		cur_offsets	= rec_get_offsets(
1978 		btr_pcur_get_rec(&pcur),
1979 		index, NULL, ULINT_UNDEFINED, &offsets_heap);
1980 
1981 	if (!log->same_pk) {
1982 		/* Only update the record if DB_TRX_ID,DB_ROLL_PTR match what
1983 		was buffered. */
1984 		ulint		len;
1985 		const void*	rec_trx_id
1986 			= rec_get_nth_field(btr_pcur_get_rec(&pcur),
1987 					    cur_offsets, index->n_uniq, &len);
1988 		ut_ad(len == DATA_TRX_ID_LEN);
1989 		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq)->len
1990 		      == DATA_TRX_ID_LEN);
1991 		ut_ad(dtuple_get_nth_field(old_pk, index->n_uniq + 1)->len
1992 		      == DATA_ROLL_PTR_LEN);
1993 		ut_ad(DATA_TRX_ID_LEN + static_cast<const char*>(
1994 			      dtuple_get_nth_field(old_pk,
1995 						   index->n_uniq)->data)
1996 		      == dtuple_get_nth_field(old_pk,
1997 					      index->n_uniq + 1)->data);
1998 		if (memcmp(rec_trx_id,
1999 			   dtuple_get_nth_field(old_pk, index->n_uniq)->data,
2000 			   DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
2001 			/* The ROW_T_UPDATE was logged for a different
2002 			DB_TRX_ID,DB_ROLL_PTR. This is possible if an
2003 			earlier ROW_T_INSERT or ROW_T_UPDATE was diverted
2004 			because some BLOBs were missing due to rolling
2005 			back the initial insert or due to purging
2006 			the old BLOB values of an update. */
2007 			ut_ad(log->blobs);
2008 			if (error != DB_SUCCESS) {
2009 				ut_ad(error == DB_MISSING_HISTORY);
2010 				/* Some BLOBs are missing, so we are
2011 				interpreting this ROW_T_UPDATE as
2012 				ROW_T_DELETE (see *1).
2013 				Because this is a different row,
2014 				we will do nothing. */
2015 				error = DB_SUCCESS;
2016 			} else {
2017 				/* Because the user record is missing due to
2018 				BLOBs that were missing when processing
2019 				an earlier log record, we should
2020 				interpret the ROW_T_UPDATE as ROW_T_INSERT.
2021 				However, there is a different user record
2022 				with the same PRIMARY KEY value already. */
2023 				error = DB_DUPLICATE_KEY;
2024 			}
2025 
2026 			goto func_exit;
2027 		}
2028 	}
2029 
2030 	if (error != DB_SUCCESS) {
2031 		ut_ad(error == DB_MISSING_HISTORY);
2032 		ut_ad(log->blobs);
2033 		/* Some BLOBs are missing, so we are interpreting
2034 		this ROW_T_UPDATE as ROW_T_DELETE (see *1). */
2035 		error = row_log_table_apply_delete_low(
2036 			&pcur, cur_offsets, NULL, heap, &mtr);
2037 		goto func_exit_committed;
2038 	}
2039 
2040 	dtuple_t*	entry	= row_build_index_entry(
2041 		row, NULL, index, heap);
2042 	const upd_t*	update	= row_upd_build_difference_binary(
2043 		index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
2044 		false, NULL, heap);
2045 
2046 	if (!update->n_fields) {
2047 		/* Nothing to do. */
2048 		goto func_exit;
2049 	}
2050 
2051 	const bool	pk_updated
2052 		= upd_get_nth_field(update, 0)->field_no < new_trx_id_col;
2053 
2054 	if (pk_updated || rec_offs_any_extern(cur_offsets)) {
2055 		/* If the record contains any externally stored
2056 		columns, perform the update by delete and insert,
2057 		because we will not write any undo log that would
2058 		allow purge to free any orphaned externally stored
2059 		columns. */
2060 
2061 		if (pk_updated && log->same_pk) {
2062 			/* The ROW_T_UPDATE log record should only be
2063 			written when the PRIMARY KEY fields of the
2064 			record did not change in the old table.  We
2065 			can only get a change of PRIMARY KEY columns
2066 			in the rebuilt table if the PRIMARY KEY was
2067 			redefined (!same_pk). */
2068 			ut_ad(0);
2069 			error = DB_CORRUPTION;
2070 			goto func_exit;
2071 		}
2072 
2073 		error = row_log_table_apply_delete_low(
2074 			&pcur, cur_offsets, NULL, heap, &mtr);
2075 		ut_ad(mtr.state == MTR_COMMITTED);
2076 
2077 		if (error == DB_SUCCESS) {
2078 			error = row_log_table_apply_insert_low(
2079 				thr, row, trx_id, offsets_heap, heap, dup);
2080 		}
2081 
2082 		goto func_exit_committed;
2083 	}
2084 
2085 	dtuple_t*	old_row;
2086 	row_ext_t*	old_ext;
2087 
2088 	if (dict_table_get_next_index(index)) {
2089 		/* Construct the row corresponding to the old value of
2090 		the record. */
2091 		old_row = row_build(
2092 			ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
2093 			cur_offsets, NULL, NULL, NULL, &old_ext, heap);
2094 		ut_ad(old_row);
2095 #ifdef ROW_LOG_APPLY_PRINT
2096 		if (row_log_apply_print) {
2097 			fprintf(stderr, "table apply update "
2098 				IB_ID_FMT " " IB_ID_FMT "\n",
2099 				index->table->id, index->id);
2100 			dtuple_print(stderr, old_row);
2101 			dtuple_print(stderr, row);
2102 		}
2103 #endif /* ROW_LOG_APPLY_PRINT */
2104 	} else {
2105 		old_row = NULL;
2106 		old_ext = NULL;
2107 	}
2108 
2109 	big_rec_t*	big_rec;
2110 
2111 	error = btr_cur_pessimistic_update(
2112 		BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2113 		| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
2114 		| BTR_KEEP_POS_FLAG,
2115 		btr_pcur_get_btr_cur(&pcur),
2116 		&cur_offsets, &offsets_heap, heap, &big_rec,
2117 		update, 0, thr, 0, &mtr);
2118 
2119 	if (big_rec) {
2120 		if (error == DB_SUCCESS) {
2121 			error = btr_store_big_rec_extern_fields(
2122 				index, btr_pcur_get_block(&pcur),
2123 				btr_pcur_get_rec(&pcur), cur_offsets,
2124 				big_rec, &mtr, BTR_STORE_UPDATE);
2125 		}
2126 
2127 		dtuple_big_rec_free(big_rec);
2128 	}
2129 
2130 	while ((index = dict_table_get_next_index(index)) != NULL) {
2131 		if (error != DB_SUCCESS) {
2132 			break;
2133 		}
2134 
2135 		n_index++;
2136 
2137 		if (index->type & DICT_FTS) {
2138 			continue;
2139 		}
2140 
2141 		if (!row_upd_changes_ord_field_binary(
2142 			    index, update, thr, old_row, NULL)) {
2143 			continue;
2144 		}
2145 
2146 		mtr_commit(&mtr);
2147 
2148 		entry = row_build_index_entry(old_row, old_ext, index, heap);
2149 		if (!entry) {
2150 			ut_ad(0);
2151 			return(DB_CORRUPTION);
2152 		}
2153 
2154 		mtr_start(&mtr);
2155 
2156 		if (ROW_FOUND != row_search_index_entry(
2157 			    index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
2158 			ut_ad(0);
2159 			error = DB_CORRUPTION;
2160 			break;
2161 		}
2162 
2163 		btr_cur_pessimistic_delete(
2164 			&error, FALSE, btr_pcur_get_btr_cur(&pcur),
2165 			BTR_CREATE_FLAG, RB_NONE, &mtr);
2166 
2167 		if (error != DB_SUCCESS) {
2168 			break;
2169 		}
2170 
2171 		mtr_commit(&mtr);
2172 
2173 		entry = row_build_index_entry(row, NULL, index, heap);
2174 		error = row_ins_sec_index_entry_low(
2175 			BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
2176 			| BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
2177 			BTR_MODIFY_TREE, index, offsets_heap, heap,
2178 			entry, trx_id, thr);
2179 
2180 		/* Report correct index name for duplicate key error. */
2181 		if (error == DB_DUPLICATE_KEY) {
2182 			thr_get_trx(thr)->error_key_num = n_index;
2183 		}
2184 
2185 		mtr_start(&mtr);
2186 	}
2187 
2188 	goto func_exit;
2189 }
2190 
2191 /******************************************************//**
2192 Applies an operation to a table that was rebuilt.
2193 @return NULL on failure (mrec corruption) or when out of data;
2194 pointer to next record on success */
2195 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2196 const mrec_t*
row_log_table_apply_op(que_thr_t * thr,ulint trx_id_col,ulint new_trx_id_col,row_merge_dup_t * dup,dberr_t * error,mem_heap_t * offsets_heap,mem_heap_t * heap,const mrec_t * mrec,const mrec_t * mrec_end,ulint * offsets)2197 row_log_table_apply_op(
2198 /*===================*/
2199 	que_thr_t*		thr,		/*!< in: query graph */
2200 	ulint			trx_id_col,	/*!< in: position of
2201 						DB_TRX_ID in old index */
2202 	ulint			new_trx_id_col,	/*!< in: position of
2203 						DB_TRX_ID in new index */
2204 	row_merge_dup_t*	dup,		/*!< in/out: for reporting
2205 						duplicate key errors */
2206 	dberr_t*		error,		/*!< out: DB_SUCCESS
2207 						or error code */
2208 	mem_heap_t*		offsets_heap,	/*!< in/out: memory heap
2209 						that can be emptied */
2210 	mem_heap_t*		heap,		/*!< in/out: memory heap */
2211 	const mrec_t*		mrec,		/*!< in: merge record */
2212 	const mrec_t*		mrec_end,	/*!< in: end of buffer */
2213 	ulint*			offsets)	/*!< in/out: work area
2214 						for parsing mrec */
2215 {
2216 	row_log_t*	log	= dup->index->online_log;
2217 	dict_index_t*	new_index = dict_table_get_first_index(log->table);
2218 	ulint		extra_size;
2219 	const mrec_t*	next_mrec;
2220 	dtuple_t*	old_pk;
2221 	row_ext_t*	ext;
2222 	ulint		ext_size;
2223 
2224 	ut_ad(dict_index_is_clust(dup->index));
2225 	ut_ad(dup->index->table != log->table);
2226 	ut_ad(log->head.total <= log->tail.total);
2227 
2228 	*error = DB_SUCCESS;
2229 
2230 	/* 3 = 1 (op type) + 1 (ext_size) + at least 1 byte payload */
2231 	if (mrec + 3 >= mrec_end) {
2232 		return(NULL);
2233 	}
2234 
2235 	const mrec_t* const mrec_start = mrec;
2236 
2237 	switch (*mrec++) {
2238 	default:
2239 		ut_ad(0);
2240 		*error = DB_CORRUPTION;
2241 		return(NULL);
2242 	case ROW_T_INSERT:
2243 		extra_size = *mrec++;
2244 
2245 		if (extra_size >= 0x80) {
2246 			/* Read another byte of extra_size. */
2247 
2248 			extra_size = (extra_size & 0x7f) << 8;
2249 			extra_size |= *mrec++;
2250 		}
2251 
2252 		mrec += extra_size;
2253 
2254 		if (mrec > mrec_end) {
2255 			return(NULL);
2256 		}
2257 
2258 		rec_offs_set_n_fields(offsets, dup->index->n_fields);
2259 		rec_init_offsets_temp(mrec, dup->index, offsets);
2260 
2261 		next_mrec = mrec + rec_offs_data_size(offsets);
2262 
2263 		if (next_mrec > mrec_end) {
2264 			return(NULL);
2265 		} else {
2266 			log->head.total += next_mrec - mrec_start;
2267 
2268 			ulint		len;
2269 			const byte*	db_trx_id
2270 				= rec_get_nth_field(
2271 					mrec, offsets, trx_id_col, &len);
2272 			ut_ad(len == DATA_TRX_ID_LEN);
2273 			*error = row_log_table_apply_insert(
2274 				thr, mrec, offsets, offsets_heap,
2275 				heap, dup, trx_read_trx_id(db_trx_id));
2276 		}
2277 		break;
2278 
2279 	case ROW_T_DELETE:
2280 		/* 1 (extra_size) + 4 (ext_size) + at least 1 (payload) */
2281 		if (mrec + 6 >= mrec_end) {
2282 			return(NULL);
2283 		}
2284 
2285 		extra_size = *mrec++;
2286 		ext_size = mach_read_from_4(mrec);
2287 		mrec += 4;
2288 		ut_ad(mrec < mrec_end);
2289 
2290 		/* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
2291 		For fixed-length PRIMARY key columns, it is 0. */
2292 		mrec += extra_size;
2293 
2294 		rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
2295 		rec_init_offsets_temp(mrec, new_index, offsets);
2296 		next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
2297 		if (next_mrec > mrec_end) {
2298 			return(NULL);
2299 		}
2300 
2301 		log->head.total += next_mrec - mrec_start;
2302 
2303 		/* If there are external fields, retrieve those logged
2304 		prefix info and reconstruct the row_ext_t */
2305 		if (ext_size) {
2306 			/* We use memcpy to avoid unaligned
2307 			access on some non-x86 platforms.*/
2308 			ext = static_cast<row_ext_t*>(
2309 				mem_heap_dup(heap,
2310 					     mrec + rec_offs_data_size(offsets),
2311 					     ext_size));
2312 
2313 			byte*	ext_start = reinterpret_cast<byte*>(ext);
2314 
2315 			ulint	ext_len = sizeof(*ext)
2316 				+ (ext->n_ext - 1) * sizeof ext->len;
2317 
2318 			ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
2319 			ext_len += ext->n_ext * sizeof(*ext->ext);
2320 
2321 			ext->buf = static_cast<byte*>(ext_start + ext_len);
2322 		} else {
2323 			ext = NULL;
2324 		}
2325 
2326 		*error = row_log_table_apply_delete(
2327 			thr, new_trx_id_col,
2328 			mrec, offsets, offsets_heap, heap,
2329 			log, ext);
2330 		break;
2331 
2332 	case ROW_T_UPDATE:
2333 		/* Logically, the log entry consists of the
2334 		(PRIMARY KEY,DB_TRX_ID) of the old value (converted
2335 		to the new primary key definition) followed by
2336 		the new value in the old table definition. If the
2337 		definition of the columns belonging to PRIMARY KEY
2338 		is not changed, the log will only contain
2339 		DB_TRX_ID,new_row. */
2340 
2341 		if (dup->index->online_log->same_pk) {
2342 			ut_ad(new_index->n_uniq == dup->index->n_uniq);
2343 
2344 			extra_size = *mrec++;
2345 
2346 			if (extra_size >= 0x80) {
2347 				/* Read another byte of extra_size. */
2348 
2349 				extra_size = (extra_size & 0x7f) << 8;
2350 				extra_size |= *mrec++;
2351 			}
2352 
2353 			mrec += extra_size;
2354 
2355 			if (mrec > mrec_end) {
2356 				return(NULL);
2357 			}
2358 
2359 			rec_offs_set_n_fields(offsets, dup->index->n_fields);
2360 			rec_init_offsets_temp(mrec, dup->index, offsets);
2361 
2362 			next_mrec = mrec + rec_offs_data_size(offsets);
2363 
2364 			if (next_mrec > mrec_end) {
2365 				return(NULL);
2366 			}
2367 
2368 			old_pk = dtuple_create(heap, new_index->n_uniq);
2369 			dict_index_copy_types(
2370 				old_pk, new_index, old_pk->n_fields);
2371 
2372 			/* Copy the PRIMARY KEY fields from mrec to old_pk. */
2373 			for (ulint i = 0; i < new_index->n_uniq; i++) {
2374 				const void*	field;
2375 				ulint		len;
2376 				dfield_t*	dfield;
2377 
2378 				ut_ad(!rec_offs_nth_extern(offsets, i));
2379 
2380 				field = rec_get_nth_field(
2381 					mrec, offsets, i, &len);
2382 				ut_ad(len != UNIV_SQL_NULL);
2383 
2384 				dfield = dtuple_get_nth_field(old_pk, i);
2385 				dfield_set_data(dfield, field, len);
2386 			}
2387 		} else {
2388 			/* We assume extra_size < 0x100
2389 			for the PRIMARY KEY prefix. */
2390 			mrec += *mrec + 1;
2391 
2392 			if (mrec > mrec_end) {
2393 				return(NULL);
2394 			}
2395 
2396 			/* Get offsets for PRIMARY KEY,
2397 			DB_TRX_ID, DB_ROLL_PTR. */
2398 			rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
2399 			rec_init_offsets_temp(mrec, new_index, offsets);
2400 
2401 			next_mrec = mrec + rec_offs_data_size(offsets);
2402 			if (next_mrec + 2 > mrec_end) {
2403 				return(NULL);
2404 			}
2405 
2406 			/* Copy the PRIMARY KEY fields and
2407 			DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
2408 			old_pk = dtuple_create(heap, new_index->n_uniq + 2);
2409 			dict_index_copy_types(old_pk, new_index,
2410 					      old_pk->n_fields);
2411 
2412 			for (ulint i = 0;
2413 			     i < dict_index_get_n_unique(new_index) + 2;
2414 			     i++) {
2415 				const void*	field;
2416 				ulint		len;
2417 				dfield_t*	dfield;
2418 
2419 				ut_ad(!rec_offs_nth_extern(offsets, i));
2420 
2421 				field = rec_get_nth_field(
2422 					mrec, offsets, i, &len);
2423 				ut_ad(len != UNIV_SQL_NULL);
2424 
2425 				dfield = dtuple_get_nth_field(old_pk, i);
2426 				dfield_set_data(dfield, field, len);
2427 			}
2428 
2429 			mrec = next_mrec;
2430 
2431 			/* Fetch the new value of the row as it was
2432 			in the old table definition. */
2433 			extra_size = *mrec++;
2434 
2435 			if (extra_size >= 0x80) {
2436 				/* Read another byte of extra_size. */
2437 
2438 				extra_size = (extra_size & 0x7f) << 8;
2439 				extra_size |= *mrec++;
2440 			}
2441 
2442 			mrec += extra_size;
2443 
2444 			if (mrec > mrec_end) {
2445 				return(NULL);
2446 			}
2447 
2448 			rec_offs_set_n_fields(offsets, dup->index->n_fields);
2449 			rec_init_offsets_temp(mrec, dup->index, offsets);
2450 
2451 			next_mrec = mrec + rec_offs_data_size(offsets);
2452 
2453 			if (next_mrec > mrec_end) {
2454 				return(NULL);
2455 			}
2456 		}
2457 
2458 		ut_ad(next_mrec <= mrec_end);
2459 		log->head.total += next_mrec - mrec_start;
2460 		dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
2461 
2462 		{
2463 			ulint		len;
2464 			const byte*	db_trx_id
2465 				= rec_get_nth_field(
2466 					mrec, offsets, trx_id_col, &len);
2467 			ut_ad(len == DATA_TRX_ID_LEN);
2468 			*error = row_log_table_apply_update(
2469 				thr, new_trx_id_col,
2470 				mrec, offsets, offsets_heap,
2471 				heap, dup, trx_read_trx_id(db_trx_id), old_pk);
2472 		}
2473 
2474 		break;
2475 	}
2476 
2477 	ut_ad(log->head.total <= log->tail.total);
2478 	mem_heap_empty(offsets_heap);
2479 	mem_heap_empty(heap);
2480 	return(next_mrec);
2481 }
2482 
2483 /******************************************************//**
2484 Applies operations to a table was rebuilt.
2485 @return DB_SUCCESS, or error code on failure */
2486 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2487 dberr_t
row_log_table_apply_ops(que_thr_t * thr,row_merge_dup_t * dup)2488 row_log_table_apply_ops(
2489 /*====================*/
2490 	que_thr_t*	thr,	/*!< in: query graph */
2491 	row_merge_dup_t*dup)	/*!< in/out: for reporting duplicate key
2492 				errors */
2493 {
2494 	dberr_t		error;
2495 	const mrec_t*	mrec		= NULL;
2496 	const mrec_t*	next_mrec;
2497 	const mrec_t*	mrec_end	= NULL; /* silence bogus warning */
2498 	const mrec_t*	next_mrec_end;
2499 	mem_heap_t*	heap;
2500 	mem_heap_t*	offsets_heap;
2501 	ulint*		offsets;
2502 	bool		has_index_lock;
2503 	dict_index_t*	index		= const_cast<dict_index_t*>(
2504 		dup->index);
2505 	dict_table_t*	new_table	= index->online_log->table;
2506 	dict_index_t*	new_index	= dict_table_get_first_index(
2507 		new_table);
2508 	const ulint	i		= 1 + REC_OFFS_HEADER_SIZE
2509 		+ ut_max(dict_index_get_n_fields(index),
2510 			 dict_index_get_n_unique(new_index) + 2);
2511 	const ulint	trx_id_col	= dict_col_get_clust_pos(
2512 		dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
2513 	const ulint	new_trx_id_col	= dict_col_get_clust_pos(
2514 		dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
2515 	trx_t*		trx		= thr_get_trx(thr);
2516 
2517 	ut_ad(dict_index_is_clust(index));
2518 	ut_ad(dict_index_is_online_ddl(index));
2519 	ut_ad(trx->mysql_thd);
2520 #ifdef UNIV_SYNC_DEBUG
2521 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2522 #endif /* UNIV_SYNC_DEBUG */
2523 	ut_ad(!dict_index_is_online_ddl(new_index));
2524 	ut_ad(trx_id_col > 0);
2525 	ut_ad(trx_id_col != ULINT_UNDEFINED);
2526 	ut_ad(new_trx_id_col > 0);
2527 	ut_ad(new_trx_id_col != ULINT_UNDEFINED);
2528 
2529 	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
2530 
2531 	offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
2532 	offsets[0] = i;
2533 	offsets[1] = dict_index_get_n_fields(index);
2534 
2535 	heap = mem_heap_create(UNIV_PAGE_SIZE);
2536 	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
2537 	has_index_lock = true;
2538 
2539 next_block:
2540 	ut_ad(has_index_lock);
2541 #ifdef UNIV_SYNC_DEBUG
2542 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2543 #endif /* UNIV_SYNC_DEBUG */
2544 	ut_ad(index->online_log->head.bytes == 0);
2545 
2546 	if (trx_is_interrupted(trx)) {
2547 		goto interrupted;
2548 	}
2549 
2550 	if (dict_index_is_corrupted(index)) {
2551 		error = DB_INDEX_CORRUPT;
2552 		goto func_exit;
2553 	}
2554 
2555 	ut_ad(dict_index_is_online_ddl(index));
2556 
2557 	error = index->online_log->error;
2558 
2559 	if (error != DB_SUCCESS) {
2560 		goto func_exit;
2561 	}
2562 
2563 	if (UNIV_UNLIKELY(index->online_log->head.blocks
2564 			  > index->online_log->tail.blocks)) {
2565 unexpected_eof:
2566 		fprintf(stderr, "InnoDB: unexpected end of temporary file"
2567 			" for table %s\n", index->table_name);
2568 corruption:
2569 		error = DB_CORRUPTION;
2570 		goto func_exit;
2571 	}
2572 
2573 	if (index->online_log->head.blocks
2574 	    == index->online_log->tail.blocks) {
2575 		if (index->online_log->head.blocks) {
2576 #ifdef HAVE_FTRUNCATE
2577 			/* Truncate the file in order to save space. */
2578 			if (index->online_log->fd != -1
2579 			    && ftruncate(index->online_log->fd, 0) == -1) {
2580 				perror("ftruncate");
2581 			}
2582 #endif /* HAVE_FTRUNCATE */
2583 			index->online_log->head.blocks
2584 				= index->online_log->tail.blocks = 0;
2585 		}
2586 
2587 		next_mrec = index->online_log->tail.block;
2588 		next_mrec_end = next_mrec + index->online_log->tail.bytes;
2589 
2590 		if (next_mrec_end == next_mrec) {
2591 			/* End of log reached. */
2592 all_done:
2593 			ut_ad(has_index_lock);
2594 			ut_ad(index->online_log->head.blocks == 0);
2595 			ut_ad(index->online_log->tail.blocks == 0);
2596 			index->online_log->head.bytes = 0;
2597 			index->online_log->tail.bytes = 0;
2598 			error = DB_SUCCESS;
2599 			goto func_exit;
2600 		}
2601 	} else {
2602 		os_offset_t	ofs;
2603 		ibool		success;
2604 
2605 		ofs = (os_offset_t) index->online_log->head.blocks
2606 			* srv_sort_buf_size;
2607 
2608 		ut_ad(has_index_lock);
2609 		has_index_lock = false;
2610 		rw_lock_x_unlock(dict_index_get_lock(index));
2611 
2612 		log_free_check();
2613 
2614 		ut_ad(dict_index_is_online_ddl(index));
2615 
2616 		if (!row_log_block_allocate(index->online_log->head)) {
2617 			error = DB_OUT_OF_MEMORY;
2618 			goto func_exit;
2619 		}
2620 
2621 		success = os_file_read_no_error_handling_int_fd(
2622 			index->online_log->fd,
2623 			index->online_log->head.block, ofs,
2624 			srv_sort_buf_size);
2625 		if (!success) {
2626 			fprintf(stderr, "InnoDB: unable to read temporary file"
2627 				" for table %s\n", index->table_name);
2628 			goto corruption;
2629 		}
2630 
2631 #ifdef POSIX_FADV_DONTNEED
2632 		/* Each block is read exactly once.  Free up the file cache. */
2633 		posix_fadvise(index->online_log->fd,
2634 			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
2635 #endif /* POSIX_FADV_DONTNEED */
2636 #if 0 //def FALLOC_FL_PUNCH_HOLE
2637 		/* Try to deallocate the space for the file on disk.
2638 		This should work on ext4 on Linux 2.6.39 and later,
2639 		and be ignored when the operation is unsupported. */
2640 		fallocate(index->online_log->fd,
2641 			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
2642 			  ofs, srv_buf_size);
2643 #endif /* FALLOC_FL_PUNCH_HOLE */
2644 
2645 		next_mrec = index->online_log->head.block;
2646 		next_mrec_end = next_mrec + srv_sort_buf_size;
2647 	}
2648 
2649 	/* This read is not protected by index->online_log->mutex for
2650 	performance reasons. We will eventually notice any error that
2651 	was flagged by a DML thread. */
2652 	error = index->online_log->error;
2653 
2654 	if (error != DB_SUCCESS) {
2655 		goto func_exit;
2656 	}
2657 
2658 	if (mrec) {
2659 		/* A partial record was read from the previous block.
2660 		Copy the temporary buffer full, as we do not know the
2661 		length of the record. Parse subsequent records from
2662 		the bigger buffer index->online_log->head.block
2663 		or index->online_log->tail.block. */
2664 
2665 		ut_ad(mrec == index->online_log->head.buf);
2666 		ut_ad(mrec_end > mrec);
2667 		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
2668 
2669 		memcpy((mrec_t*) mrec_end, next_mrec,
2670 		       (&index->online_log->head.buf)[1] - mrec_end);
2671 		mrec = row_log_table_apply_op(
2672 			thr, trx_id_col, new_trx_id_col,
2673 			dup, &error, offsets_heap, heap,
2674 			index->online_log->head.buf,
2675 			(&index->online_log->head.buf)[1], offsets);
2676 		if (error != DB_SUCCESS) {
2677 			goto func_exit;
2678 		} else if (UNIV_UNLIKELY(mrec == NULL)) {
2679 			/* The record was not reassembled properly. */
2680 			goto corruption;
2681 		}
2682 		/* The record was previously found out to be
2683 		truncated. Now that the parse buffer was extended,
2684 		it should proceed beyond the old end of the buffer. */
2685 		ut_a(mrec > mrec_end);
2686 
2687 		index->online_log->head.bytes = mrec - mrec_end;
2688 		next_mrec += index->online_log->head.bytes;
2689 	}
2690 
2691 	ut_ad(next_mrec <= next_mrec_end);
2692 	/* The following loop must not be parsing the temporary
2693 	buffer, but head.block or tail.block. */
2694 
2695 	/* mrec!=NULL means that the next record starts from the
2696 	middle of the block */
2697 	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
2698 
2699 #ifdef UNIV_DEBUG
2700 	if (next_mrec_end == index->online_log->head.block
2701 	    + srv_sort_buf_size) {
2702 		/* If tail.bytes == 0, next_mrec_end can also be at
2703 		the end of tail.block. */
2704 		if (index->online_log->tail.bytes == 0) {
2705 			ut_ad(next_mrec == next_mrec_end);
2706 			ut_ad(index->online_log->tail.blocks == 0);
2707 			ut_ad(index->online_log->head.blocks == 0);
2708 			ut_ad(index->online_log->head.bytes == 0);
2709 		} else {
2710 			ut_ad(next_mrec == index->online_log->head.block
2711 			      + index->online_log->head.bytes);
2712 			ut_ad(index->online_log->tail.blocks
2713 			      > index->online_log->head.blocks);
2714 		}
2715 	} else if (next_mrec_end == index->online_log->tail.block
2716 		   + index->online_log->tail.bytes) {
2717 		ut_ad(next_mrec == index->online_log->tail.block
2718 		      + index->online_log->head.bytes);
2719 		ut_ad(index->online_log->tail.blocks == 0);
2720 		ut_ad(index->online_log->head.blocks == 0);
2721 		ut_ad(index->online_log->head.bytes
2722 		      <= index->online_log->tail.bytes);
2723 	} else {
2724 		ut_error;
2725 	}
2726 #endif /* UNIV_DEBUG */
2727 
2728 	mrec_end = next_mrec_end;
2729 
2730 	while (!trx_is_interrupted(trx)) {
2731 		mrec = next_mrec;
2732 		ut_ad(mrec <= mrec_end);
2733 
2734 		if (mrec == mrec_end) {
2735 			/* We are at the end of the log.
2736 			   Mark the replay all_done. */
2737 			if (has_index_lock) {
2738 				goto all_done;
2739 			}
2740 		}
2741 
2742 		if (!has_index_lock) {
2743 			/* We are applying operations from a different
2744 			block than the one that is being written to.
2745 			We do not hold index->lock in order to
2746 			allow other threads to concurrently buffer
2747 			modifications. */
2748 			ut_ad(mrec >= index->online_log->head.block);
2749 			ut_ad(mrec_end == index->online_log->head.block
2750 			      + srv_sort_buf_size);
2751 			ut_ad(index->online_log->head.bytes
2752 			      < srv_sort_buf_size);
2753 
2754 			/* Take the opportunity to do a redo log
2755 			checkpoint if needed. */
2756 			log_free_check();
2757 		} else {
2758 			/* We are applying operations from the last block.
2759 			Do not allow other threads to buffer anything,
2760 			so that we can finally catch up and synchronize. */
2761 			ut_ad(index->online_log->head.blocks == 0);
2762 			ut_ad(index->online_log->tail.blocks == 0);
2763 			ut_ad(mrec_end == index->online_log->tail.block
2764 			      + index->online_log->tail.bytes);
2765 			ut_ad(mrec >= index->online_log->tail.block);
2766 		}
2767 
2768 		/* This read is not protected by index->online_log->mutex
2769 		for performance reasons. We will eventually notice any
2770 		error that was flagged by a DML thread. */
2771 		error = index->online_log->error;
2772 
2773 		if (error != DB_SUCCESS) {
2774 			goto func_exit;
2775 		}
2776 
2777 		next_mrec = row_log_table_apply_op(
2778 			thr, trx_id_col, new_trx_id_col,
2779 			dup, &error, offsets_heap, heap,
2780 			mrec, mrec_end, offsets);
2781 
2782 		if (error != DB_SUCCESS) {
2783 			goto func_exit;
2784 		} else if (next_mrec == next_mrec_end) {
2785 			/* The record happened to end on a block boundary.
2786 			Do we have more blocks left? */
2787 			if (has_index_lock) {
2788 				/* The index will be locked while
2789 				applying the last block. */
2790 				goto all_done;
2791 			}
2792 
2793 			mrec = NULL;
2794 process_next_block:
2795 			rw_lock_x_lock(dict_index_get_lock(index));
2796 			has_index_lock = true;
2797 
2798 			index->online_log->head.bytes = 0;
2799 			index->online_log->head.blocks++;
2800 			goto next_block;
2801 		} else if (next_mrec != NULL) {
2802 			ut_ad(next_mrec < next_mrec_end);
2803 			index->online_log->head.bytes += next_mrec - mrec;
2804 		} else if (has_index_lock) {
2805 			/* When mrec is within tail.block, it should
2806 			be a complete record, because we are holding
2807 			index->lock and thus excluding the writer. */
2808 			ut_ad(index->online_log->tail.blocks == 0);
2809 			ut_ad(mrec_end == index->online_log->tail.block
2810 			      + index->online_log->tail.bytes);
2811 			ut_ad(0);
2812 			goto unexpected_eof;
2813 		} else {
2814 			memcpy(index->online_log->head.buf, mrec,
2815 			       mrec_end - mrec);
2816 			mrec_end += index->online_log->head.buf - mrec;
2817 			mrec = index->online_log->head.buf;
2818 			goto process_next_block;
2819 		}
2820 	}
2821 
2822 interrupted:
2823 	error = DB_INTERRUPTED;
2824 func_exit:
2825 	if (!has_index_lock) {
2826 		rw_lock_x_lock(dict_index_get_lock(index));
2827 	}
2828 
2829 	mem_heap_free(offsets_heap);
2830 	mem_heap_free(heap);
2831 	row_log_block_free(index->online_log->head);
2832 	ut_free(offsets);
2833 	return(error);
2834 }
2835 
2836 /******************************************************//**
2837 Apply the row_log_table log to a table upon completing rebuild.
2838 @return DB_SUCCESS, or error code on failure */
2839 UNIV_INTERN
2840 dberr_t
row_log_table_apply(que_thr_t * thr,dict_table_t * old_table,struct TABLE * table)2841 row_log_table_apply(
2842 /*================*/
2843 	que_thr_t*	thr,	/*!< in: query graph */
2844 	dict_table_t*	old_table,
2845 				/*!< in: old table */
2846 	struct TABLE*	table)	/*!< in/out: MySQL table
2847 				(for reporting duplicates) */
2848 {
2849 	dberr_t		error;
2850 	dict_index_t*	clust_index;
2851 
2852 	thr_get_trx(thr)->error_key_num = 0;
2853 
2854 #ifdef UNIV_SYNC_DEBUG
2855 	ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
2856 #endif /* UNIV_SYNC_DEBUG */
2857 	clust_index = dict_table_get_first_index(old_table);
2858 
2859 	rw_lock_x_lock(dict_index_get_lock(clust_index));
2860 
2861 	if (!clust_index->online_log) {
2862 		ut_ad(dict_index_get_online_status(clust_index)
2863 		      == ONLINE_INDEX_COMPLETE);
2864 		/* This function should not be called unless
2865 		rebuilding a table online. Build in some fault
2866 		tolerance. */
2867 		ut_ad(0);
2868 		error = DB_ERROR;
2869 	} else {
2870 		row_merge_dup_t	dup = {
2871 			clust_index, table,
2872 			clust_index->online_log->col_map, 0
2873 		};
2874 
2875 		error = row_log_table_apply_ops(thr, &dup);
2876 
2877 		ut_ad(error != DB_SUCCESS
2878 		      || clust_index->online_log->head.total
2879 		      == clust_index->online_log->tail.total);
2880 	}
2881 
2882 	rw_lock_x_unlock(dict_index_get_lock(clust_index));
2883 	return(error);
2884 }
2885 
2886 /******************************************************//**
2887 Allocate the row log for an index and flag the index
2888 for online creation.
2889 @retval true if success, false if not */
2890 UNIV_INTERN
2891 bool
row_log_allocate(dict_index_t * index,dict_table_t * table,bool same_pk,const dtuple_t * add_cols,const ulint * col_map,const char * path)2892 row_log_allocate(
2893 /*=============*/
2894 	dict_index_t*	index,	/*!< in/out: index */
2895 	dict_table_t*	table,	/*!< in/out: new table being rebuilt,
2896 				or NULL when creating a secondary index */
2897 	bool		same_pk,/*!< in: whether the definition of the
2898 				PRIMARY KEY has remained the same */
2899 	const dtuple_t*	add_cols,
2900 				/*!< in: default values of
2901 				added columns, or NULL */
2902 	const ulint*	col_map,/*!< in: mapping of old column
2903 				numbers to new ones, or NULL if !table */
2904 	const char*	path)	/*!< in: where to create temporary file */
2905 {
2906 	row_log_t*	log;
2907 	DBUG_ENTER("row_log_allocate");
2908 
2909 	ut_ad(!dict_index_is_online_ddl(index));
2910 	ut_ad(dict_index_is_clust(index) == !!table);
2911 	ut_ad(!table || index->table != table);
2912 	ut_ad(same_pk || table);
2913 	ut_ad(!table || col_map);
2914 	ut_ad(!add_cols || col_map);
2915 #ifdef UNIV_SYNC_DEBUG
2916 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2917 #endif /* UNIV_SYNC_DEBUG */
2918 	log = (row_log_t*) ut_malloc(sizeof *log);
2919 	if (!log) {
2920 		DBUG_RETURN(false);
2921 	}
2922 
2923 	log->fd = -1;
2924 	mutex_create(index_online_log_key, &log->mutex,
2925 		     SYNC_INDEX_ONLINE_LOG);
2926 	log->blobs = NULL;
2927 	log->table = table;
2928 	log->same_pk = same_pk;
2929 	log->add_cols = add_cols;
2930 	log->col_map = col_map;
2931 	log->error = DB_SUCCESS;
2932 	log->max_trx = 0;
2933 	log->tail.blocks = log->tail.bytes = 0;
2934 	log->tail.total = 0;
2935 	log->tail.block = log->head.block = NULL;
2936 	log->head.blocks = log->head.bytes = 0;
2937 	log->head.total = 0;
2938 	log->path = path;
2939 	dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
2940 	index->online_log = log;
2941 
2942 	/* While we might be holding an exclusive data dictionary lock
2943 	here, in row_log_abort_sec() we will not always be holding it. Use
2944 	atomic operations in both cases. */
2945 	MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
2946 
2947 	DBUG_RETURN(true);
2948 }
2949 
2950 /******************************************************//**
2951 Free the row log for an index that was being created online. */
2952 UNIV_INTERN
2953 void
row_log_free(row_log_t * & log)2954 row_log_free(
2955 /*=========*/
2956 	row_log_t*&	log)	/*!< in,own: row log */
2957 {
2958 	MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
2959 
2960 	delete log->blobs;
2961 	row_log_block_free(log->tail);
2962 	row_log_block_free(log->head);
2963 	row_merge_file_destroy_low(log->fd);
2964 	mutex_free(&log->mutex);
2965 	ut_free(log);
2966 	log = 0;
2967 }
2968 
2969 /******************************************************//**
2970 Get the latest transaction ID that has invoked row_log_online_op()
2971 during online creation.
2972 @return latest transaction ID, or 0 if nothing was logged */
2973 UNIV_INTERN
2974 trx_id_t
row_log_get_max_trx(dict_index_t * index)2975 row_log_get_max_trx(
2976 /*================*/
2977 	dict_index_t*	index)	/*!< in: index, must be locked */
2978 {
2979 	ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
2980 #ifdef UNIV_SYNC_DEBUG
2981 	ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
2982 	       && mutex_own(&index->online_log->mutex))
2983 	      || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
2984 #endif /* UNIV_SYNC_DEBUG */
2985 	return(index->online_log->max_trx);
2986 }
2987 
2988 /******************************************************//**
2989 Applies an operation to a secondary index that was being created. */
2990 static MY_ATTRIBUTE((nonnull))
2991 void
row_log_apply_op_low(dict_index_t * index,row_merge_dup_t * dup,dberr_t * error,mem_heap_t * offsets_heap,bool has_index_lock,enum row_op op,trx_id_t trx_id,const dtuple_t * entry)2992 row_log_apply_op_low(
2993 /*=================*/
2994 	dict_index_t*	index,		/*!< in/out: index */
2995 	row_merge_dup_t*dup,		/*!< in/out: for reporting
2996 					duplicate key errors */
2997 	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
2998 	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
2999 					allocating offsets; can be emptied */
3000 	bool		has_index_lock, /*!< in: true if holding index->lock
3001 					in exclusive mode */
3002 	enum row_op	op,		/*!< in: operation being applied */
3003 	trx_id_t	trx_id,		/*!< in: transaction identifier */
3004 	const dtuple_t*	entry)		/*!< in: row */
3005 {
3006 	mtr_t		mtr;
3007 	btr_cur_t	cursor;
3008 	ulint*		offsets = NULL;
3009 
3010 	ut_ad(!dict_index_is_clust(index));
3011 #ifdef UNIV_SYNC_DEBUG
3012 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
3013 	      == has_index_lock);
3014 #endif /* UNIV_SYNC_DEBUG */
3015 	ut_ad(!dict_index_is_corrupted(index));
3016 	ut_ad(trx_id != 0 || op == ROW_OP_DELETE);
3017 
3018 	mtr_start(&mtr);
3019 
3020 	/* We perform the pessimistic variant of the operations if we
3021 	already hold index->lock exclusively. First, search the
3022 	record. The operation may already have been performed,
3023 	depending on when the row in the clustered index was
3024 	scanned. */
3025 	btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
3026 				    has_index_lock
3027 				    ? BTR_MODIFY_TREE
3028 				    : BTR_MODIFY_LEAF,
3029 				    &cursor, 0, __FILE__, __LINE__,
3030 				    &mtr);
3031 
3032 	ut_ad(dict_index_get_n_unique(index) > 0);
3033 	/* This test is somewhat similar to row_ins_must_modify_rec(),
3034 	but not identical for unique secondary indexes. */
3035 	if (cursor.low_match >= dict_index_get_n_unique(index)
3036 	    && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
3037 		/* We have a matching record. */
3038 		bool	exists	= (cursor.low_match
3039 				   == dict_index_get_n_fields(index));
3040 #ifdef UNIV_DEBUG
3041 		rec_t*	rec	= btr_cur_get_rec(&cursor);
3042 		ut_ad(page_rec_is_user_rec(rec));
3043 		ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
3044 #endif /* UNIV_DEBUG */
3045 
3046 		ut_ad(exists || dict_index_is_unique(index));
3047 
3048 		switch (op) {
3049 		case ROW_OP_DELETE:
3050 			if (!exists) {
3051 				/* The existing record matches the
3052 				unique secondary index key, but the
3053 				PRIMARY KEY columns differ. So, this
3054 				exact record does not exist. For
3055 				example, we could detect a duplicate
3056 				key error in some old index before
3057 				logging an ROW_OP_INSERT for our
3058 				index. This ROW_OP_DELETE could have
3059 				been logged for rolling back
3060 				TRX_UNDO_INSERT_REC. */
3061 				goto func_exit;
3062 			}
3063 
3064 			if (btr_cur_optimistic_delete(
3065 				    &cursor, BTR_CREATE_FLAG, &mtr)) {
3066 				*error = DB_SUCCESS;
3067 				break;
3068 			}
3069 
3070 			if (!has_index_lock) {
3071 				/* This needs a pessimistic operation.
3072 				Lock the index tree exclusively. */
3073 				mtr_commit(&mtr);
3074 				mtr_start(&mtr);
3075 				btr_cur_search_to_nth_level(
3076 					index, 0, entry, PAGE_CUR_LE,
3077 					BTR_MODIFY_TREE, &cursor, 0,
3078 					__FILE__, __LINE__, &mtr);
3079 
3080 				/* No other thread than the current one
3081 				is allowed to modify the index tree.
3082 				Thus, the record should still exist. */
3083 				ut_ad(cursor.low_match
3084 				      >= dict_index_get_n_fields(index));
3085 				ut_ad(page_rec_is_user_rec(
3086 					      btr_cur_get_rec(&cursor)));
3087 			}
3088 
3089 			/* As there are no externally stored fields in
3090 			a secondary index record, the parameter
3091 			rb_ctx = RB_NONE will be ignored. */
3092 
3093 			btr_cur_pessimistic_delete(
3094 				error, FALSE, &cursor,
3095 				BTR_CREATE_FLAG, RB_NONE, &mtr);
3096 			break;
3097 		case ROW_OP_INSERT:
3098 			if (exists) {
3099 				/* The record already exists. There
3100 				is nothing to be inserted.
3101 				This could happen when processing
3102 				TRX_UNDO_DEL_MARK_REC in statement
3103 				rollback:
3104 
3105 				UPDATE of PRIMARY KEY can lead to
3106 				statement rollback if the updated
3107 				value of the PRIMARY KEY already
3108 				exists. In this case, the UPDATE would
3109 				be mapped to DELETE;INSERT, and we
3110 				only wrote undo log for the DELETE
3111 				part. The duplicate key error would be
3112 				triggered before logging the INSERT
3113 				part.
3114 
3115 				Theoretically, we could also get a
3116 				similar situation when a DELETE operation
3117 				is blocked by a FOREIGN KEY constraint. */
3118 				goto func_exit;
3119 			}
3120 
3121 			if (dtuple_contains_null(entry)) {
3122 				/* The UNIQUE KEY columns match, but
3123 				there is a NULL value in the key, and
3124 				NULL!=NULL. */
3125 				goto insert_the_rec;
3126 			}
3127 
3128 			goto duplicate;
3129 		}
3130 	} else {
3131 		switch (op) {
3132 			rec_t*		rec;
3133 			big_rec_t*	big_rec;
3134 		case ROW_OP_DELETE:
3135 			/* The record does not exist. For example, we
3136 			could detect a duplicate key error in some old
3137 			index before logging an ROW_OP_INSERT for our
3138 			index. This ROW_OP_DELETE could be logged for
3139 			rolling back TRX_UNDO_INSERT_REC. */
3140 			goto func_exit;
3141 		case ROW_OP_INSERT:
3142 			if (dict_index_is_unique(index)
3143 			    && (cursor.up_match
3144 				>= dict_index_get_n_unique(index)
3145 				|| cursor.low_match
3146 				>= dict_index_get_n_unique(index))
3147 			    && (!index->n_nullable
3148 				|| !dtuple_contains_null(entry))) {
3149 duplicate:
3150 				/* Duplicate key */
3151 				ut_ad(dict_index_is_unique(index));
3152 				row_merge_dup_report(dup, entry->fields);
3153 				*error = DB_DUPLICATE_KEY;
3154 				goto func_exit;
3155 			}
3156 insert_the_rec:
3157 			/* Insert the record. As we are inserting into
3158 			a secondary index, there cannot be externally
3159 			stored columns (!big_rec). */
3160 			*error = btr_cur_optimistic_insert(
3161 				BTR_NO_UNDO_LOG_FLAG
3162 				| BTR_NO_LOCKING_FLAG
3163 				| BTR_CREATE_FLAG,
3164 				&cursor, &offsets, &offsets_heap,
3165 				const_cast<dtuple_t*>(entry),
3166 				&rec, &big_rec, 0, NULL, &mtr);
3167 			ut_ad(!big_rec);
3168 			if (*error != DB_FAIL) {
3169 				break;
3170 			}
3171 
3172 			if (!has_index_lock) {
3173 				/* This needs a pessimistic operation.
3174 				Lock the index tree exclusively. */
3175 				mtr_commit(&mtr);
3176 				mtr_start(&mtr);
3177 				btr_cur_search_to_nth_level(
3178 					index, 0, entry, PAGE_CUR_LE,
3179 					BTR_MODIFY_TREE, &cursor, 0,
3180 					__FILE__, __LINE__, &mtr);
3181 			}
3182 
3183 			/* We already determined that the
3184 			record did not exist. No other thread
3185 			than the current one is allowed to
3186 			modify the index tree. Thus, the
3187 			record should still not exist. */
3188 
3189 			*error = btr_cur_pessimistic_insert(
3190 				BTR_NO_UNDO_LOG_FLAG
3191 				| BTR_NO_LOCKING_FLAG
3192 				| BTR_CREATE_FLAG,
3193 				&cursor, &offsets, &offsets_heap,
3194 				const_cast<dtuple_t*>(entry),
3195 				&rec, &big_rec,
3196 				0, NULL, &mtr);
3197 			ut_ad(!big_rec);
3198 			break;
3199 		}
3200 		mem_heap_empty(offsets_heap);
3201 	}
3202 
3203 	if (*error == DB_SUCCESS && trx_id) {
3204 		page_update_max_trx_id(btr_cur_get_block(&cursor),
3205 				       btr_cur_get_page_zip(&cursor),
3206 				       trx_id, &mtr);
3207 	}
3208 
3209 func_exit:
3210 	mtr_commit(&mtr);
3211 }
3212 
3213 /******************************************************//**
3214 Applies an operation to a secondary index that was being created.
3215 @return NULL on failure (mrec corruption) or when out of data;
3216 pointer to next record on success */
3217 static MY_ATTRIBUTE((nonnull, warn_unused_result))
3218 const mrec_t*
row_log_apply_op(dict_index_t * index,row_merge_dup_t * dup,dberr_t * error,mem_heap_t * offsets_heap,mem_heap_t * heap,bool has_index_lock,const mrec_t * mrec,const mrec_t * mrec_end,ulint * offsets)3219 row_log_apply_op(
3220 /*=============*/
3221 	dict_index_t*	index,		/*!< in/out: index */
3222 	row_merge_dup_t*dup,		/*!< in/out: for reporting
3223 					duplicate key errors */
3224 	dberr_t*	error,		/*!< out: DB_SUCCESS or error code */
3225 	mem_heap_t*	offsets_heap,	/*!< in/out: memory heap for
3226 					allocating offsets; can be emptied */
3227 	mem_heap_t*	heap,		/*!< in/out: memory heap for
3228 					allocating data tuples */
3229 	bool		has_index_lock, /*!< in: true if holding index->lock
3230 					in exclusive mode */
3231 	const mrec_t*	mrec,		/*!< in: merge record */
3232 	const mrec_t*	mrec_end,	/*!< in: end of buffer */
3233 	ulint*		offsets)	/*!< in/out: work area for
3234 					rec_init_offsets_temp() */
3235 
3236 {
3237 	enum row_op	op;
3238 	ulint		extra_size;
3239 	ulint		data_size;
3240 	ulint		n_ext;
3241 	dtuple_t*	entry;
3242 	trx_id_t	trx_id;
3243 
3244 	/* Online index creation is only used for secondary indexes. */
3245 	ut_ad(!dict_index_is_clust(index));
3246 #ifdef UNIV_SYNC_DEBUG
3247 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
3248 	      == has_index_lock);
3249 #endif /* UNIV_SYNC_DEBUG */
3250 
3251 	if (dict_index_is_corrupted(index)) {
3252 		*error = DB_INDEX_CORRUPT;
3253 		return(NULL);
3254 	}
3255 
3256 	*error = DB_SUCCESS;
3257 
3258 	if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
3259 		return(NULL);
3260 	}
3261 
3262 	switch (*mrec) {
3263 	case ROW_OP_INSERT:
3264 		if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
3265 			return(NULL);
3266 		}
3267 
3268 		op = static_cast<enum row_op>(*mrec++);
3269 		trx_id = trx_read_trx_id(mrec);
3270 		mrec += DATA_TRX_ID_LEN;
3271 		break;
3272 	case ROW_OP_DELETE:
3273 		op = static_cast<enum row_op>(*mrec++);
3274 		trx_id = 0;
3275 		break;
3276 	default:
3277 corrupted:
3278 		ut_ad(0);
3279 		*error = DB_CORRUPTION;
3280 		return(NULL);
3281 	}
3282 
3283 	extra_size = *mrec++;
3284 
3285 	ut_ad(mrec < mrec_end);
3286 
3287 	if (extra_size >= 0x80) {
3288 		/* Read another byte of extra_size. */
3289 
3290 		extra_size = (extra_size & 0x7f) << 8;
3291 		extra_size |= *mrec++;
3292 	}
3293 
3294 	mrec += extra_size;
3295 
3296 	if (mrec > mrec_end) {
3297 		return(NULL);
3298 	}
3299 
3300 	rec_init_offsets_temp(mrec, index, offsets);
3301 
3302 	if (rec_offs_any_extern(offsets)) {
3303 		/* There should never be any externally stored fields
3304 		in a secondary index, which is what online index
3305 		creation is used for. Therefore, the log file must be
3306 		corrupted. */
3307 		goto corrupted;
3308 	}
3309 
3310 	data_size = rec_offs_data_size(offsets);
3311 
3312 	mrec += data_size;
3313 
3314 	if (mrec > mrec_end) {
3315 		return(NULL);
3316 	}
3317 
3318 	entry = row_rec_to_index_entry_low(
3319 		mrec - data_size, index, offsets, &n_ext, heap);
3320 	/* Online index creation is only implemented for secondary
3321 	indexes, which never contain off-page columns. */
3322 	ut_ad(n_ext == 0);
3323 #ifdef ROW_LOG_APPLY_PRINT
3324 	if (row_log_apply_print) {
3325 		fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ",
3326 			index->id, trx_id,
3327 			unsigned (op), unsigned (has_index_lock));
3328 		for (const byte* m = mrec - data_size; m < mrec; m++) {
3329 			fprintf(stderr, "%02x", *m);
3330 		}
3331 		putc('\n', stderr);
3332 	}
3333 #endif /* ROW_LOG_APPLY_PRINT */
3334 	row_log_apply_op_low(index, dup, error, offsets_heap,
3335 			     has_index_lock, op, trx_id, entry);
3336 	return(mrec);
3337 }
3338 
3339 /******************************************************//**
3340 Applies operations to a secondary index that was being created.
3341 @return DB_SUCCESS, or error code on failure */
3342 static MY_ATTRIBUTE((nonnull))
3343 dberr_t
row_log_apply_ops(trx_t * trx,dict_index_t * index,row_merge_dup_t * dup)3344 row_log_apply_ops(
3345 /*==============*/
3346 	trx_t*		trx,	/*!< in: transaction (for checking if
3347 				the operation was interrupted) */
3348 	dict_index_t*	index,	/*!< in/out: index */
3349 	row_merge_dup_t*dup)	/*!< in/out: for reporting duplicate key
3350 				errors */
3351 {
3352 	dberr_t		error;
3353 	const mrec_t*	mrec	= NULL;
3354 	const mrec_t*	next_mrec;
3355 	const mrec_t*	mrec_end= NULL; /* silence bogus warning */
3356 	const mrec_t*	next_mrec_end;
3357 	mem_heap_t*	offsets_heap;
3358 	mem_heap_t*	heap;
3359 	ulint*		offsets;
3360 	bool		has_index_lock;
3361 	const ulint	i	= 1 + REC_OFFS_HEADER_SIZE
3362 		+ dict_index_get_n_fields(index);
3363 
3364 	ut_ad(dict_index_is_online_ddl(index));
3365 	ut_ad(*index->name == TEMP_INDEX_PREFIX);
3366 #ifdef UNIV_SYNC_DEBUG
3367 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
3368 #endif /* UNIV_SYNC_DEBUG */
3369 	ut_ad(index->online_log);
3370 	UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
3371 
3372 	offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
3373 	offsets[0] = i;
3374 	offsets[1] = dict_index_get_n_fields(index);
3375 
3376 	offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
3377 	heap = mem_heap_create(UNIV_PAGE_SIZE);
3378 	has_index_lock = true;
3379 
3380 next_block:
3381 	ut_ad(has_index_lock);
3382 #ifdef UNIV_SYNC_DEBUG
3383 	ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
3384 #endif /* UNIV_SYNC_DEBUG */
3385 	ut_ad(index->online_log->head.bytes == 0);
3386 
3387 	if (trx_is_interrupted(trx)) {
3388 		goto interrupted;
3389 	}
3390 
3391 	error = index->online_log->error;
3392 	if (error != DB_SUCCESS) {
3393 		goto func_exit;
3394 	}
3395 
3396 	if (dict_index_is_corrupted(index)) {
3397 		error = DB_INDEX_CORRUPT;
3398 		goto func_exit;
3399 	}
3400 
3401 	if (UNIV_UNLIKELY(index->online_log->head.blocks
3402 			  > index->online_log->tail.blocks)) {
3403 unexpected_eof:
3404 		fprintf(stderr, "InnoDB: unexpected end of temporary file"
3405 			" for index %s\n", index->name + 1);
3406 corruption:
3407 		error = DB_CORRUPTION;
3408 		goto func_exit;
3409 	}
3410 
3411 	if (index->online_log->head.blocks
3412 	    == index->online_log->tail.blocks) {
3413 		if (index->online_log->head.blocks) {
3414 #ifdef HAVE_FTRUNCATE
3415 			/* Truncate the file in order to save space. */
3416 			if (index->online_log->fd != -1
3417 			    && ftruncate(index->online_log->fd, 0) == -1) {
3418 				perror("ftruncate");
3419 			}
3420 #endif /* HAVE_FTRUNCATE */
3421 			index->online_log->head.blocks
3422 				= index->online_log->tail.blocks = 0;
3423 		}
3424 
3425 		next_mrec = index->online_log->tail.block;
3426 		next_mrec_end = next_mrec + index->online_log->tail.bytes;
3427 
3428 		if (next_mrec_end == next_mrec) {
3429 			/* End of log reached. */
3430 all_done:
3431 			ut_ad(has_index_lock);
3432 			ut_ad(index->online_log->head.blocks == 0);
3433 			ut_ad(index->online_log->tail.blocks == 0);
3434 			error = DB_SUCCESS;
3435 			goto func_exit;
3436 		}
3437 	} else {
3438 		os_offset_t	ofs;
3439 		ibool		success;
3440 
3441 		ofs = (os_offset_t) index->online_log->head.blocks
3442 			* srv_sort_buf_size;
3443 
3444 		ut_ad(has_index_lock);
3445 		has_index_lock = false;
3446 		rw_lock_x_unlock(dict_index_get_lock(index));
3447 
3448 		log_free_check();
3449 
3450 		if (!row_log_block_allocate(index->online_log->head)) {
3451 			error = DB_OUT_OF_MEMORY;
3452 			goto func_exit;
3453 		}
3454 
3455 		success = os_file_read_no_error_handling_int_fd(
3456 			index->online_log->fd,
3457 			index->online_log->head.block, ofs,
3458 			srv_sort_buf_size);
3459 
3460 		if (!success) {
3461 			fprintf(stderr, "InnoDB: unable to read temporary file"
3462 				" for index %s\n", index->name + 1);
3463 			goto corruption;
3464 		}
3465 
3466 #ifdef POSIX_FADV_DONTNEED
3467 		/* Each block is read exactly once.  Free up the file cache. */
3468 		posix_fadvise(index->online_log->fd,
3469 			      ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
3470 #endif /* POSIX_FADV_DONTNEED */
3471 #if 0 //def FALLOC_FL_PUNCH_HOLE
3472 		/* Try to deallocate the space for the file on disk.
3473 		This should work on ext4 on Linux 2.6.39 and later,
3474 		and be ignored when the operation is unsupported. */
3475 		fallocate(index->online_log->fd,
3476 			  FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
3477 			  ofs, srv_buf_size);
3478 #endif /* FALLOC_FL_PUNCH_HOLE */
3479 
3480 		next_mrec = index->online_log->head.block;
3481 		next_mrec_end = next_mrec + srv_sort_buf_size;
3482 	}
3483 
3484 	if (mrec) {
3485 		/* A partial record was read from the previous block.
3486 		Copy the temporary buffer full, as we do not know the
3487 		length of the record. Parse subsequent records from
3488 		the bigger buffer index->online_log->head.block
3489 		or index->online_log->tail.block. */
3490 
3491 		ut_ad(mrec == index->online_log->head.buf);
3492 		ut_ad(mrec_end > mrec);
3493 		ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
3494 
3495 		memcpy((mrec_t*) mrec_end, next_mrec,
3496 		       (&index->online_log->head.buf)[1] - mrec_end);
3497 		mrec = row_log_apply_op(
3498 			index, dup, &error, offsets_heap, heap,
3499 			has_index_lock, index->online_log->head.buf,
3500 			(&index->online_log->head.buf)[1], offsets);
3501 		if (error != DB_SUCCESS) {
3502 			goto func_exit;
3503 		} else if (UNIV_UNLIKELY(mrec == NULL)) {
3504 			/* The record was not reassembled properly. */
3505 			goto corruption;
3506 		}
3507 		/* The record was previously found out to be
3508 		truncated. Now that the parse buffer was extended,
3509 		it should proceed beyond the old end of the buffer. */
3510 		ut_a(mrec > mrec_end);
3511 
3512 		index->online_log->head.bytes = mrec - mrec_end;
3513 		next_mrec += index->online_log->head.bytes;
3514 	}
3515 
3516 	ut_ad(next_mrec <= next_mrec_end);
3517 	/* The following loop must not be parsing the temporary
3518 	buffer, but head.block or tail.block. */
3519 
3520 	/* mrec!=NULL means that the next record starts from the
3521 	middle of the block */
3522 	ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
3523 
3524 #ifdef UNIV_DEBUG
3525 	if (next_mrec_end == index->online_log->head.block
3526 	    + srv_sort_buf_size) {
3527 		/* If tail.bytes == 0, next_mrec_end can also be at
3528 		the end of tail.block. */
3529 		if (index->online_log->tail.bytes == 0) {
3530 			ut_ad(next_mrec == next_mrec_end);
3531 			ut_ad(index->online_log->tail.blocks == 0);
3532 			ut_ad(index->online_log->head.blocks == 0);
3533 			ut_ad(index->online_log->head.bytes == 0);
3534 		} else {
3535 			ut_ad(next_mrec == index->online_log->head.block
3536 			      + index->online_log->head.bytes);
3537 			ut_ad(index->online_log->tail.blocks
3538 			      > index->online_log->head.blocks);
3539 		}
3540 	} else if (next_mrec_end == index->online_log->tail.block
3541 		   + index->online_log->tail.bytes) {
3542 		ut_ad(next_mrec == index->online_log->tail.block
3543 		      + index->online_log->head.bytes);
3544 		ut_ad(index->online_log->tail.blocks == 0);
3545 		ut_ad(index->online_log->head.blocks == 0);
3546 		ut_ad(index->online_log->head.bytes
3547 		      <= index->online_log->tail.bytes);
3548 	} else {
3549 		ut_error;
3550 	}
3551 #endif /* UNIV_DEBUG */
3552 
3553 	mrec_end = next_mrec_end;
3554 
3555 	while (!trx_is_interrupted(trx)) {
3556 		mrec = next_mrec;
3557 		ut_ad(mrec < mrec_end);
3558 
3559 		if (!has_index_lock) {
3560 			/* We are applying operations from a different
3561 			block than the one that is being written to.
3562 			We do not hold index->lock in order to
3563 			allow other threads to concurrently buffer
3564 			modifications. */
3565 			ut_ad(mrec >= index->online_log->head.block);
3566 			ut_ad(mrec_end == index->online_log->head.block
3567 			      + srv_sort_buf_size);
3568 			ut_ad(index->online_log->head.bytes
3569 			      < srv_sort_buf_size);
3570 
3571 			/* Take the opportunity to do a redo log
3572 			checkpoint if needed. */
3573 			log_free_check();
3574 		} else {
3575 			/* We are applying operations from the last block.
3576 			Do not allow other threads to buffer anything,
3577 			so that we can finally catch up and synchronize. */
3578 			ut_ad(index->online_log->head.blocks == 0);
3579 			ut_ad(index->online_log->tail.blocks == 0);
3580 			ut_ad(mrec_end == index->online_log->tail.block
3581 			      + index->online_log->tail.bytes);
3582 			ut_ad(mrec >= index->online_log->tail.block);
3583 		}
3584 
3585 		next_mrec = row_log_apply_op(
3586 			index, dup, &error, offsets_heap, heap,
3587 			has_index_lock, mrec, mrec_end, offsets);
3588 
3589 		if (error != DB_SUCCESS) {
3590 			goto func_exit;
3591 		} else if (next_mrec == next_mrec_end) {
3592 			/* The record happened to end on a block boundary.
3593 			Do we have more blocks left? */
3594 			if (has_index_lock) {
3595 				/* The index will be locked while
3596 				applying the last block. */
3597 				goto all_done;
3598 			}
3599 
3600 			mrec = NULL;
3601 process_next_block:
3602 			rw_lock_x_lock(dict_index_get_lock(index));
3603 			has_index_lock = true;
3604 
3605 			index->online_log->head.bytes = 0;
3606 			index->online_log->head.blocks++;
3607 			goto next_block;
3608 		} else if (next_mrec != NULL) {
3609 			ut_ad(next_mrec < next_mrec_end);
3610 			index->online_log->head.bytes += next_mrec - mrec;
3611 		} else if (has_index_lock) {
3612 			/* When mrec is within tail.block, it should
3613 			be a complete record, because we are holding
3614 			index->lock and thus excluding the writer. */
3615 			ut_ad(index->online_log->tail.blocks == 0);
3616 			ut_ad(mrec_end == index->online_log->tail.block
3617 			      + index->online_log->tail.bytes);
3618 			ut_ad(0);
3619 			goto unexpected_eof;
3620 		} else {
3621 			memcpy(index->online_log->head.buf, mrec,
3622 			       mrec_end - mrec);
3623 			mrec_end += index->online_log->head.buf - mrec;
3624 			mrec = index->online_log->head.buf;
3625 			goto process_next_block;
3626 		}
3627 	}
3628 
3629 interrupted:
3630 	error = DB_INTERRUPTED;
3631 func_exit:
3632 	if (!has_index_lock) {
3633 		rw_lock_x_lock(dict_index_get_lock(index));
3634 	}
3635 
3636 	switch (error) {
3637 	case DB_SUCCESS:
3638 		break;
3639 	case DB_INDEX_CORRUPT:
3640 		if (((os_offset_t) index->online_log->tail.blocks + 1)
3641 		    * srv_sort_buf_size >= srv_online_max_size) {
3642 			/* The log file grew too big. */
3643 			error = DB_ONLINE_LOG_TOO_BIG;
3644 		}
3645 		/* fall through */
3646 	default:
3647 		/* We set the flag directly instead of invoking
3648 		dict_set_corrupted_index_cache_only(index) here,
3649 		because the index is not "public" yet. */
3650 		index->type |= DICT_CORRUPT;
3651 	}
3652 
3653 	mem_heap_free(heap);
3654 	mem_heap_free(offsets_heap);
3655 	row_log_block_free(index->online_log->head);
3656 	ut_free(offsets);
3657 	return(error);
3658 }
3659 
3660 /******************************************************//**
3661 Apply the row log to the index upon completing index creation.
3662 @return DB_SUCCESS, or error code on failure */
3663 UNIV_INTERN
3664 dberr_t
row_log_apply(trx_t * trx,dict_index_t * index,struct TABLE * table)3665 row_log_apply(
3666 /*==========*/
3667 	trx_t*		trx,	/*!< in: transaction (for checking if
3668 				the operation was interrupted) */
3669 	dict_index_t*	index,	/*!< in/out: secondary index */
3670 	struct TABLE*	table)	/*!< in/out: MySQL table
3671 				(for reporting duplicates) */
3672 {
3673 	dberr_t		error;
3674 	row_log_t*	log;
3675 	row_merge_dup_t	dup = { index, table, NULL, 0 };
3676 	DBUG_ENTER("row_log_apply");
3677 
3678 	ut_ad(dict_index_is_online_ddl(index));
3679 	ut_ad(!dict_index_is_clust(index));
3680 
3681 	log_free_check();
3682 
3683 	rw_lock_x_lock(dict_index_get_lock(index));
3684 
3685 	if (!dict_table_is_corrupted(index->table)) {
3686 		error = row_log_apply_ops(trx, index, &dup);
3687 	} else {
3688 		error = DB_SUCCESS;
3689 	}
3690 
3691 	if (error != DB_SUCCESS) {
3692 		ut_a(!dict_table_is_discarded(index->table));
3693 		/* We set the flag directly instead of invoking
3694 		dict_set_corrupted_index_cache_only(index) here,
3695 		because the index is not "public" yet. */
3696 		index->type |= DICT_CORRUPT;
3697 		index->table->drop_aborted = TRUE;
3698 
3699 		dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
3700 	} else {
3701 		ut_ad(dup.n_dup == 0);
3702 		dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
3703 	}
3704 
3705 	log = index->online_log;
3706 	index->online_log = NULL;
3707 	/* We could remove the TEMP_INDEX_PREFIX and update the data
3708 	dictionary to say that this index is complete, if we had
3709 	access to the .frm file here.  If the server crashes before
3710 	all requested indexes have been created, this completed index
3711 	will be dropped. */
3712 	rw_lock_x_unlock(dict_index_get_lock(index));
3713 
3714 	row_log_free(log);
3715 
3716 	DBUG_RETURN(error);
3717 }
3718