1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file trx/trx0rec.cc
22 Transaction undo log record
23 
24 Created 3/26/1996 Heikki Tuuri
25 *******************************************************/
26 
27 #include "trx0rec.h"
28 #include "fsp0fsp.h"
29 #include "mach0data.h"
30 #include "trx0undo.h"
31 #include "mtr0log.h"
32 #include "dict0dict.h"
33 #include "ut0mem.h"
34 #include "row0ext.h"
35 #include "row0upd.h"
36 #include "que0que.h"
37 #include "trx0purge.h"
38 #include "trx0rseg.h"
39 #include "row0row.h"
40 #include "row0mysql.h"
41 
42 /** The search tuple corresponding to TRX_UNDO_INSERT_METADATA */
43 const dtuple_t trx_undo_metadata = {
44 	REC_INFO_METADATA, 0, 0,
45 	NULL, 0, NULL
46 #ifdef UNIV_DEBUG
47 	, DATA_TUPLE_MAGIC_N
48 #endif /* UNIV_DEBUG */
49 };
50 
51 /*=========== UNDO LOG RECORD CREATION AND DECODING ====================*/
52 
53 /** Write redo log of writing an undo log record.
54 @param[in]	undo_block	undo log page
55 @param[in]	old_free	start offset of the undo log record
56 @param[in]	new_free	end offset of the undo log record
57 @param[in,out]	mtr		mini-transaction */
trx_undof_page_add_undo_rec_log(const buf_block_t * undo_block,ulint old_free,ulint new_free,mtr_t * mtr)58 static void trx_undof_page_add_undo_rec_log(const buf_block_t* undo_block,
59 					    ulint old_free, ulint new_free,
60 					    mtr_t* mtr)
61 {
62 	ut_ad(old_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
63 	ut_ad(new_free >= old_free);
64 	ut_ad(new_free < srv_page_size);
65 	ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
66 			       + undo_block->frame)
67 	      == new_free);
68 	mtr->set_modified();
69 	switch (mtr->get_log_mode()) {
70 	case MTR_LOG_NONE:
71 	case MTR_LOG_NO_REDO:
72 		return;
73 	case MTR_LOG_SHORT_INSERTS:
74 		ut_ad(0);
75 		/* fall through */
76 	case MTR_LOG_ALL:
77 		break;
78 	}
79 
80 	const uint32_t
81 		len = uint32_t(new_free - old_free - 4),
82 		reserved = std::min<uint32_t>(11 + 13 + len,
83 					      mtr->get_log()->MAX_DATA_SIZE);
84 	byte* log_ptr = mtr->get_log()->open(reserved);
85 	const byte* log_end = log_ptr + reserved;
86 	log_ptr = mlog_write_initial_log_record_low(
87 		MLOG_UNDO_INSERT,
88 		undo_block->page.id.space(), undo_block->page.id.page_no(),
89 		log_ptr, mtr);
90 	mach_write_to_2(log_ptr, len);
91 	if (log_ptr + 2 + len <= log_end) {
92 		memcpy(log_ptr + 2, undo_block->frame + old_free + 2, len);
93 		mlog_close(mtr, log_ptr + 2 + len);
94 	} else {
95 		mlog_close(mtr, log_ptr + 2);
96 		mtr->get_log()->push(undo_block->frame + old_free + 2, len);
97 	}
98 }
99 
100 /** Parse MLOG_UNDO_INSERT.
101 @param[in]	ptr	log record
102 @param[in]	end_ptr	end of log record buffer
103 @param[in,out]	page	page or NULL
104 @return	end of log record
105 @retval	NULL	if the log record is incomplete */
106 byte*
trx_undo_parse_add_undo_rec(const byte * ptr,const byte * end_ptr,page_t * page)107 trx_undo_parse_add_undo_rec(
108 	const byte*	ptr,
109 	const byte*	end_ptr,
110 	page_t*		page)
111 {
112 	ulint	len;
113 
114 	if (end_ptr < ptr + 2) {
115 
116 		return(NULL);
117 	}
118 
119 	len = mach_read_from_2(ptr);
120 	ptr += 2;
121 
122 	if (end_ptr < ptr + len) {
123 
124 		return(NULL);
125 	}
126 
127 	if (page) {
128 		ulint first_free = mach_read_from_2(page + TRX_UNDO_PAGE_HDR
129 						    + TRX_UNDO_PAGE_FREE);
130 		byte* rec = page + first_free;
131 
132 		mach_write_to_2(rec, first_free + 4 + len);
133 		mach_write_to_2(rec + 2 + len, first_free);
134 
135 		mach_write_to_2(page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
136 				first_free + 4 + len);
137 		memcpy(rec + 2, ptr, len);
138 	}
139 
140 	return(const_cast<byte*>(ptr + len));
141 }
142 
143 /** Calculate the free space left for extending an undo log record.
144 @param block   undo log page
145 @param ptr     current end of the undo page
146 @return bytes left */
trx_undo_left(const buf_block_t * undo_block,const byte * ptr)147 static ulint trx_undo_left(const buf_block_t *undo_block, const byte *ptr)
148 {
149   ut_ad(ptr >= &undo_block->frame[TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE]);
150   /* The 10 is supposed to be an extra safety margin (and needed for
151   compatibility with older versions) */
152   lint left= srv_page_size - (ptr - undo_block->frame) -
153     (10 + FIL_PAGE_DATA_END);
154   ut_ad(left >= 0);
155   return left < 0 ? 0 : static_cast<ulint>(left);
156 }
157 
158 /**********************************************************************//**
159 Set the next and previous pointers in the undo page for the undo record
160 that was written to ptr. Update the first free value by the number of bytes
161 written for this undo record.
162 @return offset of the inserted entry on the page if succeeded, 0 if fail */
163 static
164 ulint
trx_undo_page_set_next_prev_and_add(buf_block_t * undo_block,byte * ptr,mtr_t * mtr)165 trx_undo_page_set_next_prev_and_add(
166 /*================================*/
167 	buf_block_t*	undo_block,	/*!< in/out: undo log page */
168 	byte*		ptr,		/*!< in: ptr up to where data has been
169 					written on this undo page. */
170 	mtr_t*		mtr)		/*!< in: mtr */
171 {
172 	ulint		first_free;	/*!< offset within undo_page */
173 	ulint		end_of_rec;	/*!< offset within undo_page */
174 	byte*		ptr_to_first_free;
175 					/* pointer within undo_page
176 					that points to the next free
177 					offset value within undo_page.*/
178 
179 	ut_ad(ptr > undo_block->frame);
180 	ut_ad(ptr < undo_block->frame + srv_page_size);
181 
182 	if (UNIV_UNLIKELY(trx_undo_left(undo_block, ptr) < 2)) {
183 		return(0);
184 	}
185 
186 	ptr_to_first_free = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
187 		+ undo_block->frame;
188 
189 	first_free = mach_read_from_2(ptr_to_first_free);
190 
191 	/* Write offset of the previous undo log record */
192 	mach_write_to_2(ptr, first_free);
193 	ptr += 2;
194 
195 	end_of_rec = ulint(ptr - undo_block->frame);
196 
197 	/* Write offset of the next undo log record */
198 	mach_write_to_2(undo_block->frame + first_free, end_of_rec);
199 
200 	/* Update the offset to first free undo record */
201 	mach_write_to_2(ptr_to_first_free, end_of_rec);
202 
203 	/* Write this log entry to the UNDO log */
204 	trx_undof_page_add_undo_rec_log(undo_block, first_free,
205 					end_of_rec, mtr);
206 
207 	return(first_free);
208 }
209 
210 /** Virtual column undo log version. To distinguish it from a length value
211 in 5.7.8 undo log, it starts with 0xF1 */
212 static const ulint VIRTUAL_COL_UNDO_FORMAT_1 = 0xF1;
213 
214 /** Write virtual column index info (index id and column position in index)
215 to the undo log
216 @param[in,out]	undo_block	undo log page
217 @param[in]	table           the table
218 @param[in]	pos		the virtual column position
219 @param[in]      ptr             undo log record being written
220 @param[in]	first_v_col	whether this is the first virtual column
221 				which could start with a version marker
222 @return new undo log pointer */
223 static
224 byte*
trx_undo_log_v_idx(buf_block_t * undo_block,const dict_table_t * table,ulint pos,byte * ptr,bool first_v_col)225 trx_undo_log_v_idx(
226 	buf_block_t*		undo_block,
227 	const dict_table_t*	table,
228 	ulint			pos,
229 	byte*			ptr,
230 	bool			first_v_col)
231 {
232 	ut_ad(pos < table->n_v_def);
233 	dict_v_col_t*	vcol = dict_table_get_nth_v_col(table, pos);
234 
235 	ulint		n_idx = vcol->v_indexes->size();
236 	byte*		old_ptr;
237 
238 	ut_ad(n_idx > 0);
239 
240 	/* Size to reserve, max 5 bytes for each index id and position, plus
241 	5 bytes for num of indexes, 2 bytes for write total length.
242 	1 byte for undo log record format version marker */
243 	ulint		size = n_idx * (5 + 5) + 5 + 2 + (first_v_col ? 1 : 0);
244 
245 	if (trx_undo_left(undo_block, ptr) < size) {
246 		return(NULL);
247 	}
248 
249 	if (first_v_col) {
250 		/* write the version marker */
251 		mach_write_to_1(ptr, VIRTUAL_COL_UNDO_FORMAT_1);
252 
253 		ptr += 1;
254 	}
255 
256 	old_ptr = ptr;
257 
258 	ptr += 2;
259 
260 	ptr += mach_write_compressed(ptr, n_idx);
261 
262 	dict_v_idx_list::iterator       it;
263 
264 	for (it = vcol->v_indexes->begin();
265 	     it != vcol->v_indexes->end(); ++it) {
266 		dict_v_idx_t	v_index = *it;
267 
268 		ptr += mach_write_compressed(
269 			ptr, static_cast<ulint>(v_index.index->id));
270 
271 		ptr += mach_write_compressed(ptr, v_index.nth_field);
272 	}
273 
274 	mach_write_to_2(old_ptr, ulint(ptr - old_ptr));
275 
276 	return(ptr);
277 }
278 
279 /** Read virtual column index from undo log, and verify the column is still
280 indexed, and return its position
281 @param[in]	table		the table
282 @param[in]	ptr		undo log pointer
283 @param[out]	col_pos		the column number or ULINT_UNDEFINED
284 				if the column is not indexed any more
285 @return remaining part of undo log record after reading these values */
286 static
287 const byte*
trx_undo_read_v_idx_low(const dict_table_t * table,const byte * ptr,ulint * col_pos)288 trx_undo_read_v_idx_low(
289 	const dict_table_t*	table,
290 	const byte*		ptr,
291 	ulint*			col_pos)
292 {
293 	ulint		len = mach_read_from_2(ptr);
294 	const byte*	old_ptr = ptr;
295 
296 	*col_pos = ULINT_UNDEFINED;
297 
298 	ptr += 2;
299 
300 	ulint	num_idx = mach_read_next_compressed(&ptr);
301 
302 	ut_ad(num_idx > 0);
303 
304 	dict_index_t*	clust_index = dict_table_get_first_index(table);
305 
306 	for (ulint i = 0; i < num_idx; i++) {
307 		index_id_t	id = mach_read_next_compressed(&ptr);
308 		ulint		pos = mach_read_next_compressed(&ptr);
309 		dict_index_t*	index = dict_table_get_next_index(clust_index);
310 
311 		while (index != NULL) {
312 			/* Return if we find a matching index.
313 			TODO: in the future, it might be worth to add
314 			checks on other indexes */
315 			if (index->id == id) {
316 				const dict_col_t* col = dict_index_get_nth_col(
317 					index, pos);
318 				ut_ad(col->is_virtual());
319 				const dict_v_col_t*	vcol = reinterpret_cast<
320 					const dict_v_col_t*>(col);
321 				*col_pos = vcol->v_pos;
322 				return(old_ptr + len);
323 			}
324 
325 			index = dict_table_get_next_index(index);
326 		}
327 	}
328 
329 	return(old_ptr + len);
330 }
331 
332 /** Read virtual column index from undo log or online log if the log
333 contains such info, and in the undo log case, verify the column is
334 still indexed, and output its position
335 @param[in]	table		the table
336 @param[in]	ptr		undo log pointer
337 @param[in]	first_v_col	if this is the first virtual column, which
338 				has the version marker
339 @param[in,out]	is_undo_log	this function is used to parse both undo log,
340 				and online log for virtual columns. So
341 				check to see if this is undo log. When
342 				first_v_col is true, is_undo_log is output,
343 				when first_v_col is false, is_undo_log is input
344 @param[in,out]	field_no	the column number
345 @return remaining part of undo log record after reading these values */
346 const byte*
trx_undo_read_v_idx(const dict_table_t * table,const byte * ptr,bool first_v_col,bool * is_undo_log,ulint * field_no)347 trx_undo_read_v_idx(
348 	const dict_table_t*	table,
349 	const byte*		ptr,
350 	bool			first_v_col,
351 	bool*			is_undo_log,
352 	ulint*			field_no)
353 {
354 	/* Version marker only put on the first virtual column */
355 	if (first_v_col) {
356 		/* Undo log has the virtual undo log marker */
357 		*is_undo_log = (mach_read_from_1(ptr)
358 				== VIRTUAL_COL_UNDO_FORMAT_1);
359 
360 		if (*is_undo_log) {
361 			ptr += 1;
362 		}
363 	}
364 
365 	if (*is_undo_log) {
366 		ptr = trx_undo_read_v_idx_low(table, ptr, field_no);
367 	} else {
368 		*field_no -= REC_MAX_N_FIELDS;
369 	}
370 
371 	return(ptr);
372 }
373 
374 /** Reports in the undo log of an insert of virtual columns.
375 @param[in]	undo_block	undo log page
376 @param[in]	table		the table
377 @param[in]	row		dtuple contains the virtual columns
378 @param[in,out]	ptr		log ptr
379 @return true if write goes well, false if out of space */
380 static
381 bool
trx_undo_report_insert_virtual(buf_block_t * undo_block,dict_table_t * table,const dtuple_t * row,byte ** ptr)382 trx_undo_report_insert_virtual(
383 	buf_block_t*	undo_block,
384 	dict_table_t*	table,
385 	const dtuple_t*	row,
386 	byte**		ptr)
387 {
388 	byte*	start = *ptr;
389 	bool	first_v_col = true;
390 
391 	if (trx_undo_left(undo_block, *ptr) < 2) {
392 		return(false);
393 	}
394 
395 	/* Reserve 2 bytes to write the number
396 	of bytes the stored fields take in this
397 	undo record */
398 	*ptr += 2;
399 
400 	for (ulint col_no = 0; col_no < dict_table_get_n_v_cols(table);
401 	     col_no++) {
402 		const dict_v_col_t*     col
403 			= dict_table_get_nth_v_col(table, col_no);
404 
405 		if (col->m_col.ord_part) {
406 
407 			/* make sure enought space to write the length */
408 			if (trx_undo_left(undo_block, *ptr) < 5) {
409 				return(false);
410 			}
411 
412 			ulint   pos = col_no;
413 			pos += REC_MAX_N_FIELDS;
414 			*ptr += mach_write_compressed(*ptr, pos);
415 
416 			*ptr = trx_undo_log_v_idx(undo_block, table,
417 						  col_no, *ptr, first_v_col);
418 			first_v_col = false;
419 
420 			if (*ptr == NULL) {
421 				return(false);
422 			}
423 
424 			const dfield_t* vfield = dtuple_get_nth_v_field(
425 				row, col->v_pos);
426 			switch (ulint flen = vfield->len) {
427 			case 0: case UNIV_SQL_NULL:
428 				if (trx_undo_left(undo_block, *ptr) < 5) {
429 					return(false);
430 				}
431 
432 				*ptr += mach_write_compressed(*ptr, flen);
433 				break;
434 			default:
435 				ulint	max_len
436 					= dict_max_v_field_len_store_undo(
437 						table, col_no);
438 
439 				if (flen > max_len) {
440 					flen = max_len;
441 				}
442 
443 				if (trx_undo_left(undo_block, *ptr)
444 				    < flen + 5) {
445 					return(false);
446 				}
447 				*ptr += mach_write_compressed(*ptr, flen);
448 
449 				memcpy(*ptr, vfield->data, flen);
450 				*ptr += flen;
451 			}
452 		}
453 	}
454 
455 	/* Always mark the end of the log with 2 bytes length field */
456 	mach_write_to_2(start, ulint(*ptr - start));
457 
458 	return(true);
459 }
460 
461 /**********************************************************************//**
462 Reports in the undo log of an insert of a clustered index record.
463 @return offset of the inserted entry on the page if succeed, 0 if fail */
464 static
465 ulint
trx_undo_page_report_insert(buf_block_t * undo_block,trx_t * trx,dict_index_t * index,const dtuple_t * clust_entry,mtr_t * mtr)466 trx_undo_page_report_insert(
467 /*========================*/
468 	buf_block_t*	undo_block,	/*!< in: undo log page */
469 	trx_t*		trx,		/*!< in: transaction */
470 	dict_index_t*	index,		/*!< in: clustered index */
471 	const dtuple_t*	clust_entry,	/*!< in: index entry which will be
472 					inserted to the clustered index */
473 	mtr_t*		mtr)		/*!< in: mtr */
474 {
475 	ulint		first_free;
476 	byte*		ptr;
477 	ulint		i;
478 
479 	ut_ad(dict_index_is_clust(index));
480 	/* MariaDB 10.3.1+ in trx_undo_page_init() always initializes
481 	TRX_UNDO_PAGE_TYPE as 0, but previous versions wrote
482 	TRX_UNDO_INSERT == 1 into insert_undo pages,
483 	or TRX_UNDO_UPDATE == 2 into update_undo pages. */
484 	ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
485 			       + undo_block->frame) <= 2);
486 
487 	first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
488 				      + undo_block->frame);
489 	ptr = undo_block->frame + first_free;
490 
491 	ut_ad(first_free <= srv_page_size);
492 
493 	if (trx_undo_left(undo_block, ptr) < 2 + 1 + 11 + 11) {
494 		/* Not enough space for writing the general parameters */
495 		return(0);
496 	}
497 
498 	/* Reserve 2 bytes for the pointer to the next undo log record */
499 	ptr += 2;
500 
501 	/* Store first some general parameters to the undo log */
502 	*ptr++ = TRX_UNDO_INSERT_REC;
503 	ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
504 	ptr += mach_u64_write_much_compressed(ptr, index->table->id);
505 	/*----------------------------------------*/
506 	/* Store then the fields required to uniquely determine the record
507 	to be inserted in the clustered index */
508 	if (UNIV_UNLIKELY(clust_entry->info_bits != 0)) {
509 		ut_ad(clust_entry->info_bits == REC_INFO_METADATA);
510 		ut_ad(index->is_instant());
511 		ut_ad(undo_block->frame[first_free + 2]
512 		      == TRX_UNDO_INSERT_REC);
513 		undo_block->frame[first_free + 2] = TRX_UNDO_INSERT_METADATA;
514 		goto done;
515 	}
516 
517 	for (i = 0; i < dict_index_get_n_unique(index); i++) {
518 
519 		const dfield_t*	field	= dtuple_get_nth_field(clust_entry, i);
520 		ulint		flen	= dfield_get_len(field);
521 
522 		if (trx_undo_left(undo_block, ptr) < 5) {
523 
524 			return(0);
525 		}
526 
527 		ptr += mach_write_compressed(ptr, flen);
528 
529 		switch (flen) {
530 		case 0: case UNIV_SQL_NULL:
531 			break;
532 		default:
533 			if (trx_undo_left(undo_block, ptr) < flen) {
534 
535 				return(0);
536 			}
537 
538 			memcpy(ptr, dfield_get_data(field), flen);
539 			ptr += flen;
540 		}
541 	}
542 
543 	if (index->table->n_v_cols) {
544 		if (!trx_undo_report_insert_virtual(
545 			undo_block, index->table, clust_entry, &ptr)) {
546 			return(0);
547 		}
548 	}
549 
550 done:
551 	return(trx_undo_page_set_next_prev_and_add(undo_block, ptr, mtr));
552 }
553 
554 /**********************************************************************//**
555 Reads from an undo log record the general parameters.
556 @return remaining part of undo log record after reading these values */
557 byte*
trx_undo_rec_get_pars(trx_undo_rec_t * undo_rec,ulint * type,ulint * cmpl_info,bool * updated_extern,undo_no_t * undo_no,table_id_t * table_id)558 trx_undo_rec_get_pars(
559 /*==================*/
560 	trx_undo_rec_t*	undo_rec,	/*!< in: undo log record */
561 	ulint*		type,		/*!< out: undo record type:
562 					TRX_UNDO_INSERT_REC, ... */
563 	ulint*		cmpl_info,	/*!< out: compiler info, relevant only
564 					for update type records */
565 	bool*		updated_extern,	/*!< out: true if we updated an
566 					externally stored fild */
567 	undo_no_t*	undo_no,	/*!< out: undo log record number */
568 	table_id_t*	table_id)	/*!< out: table id */
569 {
570 	const byte*	ptr;
571 	ulint		type_cmpl;
572 
573 	ptr = undo_rec + 2;
574 
575 	type_cmpl = mach_read_from_1(ptr);
576 	ptr++;
577 
578 	*updated_extern = !!(type_cmpl & TRX_UNDO_UPD_EXTERN);
579 	type_cmpl &= ~TRX_UNDO_UPD_EXTERN;
580 
581 	*type = type_cmpl & (TRX_UNDO_CMPL_INFO_MULT - 1);
582 	*cmpl_info = type_cmpl / TRX_UNDO_CMPL_INFO_MULT;
583 
584 	*undo_no = mach_read_next_much_compressed(&ptr);
585 	*table_id = mach_read_next_much_compressed(&ptr);
586 
587 	return(const_cast<byte*>(ptr));
588 }
589 
590 /** Read from an undo log record a non-virtual column value.
591 @param[in,out]	ptr		pointer to remaining part of the undo record
592 @param[in,out]	field		stored field
593 @param[in,out]	len		length of the field, or UNIV_SQL_NULL
594 @param[in,out]	orig_len	original length of the locally stored part
595 of an externally stored column, or 0
596 @return remaining part of undo log record after reading these values */
597 byte*
trx_undo_rec_get_col_val(const byte * ptr,const byte ** field,ulint * len,ulint * orig_len)598 trx_undo_rec_get_col_val(
599 	const byte*	ptr,
600 	const byte**	field,
601 	ulint*		len,
602 	ulint*		orig_len)
603 {
604 	*len = mach_read_next_compressed(&ptr);
605 	*orig_len = 0;
606 
607 	switch (*len) {
608 	case UNIV_SQL_NULL:
609 		*field = NULL;
610 		break;
611 	case UNIV_EXTERN_STORAGE_FIELD:
612 		*orig_len = mach_read_next_compressed(&ptr);
613 		*len = mach_read_next_compressed(&ptr);
614 		*field = ptr;
615 		ptr += *len & ~SPATIAL_STATUS_MASK;
616 
617 		ut_ad(*orig_len >= BTR_EXTERN_FIELD_REF_SIZE);
618 		ut_ad(*len > *orig_len);
619 		/* @see dtuple_convert_big_rec() */
620 		ut_ad(*len >= BTR_EXTERN_FIELD_REF_SIZE);
621 
622 		/* we do not have access to index->table here
623 		ut_ad(dict_table_has_atomic_blobs(index->table)
624 		      || *len >= col->max_prefix
625 		      + BTR_EXTERN_FIELD_REF_SIZE);
626 		*/
627 
628 		*len += UNIV_EXTERN_STORAGE_FIELD;
629 		break;
630 	default:
631 		*field = ptr;
632 		if (*len >= UNIV_EXTERN_STORAGE_FIELD) {
633 			ptr += (*len - UNIV_EXTERN_STORAGE_FIELD)
634 				& ~SPATIAL_STATUS_MASK;
635 		} else {
636 			ptr += *len;
637 		}
638 	}
639 
640 	return(const_cast<byte*>(ptr));
641 }
642 
643 /*******************************************************************//**
644 Builds a row reference from an undo log record.
645 @return pointer to remaining part of undo record */
646 byte*
trx_undo_rec_get_row_ref(byte * ptr,dict_index_t * index,const dtuple_t ** ref,mem_heap_t * heap)647 trx_undo_rec_get_row_ref(
648 /*=====================*/
649 	byte*		ptr,	/*!< in: remaining part of a copy of an undo log
650 				record, at the start of the row reference;
651 				NOTE that this copy of the undo log record must
652 				be preserved as long as the row reference is
653 				used, as we do NOT copy the data in the
654 				record! */
655 	dict_index_t*	index,	/*!< in: clustered index */
656 	const dtuple_t**ref,	/*!< out, own: row reference */
657 	mem_heap_t*	heap)	/*!< in: memory heap from which the memory
658 				needed is allocated */
659 {
660 	ulint		ref_len;
661 	ulint		i;
662 
663 	ut_ad(index && ptr && ref && heap);
664 	ut_a(dict_index_is_clust(index));
665 
666 	ref_len = dict_index_get_n_unique(index);
667 
668 	dtuple_t* tuple = dtuple_create(heap, ref_len);
669 	*ref = tuple;
670 
671 	dict_index_copy_types(tuple, index, ref_len);
672 
673 	for (i = 0; i < ref_len; i++) {
674 		const byte*	field;
675 		ulint		len;
676 		ulint		orig_len;
677 
678 		dfield_t* dfield = dtuple_get_nth_field(tuple, i);
679 
680 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
681 
682 		dfield_set_data(dfield, field, len);
683 	}
684 
685 	return(ptr);
686 }
687 
688 /*******************************************************************//**
689 Skips a row reference from an undo log record.
690 @return pointer to remaining part of undo record */
691 static
692 byte*
trx_undo_rec_skip_row_ref(byte * ptr,dict_index_t * index)693 trx_undo_rec_skip_row_ref(
694 /*======================*/
695 	byte*		ptr,	/*!< in: remaining part in update undo log
696 				record, at the start of the row reference */
697 	dict_index_t*	index)	/*!< in: clustered index */
698 {
699 	ulint	ref_len;
700 	ulint	i;
701 
702 	ut_ad(index && ptr);
703 	ut_a(dict_index_is_clust(index));
704 
705 	ref_len = dict_index_get_n_unique(index);
706 
707 	for (i = 0; i < ref_len; i++) {
708 		const byte*	field;
709 		ulint		len;
710 		ulint		orig_len;
711 
712 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
713 	}
714 
715 	return(ptr);
716 }
717 
718 /** Fetch a prefix of an externally stored column, for writing to the undo
719 log of an update or delete marking of a clustered index record.
720 @param[out]	ext_buf		buffer to hold the prefix data and BLOB pointer
721 @param[in]	prefix_len	prefix size to store in the undo log
722 @param[in]	page_size	page size
723 @param[in]	field		an externally stored column
724 @param[in,out]	len		input: length of field; output: used length of
725 ext_buf
726 @return ext_buf */
727 static
728 byte*
trx_undo_page_fetch_ext(byte * ext_buf,ulint prefix_len,const page_size_t & page_size,const byte * field,ulint * len)729 trx_undo_page_fetch_ext(
730 	byte*			ext_buf,
731 	ulint			prefix_len,
732 	const page_size_t&	page_size,
733 	const byte*		field,
734 	ulint*			len)
735 {
736 	/* Fetch the BLOB. */
737 	ulint	ext_len = btr_copy_externally_stored_field_prefix(
738 		ext_buf, prefix_len, page_size, field, *len);
739 	/* BLOBs should always be nonempty. */
740 	ut_a(ext_len);
741 	/* Append the BLOB pointer to the prefix. */
742 	memcpy(ext_buf + ext_len,
743 	       field + *len - BTR_EXTERN_FIELD_REF_SIZE,
744 	       BTR_EXTERN_FIELD_REF_SIZE);
745 	*len = ext_len + BTR_EXTERN_FIELD_REF_SIZE;
746 	return(ext_buf);
747 }
748 
749 /** Writes to the undo log a prefix of an externally stored column.
750 @param[out]	ptr		undo log position, at least 15 bytes must be
751 available
752 @param[out]	ext_buf		a buffer of DICT_MAX_FIELD_LEN_BY_FORMAT()
753 				size, or NULL when should not fetch a longer
754 				prefix
755 @param[in]	prefix_len	prefix size to store in the undo log
756 @param[in]	page_size	page size
757 @param[in,out]	field		the locally stored part of the externally
758 stored column
759 @param[in,out]	len		length of field, in bytes
760 @param[in]	spatial_status	whether the column is used by spatial index or
761 				regular index
762 @return undo log position */
763 static
764 byte*
trx_undo_page_report_modify_ext(byte * ptr,byte * ext_buf,ulint prefix_len,const page_size_t & page_size,const byte ** field,ulint * len,spatial_status_t spatial_status)765 trx_undo_page_report_modify_ext(
766 	byte*			ptr,
767 	byte*			ext_buf,
768 	ulint			prefix_len,
769 	const page_size_t&	page_size,
770 	const byte**		field,
771 	ulint*			len,
772 	spatial_status_t	spatial_status)
773 {
774 	ulint	spatial_len= 0;
775 
776 	switch (spatial_status) {
777 	case SPATIAL_UNKNOWN:
778 	case SPATIAL_NONE:
779 		break;
780 
781 	case SPATIAL_MIXED:
782 	case SPATIAL_ONLY:
783 		spatial_len = DATA_MBR_LEN;
784 		break;
785 	}
786 
787 	/* Encode spatial status into length. */
788 	spatial_len |= ulint(spatial_status) << SPATIAL_STATUS_SHIFT;
789 
790 	if (spatial_status == SPATIAL_ONLY) {
791 		/* If the column is only used by gis index, log its
792 		MBR is enough.*/
793 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
794 					     + spatial_len);
795 
796 		return(ptr);
797 	}
798 
799 	if (ext_buf) {
800 		ut_a(prefix_len > 0);
801 
802 		/* If an ordering column is externally stored, we will
803 		have to store a longer prefix of the field.  In this
804 		case, write to the log a marker followed by the
805 		original length and the real length of the field. */
806 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD);
807 
808 		ptr += mach_write_compressed(ptr, *len);
809 
810 		*field = trx_undo_page_fetch_ext(ext_buf, prefix_len,
811 						 page_size, *field, len);
812 
813 		ptr += mach_write_compressed(ptr, *len + spatial_len);
814 	} else {
815 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
816 					     + *len + spatial_len);
817 	}
818 
819 	return(ptr);
820 }
821 
822 /** Get MBR from a Geometry column stored externally
823 @param[out]	mbr		MBR to fill
824 @param[in]	pagesize	table pagesize
825 @param[in]	field		field contain the geometry data
826 @param[in,out]	len		length of field, in bytes
827 */
828 static
829 void
trx_undo_get_mbr_from_ext(double * mbr,const page_size_t & page_size,const byte * field,ulint * len)830 trx_undo_get_mbr_from_ext(
831 /*======================*/
832 	double*			mbr,
833 	const page_size_t&      page_size,
834 	const byte*             field,
835 	ulint*			len)
836 {
837 	uchar*		dptr = NULL;
838 	ulint		dlen;
839 	mem_heap_t*	heap = mem_heap_create(100);
840 
841 	dptr = btr_copy_externally_stored_field(
842 		&dlen, field, page_size, *len, heap);
843 
844 	if (dlen <= GEO_DATA_HEADER_SIZE) {
845 		for (uint i = 0; i < SPDIMS; ++i) {
846 			mbr[i * 2] = DBL_MAX;
847 			mbr[i * 2 + 1] = -DBL_MAX;
848 		}
849 	} else {
850 		rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
851 				   static_cast<uint>(dlen
852 				   - GEO_DATA_HEADER_SIZE), SPDIMS, mbr);
853 	}
854 
855 	mem_heap_free(heap);
856 }
857 
858 /**********************************************************************//**
859 Reports in the undo log of an update or delete marking of a clustered index
860 record.
861 @return byte offset of the inserted undo log entry on the page if
862 succeed, 0 if fail */
863 static
864 ulint
trx_undo_page_report_modify(buf_block_t * undo_block,trx_t * trx,dict_index_t * index,const rec_t * rec,const rec_offs * offsets,const upd_t * update,ulint cmpl_info,const dtuple_t * row,mtr_t * mtr)865 trx_undo_page_report_modify(
866 /*========================*/
867 	buf_block_t*	undo_block,	/*!< in: undo log page */
868 	trx_t*		trx,		/*!< in: transaction */
869 	dict_index_t*	index,		/*!< in: clustered index where update or
870 					delete marking is done */
871 	const rec_t*	rec,		/*!< in: clustered index record which
872 					has NOT yet been modified */
873 	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec, index) */
874 	const upd_t*	update,		/*!< in: update vector which tells the
875 					columns to be updated; in the case of
876 					a delete, this should be set to NULL */
877 	ulint		cmpl_info,	/*!< in: compiler info on secondary
878 					index updates */
879 	const dtuple_t*	row,		/*!< in: clustered index row contains
880 					virtual column info */
881 	mtr_t*		mtr)		/*!< in: mtr */
882 {
883 	ulint		first_free;
884 	byte*		ptr;
885 
886 	ut_ad(index->is_primary());
887 	ut_ad(rec_offs_validate(rec, index, offsets));
888 	/* MariaDB 10.3.1+ in trx_undo_page_init() always initializes
889 	TRX_UNDO_PAGE_TYPE as 0, but previous versions wrote
890 	TRX_UNDO_INSERT == 1 into insert_undo pages,
891 	or TRX_UNDO_UPDATE == 2 into update_undo pages. */
892 	ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
893 			       + undo_block->frame) <= 2);
894 
895 	first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
896 				      + undo_block->frame);
897 	ptr = undo_block->frame + first_free;
898 
899 	ut_ad(first_free <= srv_page_size);
900 
901 	if (trx_undo_left(undo_block, ptr) < 50) {
902 		/* NOTE: the value 50 must be big enough so that the general
903 		fields written below fit on the undo log page */
904 		return 0;
905 	}
906 
907 	/* Reserve 2 bytes for the pointer to the next undo log record */
908 	ptr += 2;
909 
910 	dict_table_t*	table		= index->table;
911 	const byte*	field;
912 	ulint		flen;
913 	ulint		col_no;
914 	ulint		type_cmpl;
915 	byte*		type_cmpl_ptr;
916 	ulint		i;
917 	trx_id_t	trx_id;
918 	ibool		ignore_prefix = FALSE;
919 	byte		ext_buf[REC_VERSION_56_MAX_INDEX_COL_LEN
920 				+ BTR_EXTERN_FIELD_REF_SIZE];
921 	bool		first_v_col = true;
922 
923 	/* Store first some general parameters to the undo log */
924 
925 	if (!update) {
926 		ut_ad(!rec_get_deleted_flag(rec, dict_table_is_comp(table)));
927 		type_cmpl = TRX_UNDO_DEL_MARK_REC;
928 	} else if (rec_get_deleted_flag(rec, dict_table_is_comp(table))) {
929 		/* In delete-marked records, DB_TRX_ID must
930 		always refer to an existing update_undo log record. */
931 		ut_ad(row_get_rec_trx_id(rec, index, offsets));
932 
933 		type_cmpl = TRX_UNDO_UPD_DEL_REC;
934 		/* We are about to update a delete marked record.
935 		We don't typically need the prefix in this case unless
936 		the delete marking is done by the same transaction
937 		(which we check below). */
938 		ignore_prefix = TRUE;
939 	} else {
940 		type_cmpl = TRX_UNDO_UPD_EXIST_REC;
941 	}
942 
943 	type_cmpl |= cmpl_info * TRX_UNDO_CMPL_INFO_MULT;
944 	type_cmpl_ptr = ptr;
945 
946 	*ptr++ = (byte) type_cmpl;
947 	ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
948 
949 	ptr += mach_u64_write_much_compressed(ptr, table->id);
950 
951 	/*----------------------------------------*/
952 	/* Store the state of the info bits */
953 
954 	*ptr++ = (byte) rec_get_info_bits(rec, dict_table_is_comp(table));
955 
956 	/* Store the values of the system columns */
957 	field = rec_get_nth_field(rec, offsets,
958 				  dict_index_get_sys_col_pos(
959 					  index, DATA_TRX_ID), &flen);
960 	ut_ad(flen == DATA_TRX_ID_LEN);
961 
962 	trx_id = trx_read_trx_id(field);
963 
964 	/* If it is an update of a delete marked record, then we are
965 	allowed to ignore blob prefixes if the delete marking was done
966 	by some other trx as it must have committed by now for us to
967 	allow an over-write. */
968 	if (trx_id == trx->id) {
969 		ignore_prefix = false;
970 	}
971 	ptr += mach_u64_write_compressed(ptr, trx_id);
972 
973 	field = rec_get_nth_field(rec, offsets,
974 				  dict_index_get_sys_col_pos(
975 					  index, DATA_ROLL_PTR), &flen);
976 	ut_ad(flen == DATA_ROLL_PTR_LEN);
977 	ut_ad(memcmp(field, field_ref_zero, DATA_ROLL_PTR_LEN));
978 
979 	ptr += mach_u64_write_compressed(ptr, trx_read_roll_ptr(field));
980 
981 	/*----------------------------------------*/
982 	/* Store then the fields required to uniquely determine the
983 	record which will be modified in the clustered index */
984 
985 	for (i = 0; i < dict_index_get_n_unique(index); i++) {
986 
987 		/* The ordering columns must not be instant added columns. */
988 		ut_ad(!rec_offs_nth_default(offsets, i));
989 		field = rec_get_nth_field(rec, offsets, i, &flen);
990 
991 		/* The ordering columns must not be stored externally. */
992 		ut_ad(!rec_offs_nth_extern(offsets, i));
993 		ut_ad(dict_index_get_nth_col(index, i)->ord_part);
994 
995 		if (trx_undo_left(undo_block, ptr) < 5) {
996 			return(0);
997 		}
998 
999 		ptr += mach_write_compressed(ptr, flen);
1000 
1001 		if (flen != UNIV_SQL_NULL) {
1002 			if (trx_undo_left(undo_block, ptr) < flen) {
1003 				return(0);
1004 			}
1005 
1006 			memcpy(ptr, field, flen);
1007 			ptr += flen;
1008 		}
1009 	}
1010 
1011 	/*----------------------------------------*/
1012 	/* Save to the undo log the old values of the columns to be updated. */
1013 
1014 	if (update) {
1015 		if (trx_undo_left(undo_block, ptr) < 5) {
1016 			return(0);
1017 		}
1018 
1019 		ulint	n_updated = upd_get_n_fields(update);
1020 
1021 		/* If this is an online update while an inplace alter table
1022 		is in progress and the table has virtual column, we will
1023 		need to double check if there are any non-indexed columns
1024 		being registered in update vector in case they will be indexed
1025 		in new table */
1026 		if (dict_index_is_online_ddl(index) && table->n_v_cols > 0) {
1027 			for (i = 0; i < upd_get_n_fields(update); i++) {
1028 				upd_field_t*	fld = upd_get_nth_field(
1029 					update, i);
1030 				ulint		pos = fld->field_no;
1031 
1032 				/* These columns must not have an index
1033 				on them */
1034 				if (upd_fld_is_virtual_col(fld)
1035 				    && dict_table_get_nth_v_col(
1036 					table, pos)->v_indexes->empty()) {
1037 					n_updated--;
1038 				}
1039 			}
1040 		}
1041 
1042 		ptr += mach_write_compressed(ptr, n_updated);
1043 
1044 		for (i = 0; i < upd_get_n_fields(update); i++) {
1045 			upd_field_t*	fld = upd_get_nth_field(update, i);
1046 
1047 			bool	is_virtual = upd_fld_is_virtual_col(fld);
1048 			ulint	max_v_log_len = 0;
1049 
1050 			ulint	pos = fld->field_no;
1051 
1052 			/* Write field number to undo log */
1053 			if (trx_undo_left(undo_block, ptr) < 5) {
1054 				return(0);
1055 			}
1056 
1057 			if (is_virtual) {
1058 				/* Skip the non-indexed column, during
1059 				an online alter table */
1060 				if (dict_index_is_online_ddl(index)
1061 				    && dict_table_get_nth_v_col(
1062 					table, pos)->v_indexes->empty()) {
1063 					continue;
1064 				}
1065 
1066 				/* add REC_MAX_N_FIELDS to mark this
1067 				is a virtual col */
1068 				pos += REC_MAX_N_FIELDS;
1069 			}
1070 
1071 			ptr += mach_write_compressed(ptr, pos);
1072 
1073 			/* Save the old value of field */
1074 			if (is_virtual) {
1075 				ut_ad(fld->field_no < table->n_v_def);
1076 
1077 				ptr = trx_undo_log_v_idx(undo_block, table,
1078 							 fld->field_no, ptr,
1079 							 first_v_col);
1080 				if (ptr == NULL) {
1081 					 return(0);
1082 				}
1083 				first_v_col = false;
1084 
1085 				max_v_log_len
1086 					= dict_max_v_field_len_store_undo(
1087 						table, fld->field_no);
1088 
1089 				field = static_cast<byte*>(
1090 					fld->old_v_val->data);
1091 				flen = fld->old_v_val->len;
1092 
1093 				/* Only log sufficient bytes for index
1094 				record update */
1095 				if (flen != UNIV_SQL_NULL) {
1096 					flen = ut_min(
1097 						flen, max_v_log_len);
1098 				}
1099 			} else {
1100 				field = rec_get_nth_cfield(
1101 					rec, index, offsets, pos, &flen);
1102 			}
1103 
1104 			if (trx_undo_left(undo_block, ptr) < 15) {
1105 				return(0);
1106 			}
1107 
1108 			if (!is_virtual && rec_offs_nth_extern(offsets, pos)) {
1109 				const dict_col_t*	col
1110 					= dict_index_get_nth_col(index, pos);
1111 				ulint			prefix_len
1112 					= dict_max_field_len_store_undo(
1113 						table, col);
1114 
1115 				ut_ad(prefix_len + BTR_EXTERN_FIELD_REF_SIZE
1116 				      <= sizeof ext_buf);
1117 
1118 				ptr = trx_undo_page_report_modify_ext(
1119 					ptr,
1120 					col->ord_part
1121 					&& !ignore_prefix
1122 					&& flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1123 					? ext_buf : NULL, prefix_len,
1124 					dict_table_page_size(table),
1125 					&field, &flen, SPATIAL_UNKNOWN);
1126 
1127 				*type_cmpl_ptr |= TRX_UNDO_UPD_EXTERN;
1128 			} else {
1129 				ptr += mach_write_compressed(ptr, flen);
1130 			}
1131 
1132 			if (flen != UNIV_SQL_NULL) {
1133 				if (trx_undo_left(undo_block, ptr) < flen) {
1134 					return(0);
1135 				}
1136 
1137 				memcpy(ptr, field, flen);
1138 				ptr += flen;
1139 			}
1140 
1141 			/* Also record the new value for virtual column */
1142 			if (is_virtual) {
1143 				field = static_cast<byte*>(fld->new_val.data);
1144 				flen = fld->new_val.len;
1145 				if (flen != UNIV_SQL_NULL) {
1146 					flen = ut_min(
1147 						flen, max_v_log_len);
1148 				}
1149 
1150 				if (trx_undo_left(undo_block, ptr) < 15) {
1151 					return(0);
1152 				}
1153 
1154 				ptr += mach_write_compressed(ptr, flen);
1155 
1156 				if (flen != UNIV_SQL_NULL) {
1157 					if (trx_undo_left(undo_block, ptr)
1158 					    < flen) {
1159 						return(0);
1160 					}
1161 
1162 					memcpy(ptr, field, flen);
1163 					ptr += flen;
1164 				}
1165 			}
1166 		}
1167 	}
1168 
1169 	/* Reset the first_v_col, so to put the virtual column undo
1170 	version marker again, when we log all the indexed columns */
1171 	first_v_col = true;
1172 
1173 	/*----------------------------------------*/
1174 	/* In the case of a delete marking, and also in the case of an update
1175 	where any ordering field of any index changes, store the values of all
1176 	columns which occur as ordering fields in any index. This info is used
1177 	in the purge of old versions where we use it to build and search the
1178 	delete marked index records, to look if we can remove them from the
1179 	index tree. Note that starting from 4.0.14 also externally stored
1180 	fields can be ordering in some index. Starting from 5.2, we no longer
1181 	store REC_MAX_INDEX_COL_LEN first bytes to the undo log record,
1182 	but we can construct the column prefix fields in the index by
1183 	fetching the first page of the BLOB that is pointed to by the
1184 	clustered index. This works also in crash recovery, because all pages
1185 	(including BLOBs) are recovered before anything is rolled back. */
1186 
1187 	if (!update || !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
1188 		byte*		old_ptr = ptr;
1189 		double		mbr[SPDIMS * 2];
1190 		mem_heap_t*	row_heap = NULL;
1191 
1192 		if (trx_undo_left(undo_block, ptr) < 5) {
1193 			return(0);
1194 		}
1195 
1196 		/* Reserve 2 bytes to write the number of bytes the stored
1197 		fields take in this undo record */
1198 
1199 		ptr += 2;
1200 
1201 		for (col_no = 0; col_no < dict_table_get_n_cols(table);
1202 		     col_no++) {
1203 
1204 			const dict_col_t*	col
1205 				= dict_table_get_nth_col(table, col_no);
1206 
1207 			if (!col->ord_part) {
1208 				continue;
1209 			}
1210 
1211 			const ulint pos = dict_index_get_nth_col_pos(
1212 				index, col_no, NULL);
1213 			/* All non-virtual columns must be present in
1214 			the clustered index. */
1215 			ut_ad(pos != ULINT_UNDEFINED);
1216 
1217 			const bool is_ext = rec_offs_nth_extern(offsets, pos);
1218 			const spatial_status_t spatial_status = is_ext
1219 				? dict_col_get_spatial_status(col)
1220 				: SPATIAL_NONE;
1221 
1222 			switch (spatial_status) {
1223 			case SPATIAL_UNKNOWN:
1224 				ut_ad(0);
1225 				/* fall through */
1226 			case SPATIAL_MIXED:
1227 			case SPATIAL_ONLY:
1228 				/* Externally stored spatially indexed
1229 				columns will be (redundantly) logged
1230 				again, because we did not write the
1231 				MBR yet, that is, the previous call to
1232 				trx_undo_page_report_modify_ext()
1233 				was with SPATIAL_UNKNOWN. */
1234 				break;
1235 			case SPATIAL_NONE:
1236 				if (!update) {
1237 					/* This is a DELETE operation. */
1238 					break;
1239 				}
1240 				/* Avoid redundantly logging indexed
1241 				columns that were updated. */
1242 
1243 				for (i = 0; i < update->n_fields; i++) {
1244 					const ulint field_no
1245 						= upd_get_nth_field(update, i)
1246 						->field_no;
1247 					if (field_no >= index->n_fields
1248 					    || dict_index_get_nth_field(
1249 						    index, field_no)->col
1250 					    == col) {
1251 						goto already_logged;
1252 					}
1253 				}
1254 			}
1255 
1256 			if (true) {
1257 				/* Write field number to undo log */
1258 				if (trx_undo_left(undo_block, ptr) < 5 + 15) {
1259 					return(0);
1260 				}
1261 
1262 				ptr += mach_write_compressed(ptr, pos);
1263 
1264 				/* Save the old value of field */
1265 				field = rec_get_nth_cfield(
1266 					rec, index, offsets, pos, &flen);
1267 
1268 				if (is_ext) {
1269 					const dict_col_t*	col =
1270 						dict_index_get_nth_col(
1271 							index, pos);
1272 					ulint			prefix_len =
1273 						dict_max_field_len_store_undo(
1274 							table, col);
1275 
1276 					ut_a(prefix_len < sizeof ext_buf);
1277 
1278 					/* If there is a spatial index on it,
1279 					log its MBR */
1280 					if (spatial_status != SPATIAL_NONE) {
1281 						ut_ad(DATA_GEOMETRY_MTYPE(
1282 								col->mtype));
1283 
1284 						trx_undo_get_mbr_from_ext(
1285 							mbr,
1286 							dict_table_page_size(
1287 								table),
1288 							field, &flen);
1289 					}
1290 
1291 					ptr = trx_undo_page_report_modify_ext(
1292 						ptr,
1293 						flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1294 						&& !ignore_prefix
1295 						? ext_buf : NULL, prefix_len,
1296 						dict_table_page_size(table),
1297 						&field, &flen,
1298 						spatial_status);
1299 				} else {
1300 					ptr += mach_write_compressed(
1301 						ptr, flen);
1302 				}
1303 
1304 				if (flen != UNIV_SQL_NULL
1305 				    && spatial_status != SPATIAL_ONLY) {
1306 					if (trx_undo_left(undo_block, ptr)
1307 					    < flen) {
1308 						return(0);
1309 					}
1310 
1311 					memcpy(ptr, field, flen);
1312 					ptr += flen;
1313 				}
1314 
1315 				if (spatial_status != SPATIAL_NONE) {
1316 					if (trx_undo_left(undo_block, ptr)
1317 					    < DATA_MBR_LEN) {
1318 						return(0);
1319 					}
1320 
1321 					for (int i = 0; i < SPDIMS * 2;
1322 					     i++) {
1323 						mach_double_write(
1324 							ptr, mbr[i]);
1325 						ptr +=  sizeof(double);
1326 					}
1327 				}
1328 			}
1329 
1330 already_logged:
1331 			continue;
1332 		}
1333 
1334 		for (col_no = 0; col_no < dict_table_get_n_v_cols(table);
1335 		     col_no++) {
1336 			const dict_v_col_t*     col
1337 				= dict_table_get_nth_v_col(table, col_no);
1338 
1339 			if (col->m_col.ord_part) {
1340 				ulint   pos = col_no;
1341 				ulint	max_v_log_len
1342 					= dict_max_v_field_len_store_undo(
1343 						table, pos);
1344 
1345 				/* Write field number to undo log.
1346 				Make sure there is enough space in log */
1347 				if (trx_undo_left(undo_block, ptr) < 5) {
1348 					return(0);
1349 				}
1350 
1351 				pos += REC_MAX_N_FIELDS;
1352 				ptr += mach_write_compressed(ptr, pos);
1353 
1354 				ut_ad(col_no < table->n_v_def);
1355 				ptr = trx_undo_log_v_idx(undo_block, table,
1356 							 col_no, ptr,
1357 							 first_v_col);
1358 				first_v_col = false;
1359 
1360 				if (!ptr) {
1361 					 return(0);
1362 				}
1363 
1364 				const dfield_t* vfield = NULL;
1365 
1366 				if (update) {
1367 					ut_ad(!row);
1368 					if (update->old_vrow == NULL) {
1369 						flen = UNIV_SQL_NULL;
1370 					} else {
1371 						vfield = dtuple_get_nth_v_field(
1372 							update->old_vrow,
1373 							col->v_pos);
1374 					}
1375 				} else if (row) {
1376 					vfield = dtuple_get_nth_v_field(
1377 						row, col->v_pos);
1378 				} else {
1379 					ut_ad(0);
1380 				}
1381 
1382 				if (vfield) {
1383 					field = static_cast<byte*>(vfield->data);
1384 					flen = vfield->len;
1385 				} else {
1386 					ut_ad(flen == UNIV_SQL_NULL);
1387 				}
1388 
1389 				if (flen != UNIV_SQL_NULL) {
1390 					flen = ut_min(
1391 						flen, max_v_log_len);
1392 				}
1393 
1394 				ptr += mach_write_compressed(ptr, flen);
1395 
1396 				switch (flen) {
1397 				case 0: case UNIV_SQL_NULL:
1398 					break;
1399 				default:
1400 					if (trx_undo_left(undo_block, ptr)
1401 					    < flen) {
1402 						return(0);
1403 					}
1404 
1405 					memcpy(ptr, field, flen);
1406 					ptr += flen;
1407 				}
1408 			}
1409 		}
1410 
1411 		mach_write_to_2(old_ptr, ulint(ptr - old_ptr));
1412 
1413 		if (row_heap) {
1414 			mem_heap_free(row_heap);
1415 		}
1416 	}
1417 
1418 	/*----------------------------------------*/
1419 	/* Write pointers to the previous and the next undo log records */
1420 	if (trx_undo_left(undo_block, ptr) < 2) {
1421 		return(0);
1422 	}
1423 
1424 	mach_write_to_2(ptr, first_free);
1425 	ptr += 2;
1426 	const ulint new_free = ulint(ptr - undo_block->frame);
1427 	mach_write_to_2(undo_block->frame + first_free, new_free);
1428 
1429 	mach_write_to_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
1430 			+ undo_block->frame, new_free);
1431 
1432 	/* Write to the REDO log about this change in the UNDO log */
1433 	trx_undof_page_add_undo_rec_log(undo_block, first_free, new_free, mtr);
1434 	return(first_free);
1435 }
1436 
1437 /**********************************************************************//**
1438 Reads from an undo log update record the system field values of the old
1439 version.
1440 @return remaining part of undo log record after reading these values */
1441 byte*
trx_undo_update_rec_get_sys_cols(const byte * ptr,trx_id_t * trx_id,roll_ptr_t * roll_ptr,ulint * info_bits)1442 trx_undo_update_rec_get_sys_cols(
1443 /*=============================*/
1444 	const byte*	ptr,		/*!< in: remaining part of undo
1445 					log record after reading
1446 					general parameters */
1447 	trx_id_t*	trx_id,		/*!< out: trx id */
1448 	roll_ptr_t*	roll_ptr,	/*!< out: roll ptr */
1449 	ulint*		info_bits)	/*!< out: info bits state */
1450 {
1451 	/* Read the state of the info bits */
1452 	*info_bits = mach_read_from_1(ptr);
1453 	ptr += 1;
1454 
1455 	/* Read the values of the system columns */
1456 
1457 	*trx_id = mach_u64_read_next_compressed(&ptr);
1458 	*roll_ptr = mach_u64_read_next_compressed(&ptr);
1459 
1460 	return(const_cast<byte*>(ptr));
1461 }
1462 
1463 /*******************************************************************//**
1464 Builds an update vector based on a remaining part of an undo log record.
1465 @return remaining part of the record, NULL if an error detected, which
1466 means that the record is corrupted */
1467 byte*
trx_undo_update_rec_get_update(const byte * ptr,dict_index_t * index,ulint type,trx_id_t trx_id,roll_ptr_t roll_ptr,ulint info_bits,mem_heap_t * heap,upd_t ** upd)1468 trx_undo_update_rec_get_update(
1469 /*===========================*/
1470 	const byte*	ptr,	/*!< in: remaining part in update undo log
1471 				record, after reading the row reference
1472 				NOTE that this copy of the undo log record must
1473 				be preserved as long as the update vector is
1474 				used, as we do NOT copy the data in the
1475 				record! */
1476 	dict_index_t*	index,	/*!< in: clustered index */
1477 	ulint		type,	/*!< in: TRX_UNDO_UPD_EXIST_REC,
1478 				TRX_UNDO_UPD_DEL_REC, or
1479 				TRX_UNDO_DEL_MARK_REC; in the last case,
1480 				only trx id and roll ptr fields are added to
1481 				the update vector */
1482 	trx_id_t	trx_id,	/*!< in: transaction id from this undo record */
1483 	roll_ptr_t	roll_ptr,/*!< in: roll pointer from this undo record */
1484 	ulint		info_bits,/*!< in: info bits from this undo record */
1485 	mem_heap_t*	heap,	/*!< in: memory heap from which the memory
1486 				needed is allocated */
1487 	upd_t**		upd)	/*!< out, own: update vector */
1488 {
1489 	upd_field_t*	upd_field;
1490 	upd_t*		update;
1491 	ulint		n_fields;
1492 	byte*		buf;
1493 	ulint		i;
1494 	bool		first_v_col = true;
1495 	bool		is_undo_log = true;
1496 	ulint		n_skip_field = 0;
1497 
1498 	ut_a(dict_index_is_clust(index));
1499 
1500 	if (type != TRX_UNDO_DEL_MARK_REC) {
1501 		n_fields = mach_read_next_compressed(&ptr);
1502 	} else {
1503 		n_fields = 0;
1504 	}
1505 
1506 	update = upd_create(n_fields + 2, heap);
1507 
1508 	update->info_bits = info_bits;
1509 
1510 	/* Store first trx id and roll ptr to update vector */
1511 
1512 	upd_field = upd_get_nth_field(update, n_fields);
1513 
1514 	buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_TRX_ID_LEN));
1515 
1516 	mach_write_to_6(buf, trx_id);
1517 
1518 	upd_field_set_field_no(upd_field,
1519 			       dict_index_get_sys_col_pos(index, DATA_TRX_ID),
1520 			       index);
1521 	dfield_set_data(&(upd_field->new_val), buf, DATA_TRX_ID_LEN);
1522 
1523 	upd_field = upd_get_nth_field(update, n_fields + 1);
1524 
1525 	buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_ROLL_PTR_LEN));
1526 
1527 	trx_write_roll_ptr(buf, roll_ptr);
1528 
1529 	upd_field_set_field_no(
1530 		upd_field, dict_index_get_sys_col_pos(index, DATA_ROLL_PTR),
1531 		index);
1532 	dfield_set_data(&(upd_field->new_val), buf, DATA_ROLL_PTR_LEN);
1533 
1534 	/* Store then the updated ordinary columns to the update vector */
1535 
1536 	for (i = 0; i < n_fields; i++) {
1537 
1538 		const byte*	field;
1539 		ulint		len;
1540 		ulint		field_no;
1541 		ulint		orig_len;
1542 		bool		is_virtual;
1543 
1544 		upd_field = upd_get_nth_field(update, i);
1545 		field_no = mach_read_next_compressed(&ptr);
1546 
1547 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
1548 
1549 		if (is_virtual) {
1550 			/* If new version, we need to check index list to figure
1551 			out the correct virtual column position */
1552 			ptr = trx_undo_read_v_idx(
1553 				index->table, ptr, first_v_col, &is_undo_log,
1554 				&field_no);
1555 			first_v_col = false;
1556 			/* This column could be dropped or no longer indexed */
1557 			if (field_no == ULINT_UNDEFINED) {
1558 				/* Mark this is no longer needed */
1559 				upd_field->field_no = REC_MAX_N_FIELDS;
1560 
1561 				ptr = trx_undo_rec_get_col_val(
1562 					ptr, &field, &len, &orig_len);
1563 				ptr = trx_undo_rec_get_col_val(
1564 					ptr, &field, &len, &orig_len);
1565 				n_skip_field++;
1566 				continue;
1567 			}
1568 
1569 			upd_field_set_v_field_no(upd_field, field_no, index);
1570 		} else if (field_no < index->n_fields) {
1571 			upd_field_set_field_no(upd_field, field_no, index);
1572 		} else if (update->info_bits == REC_INFO_MIN_REC_FLAG
1573 			   && index->is_instant()) {
1574 			/* This must be a rollback of a subsequent
1575 			instant ADD COLUMN operation. This will be
1576 			detected and handled by btr_cur_trim(). */
1577 			upd_field->field_no = field_no;
1578 			upd_field->orig_len = 0;
1579 		} else {
1580 			ib::error() << "Trying to access update undo rec"
1581 				" field " << field_no
1582 				<< " in index " << index->name
1583 				<< " of table " << index->table->name
1584 				<< " but index has only "
1585 				<< dict_index_get_n_fields(index)
1586 				<< " fields " << BUG_REPORT_MSG
1587 				<< ". Run also CHECK TABLE "
1588 				<< index->table->name << "."
1589 				" n_fields = " << n_fields << ", i = " << i;
1590 
1591 			ut_ad(0);
1592 			*upd = NULL;
1593 			return(NULL);
1594 		}
1595 
1596 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1597 
1598 		upd_field->orig_len = orig_len;
1599 
1600 		if (len == UNIV_SQL_NULL) {
1601 			dfield_set_null(&upd_field->new_val);
1602 		} else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1603 			dfield_set_data(&upd_field->new_val, field, len);
1604 		} else {
1605 			len -= UNIV_EXTERN_STORAGE_FIELD;
1606 
1607 			dfield_set_data(&upd_field->new_val, field, len);
1608 			dfield_set_ext(&upd_field->new_val);
1609 		}
1610 
1611 		if (is_virtual) {
1612 			upd_field->old_v_val = static_cast<dfield_t*>(
1613 				mem_heap_alloc(
1614 					heap, sizeof *upd_field->old_v_val));
1615 			ptr = trx_undo_rec_get_col_val(
1616 				ptr, &field, &len, &orig_len);
1617 	                if (len == UNIV_SQL_NULL) {
1618 				dfield_set_null(upd_field->old_v_val);
1619 			} else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1620 				dfield_set_data(
1621 					upd_field->old_v_val, field, len);
1622 			} else {
1623 				ut_ad(0);
1624 			}
1625 		}
1626 	}
1627 
1628 	/* In rare scenario, we could have skipped virtual column (as they
1629 	are dropped. We will regenerate a update vector and skip them */
1630 	if (n_skip_field > 0) {
1631 		ulint	n = 0;
1632 		ut_ad(n_skip_field <= n_fields);
1633 
1634 		upd_t*	new_update = upd_create(
1635 			n_fields + 2 - n_skip_field, heap);
1636 
1637 		for (i = 0; i < n_fields + 2; i++) {
1638 			upd_field = upd_get_nth_field(update, i);
1639 
1640 			if (upd_field->field_no == REC_MAX_N_FIELDS) {
1641 				continue;
1642 			}
1643 
1644 			upd_field_t*	new_upd_field
1645 				 = upd_get_nth_field(new_update, n);
1646 			*new_upd_field = *upd_field;
1647 			n++;
1648 		}
1649 		ut_ad(n == n_fields + 2 - n_skip_field);
1650 		*upd = new_update;
1651 	} else {
1652 		*upd = update;
1653 	}
1654 
1655 	return(const_cast<byte*>(ptr));
1656 }
1657 
1658 /*******************************************************************//**
1659 Builds a partial row from an update undo log record, for purge.
1660 It contains the columns which occur as ordering in any index of the table.
1661 Any missing columns are indicated by col->mtype == DATA_MISSING.
1662 @return pointer to remaining part of undo record */
1663 byte*
trx_undo_rec_get_partial_row(const byte * ptr,dict_index_t * index,const upd_t * update,dtuple_t ** row,ibool ignore_prefix,mem_heap_t * heap)1664 trx_undo_rec_get_partial_row(
1665 /*=========================*/
1666 	const byte*	ptr,	/*!< in: remaining part in update undo log
1667 				record of a suitable type, at the start of
1668 				the stored index columns;
1669 				NOTE that this copy of the undo log record must
1670 				be preserved as long as the partial row is
1671 				used, as we do NOT copy the data in the
1672 				record! */
1673 	dict_index_t*	index,	/*!< in: clustered index */
1674 	const upd_t*	update,	/*!< in: updated columns */
1675 	dtuple_t**	row,	/*!< out, own: partial row */
1676 	ibool		ignore_prefix, /*!< in: flag to indicate if we
1677 				expect blob prefixes in undo. Used
1678 				only in the assertion. */
1679 	mem_heap_t*	heap)	/*!< in: memory heap from which the memory
1680 				needed is allocated */
1681 {
1682 	const byte*	end_ptr;
1683 	bool		first_v_col = true;
1684 	bool		is_undo_log = true;
1685 
1686 	ut_ad(index->is_primary());
1687 
1688 	*row = dtuple_create_with_vcol(
1689 		heap, dict_table_get_n_cols(index->table),
1690 		dict_table_get_n_v_cols(index->table));
1691 
1692 	/* Mark all columns in the row uninitialized, so that
1693 	we can distinguish missing fields from fields that are SQL NULL. */
1694 	for (ulint i = 0; i < dict_table_get_n_cols(index->table); i++) {
1695 		dfield_get_type(dtuple_get_nth_field(*row, i))
1696 			->mtype = DATA_MISSING;
1697 	}
1698 
1699 	dtuple_init_v_fld(*row);
1700 
1701 	for (const upd_field_t* uf = update->fields, * const ue
1702 		     = update->fields + update->n_fields;
1703 	     uf != ue; uf++) {
1704 		if (uf->old_v_val) {
1705 			continue;
1706 		}
1707 		ulint c = dict_index_get_nth_col(index, uf->field_no)->ind;
1708 		*dtuple_get_nth_field(*row, c) = uf->new_val;
1709 	}
1710 
1711 	end_ptr = ptr + mach_read_from_2(ptr);
1712 	ptr += 2;
1713 
1714 	while (ptr != end_ptr) {
1715 		dfield_t*	dfield;
1716 		const byte*	field;
1717 		ulint		field_no;
1718 		const dict_col_t* col;
1719 		ulint		col_no;
1720 		ulint		len;
1721 		ulint		orig_len;
1722 		bool		is_virtual;
1723 
1724 		field_no = mach_read_next_compressed(&ptr);
1725 
1726 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
1727 
1728 		if (is_virtual) {
1729 			ptr = trx_undo_read_v_idx(
1730 				index->table, ptr, first_v_col, &is_undo_log,
1731 				&field_no);
1732 			first_v_col = false;
1733 		}
1734 
1735 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1736 
1737 		/* This column could be dropped or no longer indexed */
1738 		if (field_no == ULINT_UNDEFINED) {
1739 			ut_ad(is_virtual);
1740 			continue;
1741 		}
1742 
1743 		if (is_virtual) {
1744 			dict_v_col_t* vcol = dict_table_get_nth_v_col(
1745 						index->table, field_no);
1746 			col = &vcol->m_col;
1747 			col_no = dict_col_get_no(col);
1748 			dfield = dtuple_get_nth_v_field(*row, vcol->v_pos);
1749 			dict_col_copy_type(
1750 				&vcol->m_col,
1751 				dfield_get_type(dfield));
1752 		} else {
1753 			col = dict_index_get_nth_col(index, field_no);
1754 			col_no = dict_col_get_no(col);
1755 			dfield = dtuple_get_nth_field(*row, col_no);
1756 			ut_ad(dfield->type.mtype == DATA_MISSING
1757 			      || dict_col_type_assert_equal(col,
1758 							    &dfield->type));
1759 			ut_ad(dfield->type.mtype == DATA_MISSING
1760 			      || dfield->len == len
1761 			      || (len != UNIV_SQL_NULL
1762 				  && len >= UNIV_EXTERN_STORAGE_FIELD));
1763 			dict_col_copy_type(
1764 				dict_table_get_nth_col(index->table, col_no),
1765 				dfield_get_type(dfield));
1766 		}
1767 
1768 		dfield_set_data(dfield, field, len);
1769 
1770 		if (len != UNIV_SQL_NULL
1771 		    && len >= UNIV_EXTERN_STORAGE_FIELD) {
1772 			spatial_status_t	spatial_status;
1773 
1774 			/* Decode spatial status. */
1775 			spatial_status = static_cast<spatial_status_t>(
1776 				(len & SPATIAL_STATUS_MASK)
1777 				>> SPATIAL_STATUS_SHIFT);
1778 			len &= ~SPATIAL_STATUS_MASK;
1779 
1780 			/* Keep compatible with 5.7.9 format. */
1781 			if (spatial_status == SPATIAL_UNKNOWN) {
1782 				spatial_status =
1783 					dict_col_get_spatial_status(col);
1784 			}
1785 
1786 			switch (spatial_status) {
1787 			case SPATIAL_ONLY:
1788 				ut_ad(len - UNIV_EXTERN_STORAGE_FIELD
1789 				      == DATA_MBR_LEN);
1790 				dfield_set_len(
1791 					dfield,
1792 					len - UNIV_EXTERN_STORAGE_FIELD);
1793 				break;
1794 
1795 			case SPATIAL_MIXED:
1796 				dfield_set_len(
1797 					dfield,
1798 					len - UNIV_EXTERN_STORAGE_FIELD
1799 					- DATA_MBR_LEN);
1800 				break;
1801 
1802 			case SPATIAL_NONE:
1803 				dfield_set_len(
1804 					dfield,
1805 					len - UNIV_EXTERN_STORAGE_FIELD);
1806 				break;
1807 
1808 			case SPATIAL_UNKNOWN:
1809 				ut_ad(0);
1810 				break;
1811 			}
1812 
1813 			dfield_set_ext(dfield);
1814 			dfield_set_spatial_status(dfield, spatial_status);
1815 
1816 			/* If the prefix of this column is indexed,
1817 			ensure that enough prefix is stored in the
1818 			undo log record. */
1819 			if (!ignore_prefix && col->ord_part
1820 			    && spatial_status != SPATIAL_ONLY) {
1821 				ut_a(dfield_get_len(dfield)
1822 				     >= BTR_EXTERN_FIELD_REF_SIZE);
1823 				ut_a(dict_table_has_atomic_blobs(index->table)
1824 				     || dfield_get_len(dfield)
1825 				     >= REC_ANTELOPE_MAX_INDEX_COL_LEN
1826 				     + BTR_EXTERN_FIELD_REF_SIZE);
1827 			}
1828 		}
1829 	}
1830 
1831 	return(const_cast<byte*>(ptr));
1832 }
1833 
1834 /** Erase the unused undo log page end.
1835 @param[in,out]	undo_page	undo log page
1836 @return whether the page contained something */
1837 bool
trx_undo_erase_page_end(page_t * undo_page)1838 trx_undo_erase_page_end(page_t* undo_page)
1839 {
1840 	ulint	first_free;
1841 
1842 	first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
1843 				      + TRX_UNDO_PAGE_FREE);
1844 	memset(undo_page + first_free, 0,
1845 	       (srv_page_size - FIL_PAGE_DATA_END) - first_free);
1846 
1847 	return(first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
1848 }
1849 
1850 /** Report a RENAME TABLE operation.
1851 @param[in,out]	trx	transaction
1852 @param[in]	table	table that is being renamed
1853 @param[in,out]	block	undo page
1854 @param[in,out]	mtr	mini-transaction
1855 @return	byte offset of the undo log record
1856 @retval	0	in case of failure */
1857 static
1858 ulint
trx_undo_page_report_rename(trx_t * trx,const dict_table_t * table,buf_block_t * block,mtr_t * mtr)1859 trx_undo_page_report_rename(trx_t* trx, const dict_table_t* table,
1860 			    buf_block_t* block, mtr_t* mtr)
1861 {
1862 	byte*	ptr_first_free  = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
1863 		+ block->frame;
1864 	ulint	first_free = mach_read_from_2(ptr_first_free);
1865 	ut_ad(first_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
1866 	ut_ad(first_free <= srv_page_size);
1867 	byte* start = block->frame + first_free;
1868 	size_t len = strlen(table->name.m_name);
1869 	const size_t fixed = 2 + 1 + 11 + 11 + 2;
1870 	ut_ad(len <= NAME_LEN * 2 + 1);
1871 	/* The -10 is used in trx_undo_left() */
1872 	compile_time_assert((NAME_LEN * 1) * 2 + fixed
1873 			    + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE
1874 			    < UNIV_PAGE_SIZE_MIN - 10 - FIL_PAGE_DATA_END);
1875 
1876 	if (trx_undo_left(block, start) < fixed + len) {
1877 		ut_ad(first_free > TRX_UNDO_PAGE_HDR
1878 		      + TRX_UNDO_PAGE_HDR_SIZE);
1879 		return 0;
1880 	}
1881 
1882 	byte* ptr = start + 2;
1883 	*ptr++ = TRX_UNDO_RENAME_TABLE;
1884 	ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
1885 	ptr += mach_u64_write_much_compressed(ptr, table->id);
1886 	memcpy(ptr, table->name.m_name, len);
1887 	ptr += len;
1888 	mach_write_to_2(ptr, first_free);
1889 	ptr += 2;
1890 	ulint offset = page_offset(ptr);
1891 	mach_write_to_2(start, offset);
1892 	mach_write_to_2(ptr_first_free, offset);
1893 
1894 	trx_undof_page_add_undo_rec_log(block, first_free, offset, mtr);
1895 	return first_free;
1896 }
1897 
1898 /** Report a RENAME TABLE operation.
1899 @param[in,out]	trx	transaction
1900 @param[in]	table	table that is being renamed
1901 @return	DB_SUCCESS or error code */
trx_undo_report_rename(trx_t * trx,const dict_table_t * table)1902 dberr_t trx_undo_report_rename(trx_t* trx, const dict_table_t* table)
1903 {
1904 	ut_ad(!trx->read_only);
1905 	ut_ad(trx->id);
1906 	ut_ad(!table->is_temporary());
1907 
1908 	mtr_t		mtr;
1909 	dberr_t		err;
1910 	mtr.start();
1911 	if (buf_block_t* block = trx_undo_assign(trx, &err, &mtr)) {
1912 		trx_undo_t*	undo = trx->rsegs.m_redo.undo;
1913 		ut_ad(err == DB_SUCCESS);
1914 		ut_ad(undo);
1915 		for (ut_d(int loop_count = 0);;) {
1916 			ut_ad(loop_count++ < 2);
1917 			ut_ad(undo->last_page_no == block->page.id.page_no());
1918 
1919 			if (ulint offset = trx_undo_page_report_rename(
1920 				    trx, table, block, &mtr)) {
1921 				undo->top_page_no = undo->last_page_no;
1922 				undo->top_offset  = offset;
1923 				undo->top_undo_no = trx->undo_no++;
1924 				undo->guess_block = block;
1925 				ut_ad(!undo->empty());
1926 
1927 				err = DB_SUCCESS;
1928 				break;
1929 			} else {
1930 				mtr.commit();
1931 				mtr.start();
1932 				block = trx_undo_add_page(undo, &mtr);
1933 				if (!block) {
1934 					err = DB_OUT_OF_FILE_SPACE;
1935 					break;
1936 				}
1937 			}
1938 		}
1939 	}
1940 
1941 	mtr.commit();
1942 	return err;
1943 }
1944 
/***********************************************************************//**
Writes information to an undo log about an insert, update, or a delete marking
of a clustered index record. This information is used in a rollback of the
transaction and in consistent reads that must look to the history of this
transaction.

The record is written to the undo log selected by the kind of table:
trx->rsegs.m_noredo.undo for a temporary table (with redo logging
disabled in the mini-transaction), trx->rsegs.m_redo.undo otherwise.
If the record does not fit on the current undo page, the undo log is
extended by one page and the write is retried there. On success,
*roll_ptr is set to point to the new undo log record.
@return DB_SUCCESS or error code
@retval DB_UNDO_RECORD_TOO_BIG if the record did not fit on an empty
undo page
@retval DB_OUT_OF_FILE_SPACE if the undo log could not be extended */
dberr_t
trx_undo_report_row_operation(
/*==========================*/
	que_thr_t*	thr,		/*!< in: query thread */
	dict_index_t*	index,		/*!< in: clustered index */
	const dtuple_t*	clust_entry,	/*!< in: in the case of an insert,
					index entry to insert into the
					clustered index; in updates,
					may contain a clustered index
					record tuple that also contains
					virtual columns of the table;
					otherwise, NULL */
	const upd_t*	update,		/*!< in: in the case of an update,
					the update vector, otherwise NULL */
	ulint		cmpl_info,	/*!< in: compiler info on secondary
					index updates */
	const rec_t*	rec,		/*!< in: case of an update or delete
					marking, the record in the clustered
					index; NULL if insert */
	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec) */
	roll_ptr_t*	roll_ptr)	/*!< out: DB_ROLL_PTR to the
					undo log record */
{
	trx_t*		trx;
	mtr_t		mtr;
#ifdef UNIV_DEBUG
	/* Counts how many times the undo log was extended below */
	int		loop_count	= 0;
#endif /* UNIV_DEBUG */

	ut_a(dict_index_is_clust(index));
	ut_ad(!update || rec);
	ut_ad(!rec || rec_offs_validate(rec, index, offsets));
	ut_ad(!srv_read_only_mode);

	trx = thr_get_trx(thr);
	/* This function must not be invoked during rollback
	(of a TRX_STATE_PREPARE transaction or otherwise). */
	ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
	ut_ad(!trx->in_rollback);

	mtr.start();
	trx_undo_t**	pundo;
	trx_rseg_t*	rseg;
	const bool	is_temp	= index->table->is_temporary();

	/* Select the rollback segment and the undo log slot of the
	transaction according to the kind of table. */
	if (is_temp) {
		/* Changes for temporary tables are not redo-logged */
		mtr.set_log_mode(MTR_LOG_NO_REDO);

		rseg = trx->get_temp_rseg();
		pundo = &trx->rsegs.m_noredo.undo;
	} else {
		ut_ad(!trx->read_only);
		ut_ad(trx->id);
		pundo = &trx->rsegs.m_redo.undo;
		rseg = trx->rsegs.m_redo.rseg;
	}

	dberr_t		err;
	buf_block_t*	undo_block = trx_undo_assign_low(trx, rseg, pundo,
							 &err, &mtr);
	trx_undo_t*	undo	= *pundo;

	/* A block is returned exactly when err is DB_SUCCESS */
	ut_ad((err == DB_SUCCESS) == (undo_block != NULL));
	if (UNIV_UNLIKELY(undo_block == NULL)) {
		goto err_exit;
	}

	ut_ad(undo != NULL);

	do {
		/* rec == NULL means insert; otherwise this is an update
		or a delete marking. An offset of 0 is treated below as
		"the record did not fit on undo_block". */
		ulint	offset = !rec
			? trx_undo_page_report_insert(
				undo_block, trx, index, clust_entry, &mtr)
			: trx_undo_page_report_modify(
				undo_block, trx, index, rec, offsets, update,
				cmpl_info, clust_entry, &mtr);

		if (UNIV_UNLIKELY(offset == 0)) {
			if (!trx_undo_erase_page_end(undo_block->frame)) {
				/* The record did not fit on an empty
				undo page. Discard the freshly allocated
				page and return an error. */

				/* When we remove a page from an undo
				log, this is analogous to a
				pessimistic insert in a B-tree, and we
				must reserve the counterpart of the
				tree latch, which is the rseg
				mutex. We must commit the mini-transaction
				first, because it may be holding lower-level
				latches, such as SYNC_FSP and SYNC_FSP_PAGE. */

				mtr.commit();
				mtr.start();
				if (is_temp) {
					mtr.set_log_mode(MTR_LOG_NO_REDO);
				}

				mutex_enter(&rseg->mutex);
				trx_undo_free_last_page(undo, &mtr);
				mutex_exit(&rseg->mutex);

				err = DB_UNDO_RECORD_TOO_BIG;
				goto err_exit;
			}

			/* The page already contained something: commit,
			then extend the undo log and retry below. */
			mtr.commit();
		} else {
			/* Success */
			/* The page number is read before the
			mini-transaction is committed.
			NOTE(review): presumably because undo_block is
			no longer latched after mtr.commit() - confirm. */
			undo->top_page_no = undo_block->page.id.page_no();
			mtr.commit();
			undo->top_offset  = offset;
			undo->top_undo_no = trx->undo_no++;
			undo->guess_block = undo_block;
			ut_ad(!undo->empty());

			if (!is_temp) {
				const undo_no_t limit = undo->top_undo_no;
				/* Determine if this is the first time
				when this transaction modifies a
				system-versioned column in this table. */
				/* insert() leaves an existing entry for
				the table untouched; either way we get a
				reference to the entry of this table. */
				trx_mod_table_time_t& time
					= trx->mod_tables.insert(
						trx_mod_tables_t::value_type(
							index->table, limit))
					.first->second;
				ut_ad(time.valid(limit));

				if (!time.is_versioned()
				    && index->table->versioned_by_id()
				    && (!rec /* INSERT */
					|| (update
					    && update->affects_versioned()))) {
					time.set_versioned(limit);
				}
			}

			/* The first argument (!rec) tells whether the
			roll pointer refers to an insert. */
			*roll_ptr = trx_undo_build_roll_ptr(
				!rec, rseg->id, undo->top_page_no, offset);
			return(DB_SUCCESS);
		}

		ut_ad(undo_block->page.id.page_no() == undo->last_page_no);

		/* We have to extend the undo log by one page */

		/* Debug check: the log is extended at most once per call */
		ut_ad(++loop_count < 2);
		mtr.start();

		if (is_temp) {
			mtr.set_log_mode(MTR_LOG_NO_REDO);
		}

		undo_block = trx_undo_add_page(undo, &mtr);

		/* Debug injection point that makes the page allocation
		appear to have failed */
		DBUG_EXECUTE_IF("ib_err_ins_undo_page_add_failure",
				undo_block = NULL;);
	} while (UNIV_LIKELY(undo_block != NULL));

	/* The loop is left this way only when no page could be added */
	ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
		DB_OUT_OF_FILE_SPACE,
		//ER_INNODB_UNDO_LOG_FULL,
		"No more space left over in %s tablespace for allocating UNDO"
		" log pages. Please add new data file to the tablespace or"
		" check if filesystem is full or enable auto-extension for"
		" the tablespace",
		undo->rseg->space == fil_system.sys_space
		? "system" : is_temp ? "temporary" : "undo");

	/* Did not succeed: out of space */
	err = DB_OUT_OF_FILE_SPACE;

err_exit:
	/* Every error path arrives here with an active mini-transaction */
	mtr_commit(&mtr);
	return(err);
}
2127 
2128 /*============== BUILDING PREVIOUS VERSION OF A RECORD ===============*/
2129 
2130 /** Copy an undo record to heap.
2131 @param[in]	roll_ptr	roll pointer to a record that exists
2132 @param[in,out]	heap		memory heap where copied */
2133 static
2134 trx_undo_rec_t*
trx_undo_get_undo_rec_low(roll_ptr_t roll_ptr,mem_heap_t * heap)2135 trx_undo_get_undo_rec_low(
2136 	roll_ptr_t	roll_ptr,
2137 	mem_heap_t*	heap)
2138 {
2139 	trx_undo_rec_t*	undo_rec;
2140 	ulint		rseg_id;
2141 	ulint		page_no;
2142 	ulint		offset;
2143 	const page_t*	undo_page;
2144 	trx_rseg_t*	rseg;
2145 	ibool		is_insert;
2146 	mtr_t		mtr;
2147 
2148 	trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no,
2149 				 &offset);
2150 	ut_ad(page_no > FSP_FIRST_INODE_PAGE_NO);
2151 	ut_ad(offset >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
2152 	rseg = trx_sys.rseg_array[rseg_id];
2153 	ut_ad(rseg->is_persistent());
2154 
2155 	mtr_start(&mtr);
2156 
2157 	undo_page = trx_undo_page_get_s_latched(
2158 		page_id_t(rseg->space->id, page_no), &mtr);
2159 
2160 	undo_rec = trx_undo_rec_copy(undo_page + offset, heap);
2161 
2162 	mtr_commit(&mtr);
2163 
2164 	return(undo_rec);
2165 }
2166 
2167 /** Copy an undo record to heap.
2168 @param[in]	roll_ptr	roll pointer to record
2169 @param[in,out]	heap		memory heap where copied
2170 @param[in]	trx_id		id of the trx that generated
2171 				the roll pointer: it points to an
2172 				undo log of this transaction
2173 @param[in]	name		table name
2174 @param[out]	undo_rec	own: copy of the record
2175 @retval true if the undo log has been
2176 truncated and we cannot fetch the old version
2177 @retval false if the undo log record is available
2178 NOTE: the caller must have latches on the clustered index page. */
2179 static MY_ATTRIBUTE((warn_unused_result))
2180 bool
trx_undo_get_undo_rec(roll_ptr_t roll_ptr,mem_heap_t * heap,trx_id_t trx_id,const table_name_t & name,trx_undo_rec_t ** undo_rec)2181 trx_undo_get_undo_rec(
2182 	roll_ptr_t		roll_ptr,
2183 	mem_heap_t*		heap,
2184 	trx_id_t		trx_id,
2185 	const table_name_t&	name,
2186 	trx_undo_rec_t**	undo_rec)
2187 {
2188 	bool		missing_history;
2189 
2190 	rw_lock_s_lock(&purge_sys.latch);
2191 
2192 	missing_history = purge_sys.view.changes_visible(trx_id, name);
2193 	if (!missing_history) {
2194 		*undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap);
2195 	}
2196 
2197 	rw_lock_s_unlock(&purge_sys.latch);
2198 
2199 	return(missing_history);
2200 }
2201 
/* ATTRIB_USED_ONLY_IN_DEBUG annotates the parameters of
trx_undo_prev_version_build() that are referenced only inside a ut_ad()
debug assertion. With UNIV_DEBUG it expands to nothing; otherwise it
expands to MY_ATTRIBUTE((unused)).
NOTE(review): presumably this silences unused-parameter warnings in
non-debug builds - confirm. */
#ifdef UNIV_DEBUG
#define ATTRIB_USED_ONLY_IN_DEBUG
#else /* UNIV_DEBUG */
#define ATTRIB_USED_ONLY_IN_DEBUG	MY_ATTRIBUTE((unused))
#endif /* UNIV_DEBUG */
2207 
/*******************************************************************//**
Build a previous version of a clustered index record. The caller must
hold a latch on the index page of the clustered index record.

The undo log record that the DB_ROLL_PTR of rec points to is copied
and parsed, and the update vector stored in it is applied to rec in
order to produce *old_vers. *old_vers is reset to NULL at the start and
remains NULL on every return that happens before the version is built.
@retval true if previous version was built, or if it was an insert
or the table has been rebuilt
@retval false if the previous version is earlier than purge_view,
or being purged, which means that it may have been removed */
bool
trx_undo_prev_version_build(
/*========================*/
	const rec_t*	index_rec ATTRIB_USED_ONLY_IN_DEBUG,
				/*!< in: clustered index record in the
				index tree */
	mtr_t*		index_mtr ATTRIB_USED_ONLY_IN_DEBUG,
				/*!< in: mtr which contains the latch to
				index_rec page and purge_view */
	const rec_t*	rec,	/*!< in: version of a clustered index record */
	dict_index_t*	index,	/*!< in: clustered index */
	rec_offs*	offsets,/*!< in/out: rec_get_offsets(rec, index) */
	mem_heap_t*	heap,	/*!< in: memory heap from which the memory
				needed is allocated */
	rec_t**		old_vers,/*!< out, own: previous version, or NULL if
				rec is the first inserted version, or if
				history data has been deleted (an error),
				or if the purge COULD have removed the version
				though it has not yet done so */
	mem_heap_t*	v_heap,	/*!< in: memory heap used to create vrow
				dtuple if it is not yet created. This heap
				differs from "heap" above in that it could be
				prebuilt->old_vers_heap for selection */
	dtuple_t**	vrow,	/*!< out: virtual column info, if any */
	ulint		v_status)
				/*!< in: status determine if it is going
				into this function by purge thread or not.
				And if we read "after image" of undo log */
{
	trx_undo_rec_t*	undo_rec	= NULL;
	dtuple_t*	entry;
	trx_id_t	rec_trx_id;
	ulint		type;
	undo_no_t	undo_no;
	table_id_t	table_id;
	trx_id_t	trx_id;
	roll_ptr_t	roll_ptr;
	upd_t*		update;
	byte*		ptr;
	ulint		info_bits;
	ulint		cmpl_info;
	bool		dummy_extern;
	byte*		buf;

	ut_ad(!index->table->is_temporary());
	ut_ad(!rw_lock_own(&purge_sys.latch, RW_LOCK_S));
	ut_ad(mtr_memo_contains_page_flagged(index_mtr, index_rec,
					     MTR_MEMO_PAGE_S_FIX
					     | MTR_MEMO_PAGE_X_FIX));
	ut_ad(rec_offs_validate(rec, index, offsets));
	ut_a(index->is_primary());

	roll_ptr = row_get_rec_roll_ptr(rec, index, offsets);

	/* Default result for all the early returns below */
	*old_vers = NULL;

	if (trx_undo_roll_ptr_is_insert(roll_ptr)) {
		/* The record rec is the first inserted version */
		return(true);
	}

	rec_trx_id = row_get_rec_trx_id(rec, index, offsets);

	ut_ad(!index->table->skip_alter_undo);

	/* trx_undo_get_undo_rec() returns true, without copying the
	record, when the purge view already sees the changes of
	rec_trx_id. */
	if (trx_undo_get_undo_rec(
		    roll_ptr, heap, rec_trx_id, index->table->name,
		    &undo_rec)) {
		if (v_status & TRX_UNDO_PREV_IN_PURGE) {
			/* We are fetching the record being purged */
			undo_rec = trx_undo_get_undo_rec_low(roll_ptr, heap);
		} else {
			/* The undo record may already have been purged,
			during purge or semi-consistent read. */
			return(false);
		}
	}

	/* Read the leading fields of the undo record; ptr is advanced
	past each parsed part by the calls that follow. */
	ptr = trx_undo_rec_get_pars(undo_rec, &type, &cmpl_info,
				    &dummy_extern, &undo_no, &table_id);

	if (table_id != index->table->id) {
		/* The table should have been rebuilt, but purge has
		not yet removed the undo log records for the
		now-dropped old table (table_id). */
		return(true);
	}

	/* From here on, trx_id and roll_ptr are the values stored in
	the undo record, not the ones read from rec above. */
	ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr,
					       &info_bits);

	/* (a) If a clustered index record version is such that the
	trx id stamp in it is bigger than purge_sys.view, then the
	BLOBs in that version are known to exist (the purge has not
	progressed that far);

	(b) if the version is the first version such that trx id in it
	is less than purge_sys.view, and it is not delete-marked,
	then the BLOBs in that version are known to exist (the purge
	cannot have purged the BLOBs referenced by that version
	yet).

	This function does not fetch any BLOBs.  The callers might, by
	possibly invoking row_ext_create() via row_build().  However,
	they should have all needed information in the *old_vers
	returned by this function.  This is because *old_vers is based
	on the transaction undo log records.  The function
	trx_undo_page_fetch_ext() will write BLOB prefixes to the
	transaction undo log that are at least as long as the longest
	possible column prefix in a secondary index.  Thus, secondary
	index entries for *old_vers can be constructed without
	dereferencing any BLOB pointers. */

	ptr = trx_undo_rec_skip_row_ref(ptr, index);

	/* Build the update vector that is applied to rec below */
	ptr = trx_undo_update_rec_get_update(ptr, index, type, trx_id,
					     roll_ptr, info_bits,
					     heap, &update);
	ut_a(ptr);

	/* Two ways of producing *old_vers: rebuild the record from an
	index entry, or copy rec and update the copy in place. */
	if (row_upd_changes_field_size_or_external(index, offsets, update)) {
		/* We should confirm the existence of disowned external data,
		if the previous version record is delete marked. If the trx_id
		of the previous record is seen by purge view, we should treat
		it as missing history, because the disowned external data
		might be purged already.

		The inherited external data (BLOBs) can be freed (purged)
		after trx_id was committed, provided that no view was started
		before trx_id. If the purge view can see the committed
		delete-marked record by trx_id, no transactions need to access
		the BLOB. */

		/* the row_upd_changes_disowned_external(update) call could be
		omitted, but the synchronization on purge_sys.latch is likely
		more expensive. */

		if ((update->info_bits & REC_INFO_DELETED_FLAG)
		    && row_upd_changes_disowned_external(update)) {
			bool	missing_extern;

			rw_lock_s_lock(&purge_sys.latch);

			missing_extern = purge_sys.view.changes_visible(
				trx_id,	index->table->name);

			rw_lock_s_unlock(&purge_sys.latch);

			if (missing_extern) {
				/* treat as a fresh insert, not to
				cause assertion error at the caller. */
				return(true);
			}
		}

		/* We have to set the appropriate extern storage bits in the
		old version of the record: the extern bits in rec for those
		fields that update does NOT update, as well as the bits for
		those fields that update updates to become externally stored
		fields. Store the info: */

		entry = row_rec_to_index_entry(rec, index, offsets, heap);
		/* The page containing the clustered index record
		corresponding to entry is latched in mtr.  Thus the
		following call is safe. */
		if (!row_upd_index_replace_new_col_vals(entry, *index, update,
							heap)) {
			/* A failure here is tolerated only when invoked
			with TRX_UNDO_PREV_IN_PURGE; anything else trips
			the assertion. */
			ut_a(v_status & TRX_UNDO_PREV_IN_PURGE);
			return false;
		}

		/* Get number of externally stored columns in updated record */
		const ulint n_ext = dtuple_get_n_ext(entry);

		buf = static_cast<byte*>(mem_heap_alloc(
			heap, rec_get_converted_size(index, entry, n_ext)));

		*old_vers = rec_convert_dtuple_to_rec(buf, index,
						      entry, n_ext);
	} else {
		buf = static_cast<byte*>(mem_heap_alloc(
			heap, rec_offs_size(offsets)));

		/* Copy rec, make offsets refer to the copy, and apply
		the update to the copy. */
		*old_vers = rec_copy(buf, rec, offsets);
		rec_offs_make_valid(*old_vers, index, true, offsets);
		row_upd_rec_in_place(*old_vers, index, offsets, update, NULL);
	}

	/* Set the old value (which is the after image of an update) in the
	update vector to dtuple vrow */
	/* NOTE(review): vrow is dereferenced here without the NULL check
	that guards the block further below; presumably callers that pass
	TRX_UNDO_GET_OLD_V_VALUE always pass a non-NULL vrow - confirm. */
	if (v_status & TRX_UNDO_GET_OLD_V_VALUE) {
		row_upd_replace_vcol((dtuple_t*)*vrow, index->table, update,
				     false, NULL, NULL);
	}

#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
	/* Consistency check on the version that was just built */
	rec_offs offsets_dbg[REC_OFFS_NORMAL_SIZE];
	rec_offs_init(offsets_dbg);
	ut_a(!rec_offs_any_null_extern(
		*old_vers, rec_get_offsets(*old_vers, index, offsets_dbg,
					   index->n_core_fields,
					   ULINT_UNDEFINED, &heap)));
#endif // defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG

	/* Virtual column values are read from the undo record only when
	the caller asked for them and UPD_NODE_NO_ORD_CHANGE is not set
	in cmpl_info. */
	if (vrow && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
		if (!(*vrow)) {
			/* Create the tuple on first use, preferring v_heap */
			*vrow = dtuple_create_with_vcol(
				v_heap ? v_heap : heap,
				dict_table_get_n_cols(index->table),
				dict_table_get_n_v_cols(index->table));
			dtuple_init_v_fld(*vrow);
		}

		ut_ad(index->table->n_v_cols);
		trx_undo_read_v_cols(index->table, ptr, *vrow,
				     v_status & TRX_UNDO_PREV_IN_PURGE);
	}

	return(true);
}
2435 
2436 /** Read virtual column value from undo log
2437 @param[in]	table		the table
2438 @param[in]	ptr		undo log pointer
2439 @param[in,out]	row		the dtuple to fill
2440 @param[in]	in_purge	whether this is called by purge */
2441 void
trx_undo_read_v_cols(const dict_table_t * table,const byte * ptr,dtuple_t * row,bool in_purge)2442 trx_undo_read_v_cols(
2443 	const dict_table_t*	table,
2444 	const byte*		ptr,
2445 	dtuple_t*		row,
2446 	bool			in_purge)
2447 {
2448 	const byte*     end_ptr;
2449 	bool		first_v_col = true;
2450 	bool		is_undo_log = true;
2451 
2452 	end_ptr = ptr + mach_read_from_2(ptr);
2453 	ptr += 2;
2454 	while (ptr < end_ptr) {
2455 		dfield_t*	dfield;
2456 		const byte*	field;
2457 		ulint		field_no;
2458 		ulint		len;
2459 		ulint		orig_len;
2460 		bool		is_virtual;
2461 
2462 		field_no = mach_read_next_compressed(
2463 				const_cast<const byte**>(&ptr));
2464 
2465 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
2466 
2467 		if (is_virtual) {
2468 			ptr = trx_undo_read_v_idx(
2469 				table, ptr, first_v_col, &is_undo_log,
2470 				&field_no);
2471 			first_v_col = false;
2472 		}
2473 
2474 		ptr = trx_undo_rec_get_col_val(
2475 			ptr, &field, &len, &orig_len);
2476 
2477 		/* The virtual column is no longer indexed or does not exist.
2478 		This needs to put after trx_undo_rec_get_col_val() so the
2479 		undo ptr advances */
2480 		if (field_no == ULINT_UNDEFINED) {
2481 			ut_ad(is_virtual);
2482 			continue;
2483 		}
2484 
2485 		if (is_virtual) {
2486 			dict_v_col_t*	vcol = dict_table_get_nth_v_col(
2487 				table, field_no);
2488 
2489 			dfield = dtuple_get_nth_v_field(row, vcol->v_pos);
2490 
2491 			if (!in_purge
2492 			    || dfield_get_type(dfield)->mtype == DATA_MISSING) {
2493 				dict_col_copy_type(
2494 					&vcol->m_col,
2495 					dfield_get_type(dfield));
2496 				dfield_set_data(dfield, field, len);
2497 			}
2498 		}
2499 	}
2500 
2501 	ut_ad(ptr == end_ptr);
2502 }
2503