1 /*****************************************************************************
2 
3 Copyright (c) 1996, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file trx/trx0rec.cc
29 Transaction undo log record
30 
31 Created 3/26/1996 Heikki Tuuri
32 *******************************************************/
33 
34 #include "trx0rec.h"
35 
36 #ifdef UNIV_NONINL
37 #include "trx0rec.ic"
38 #endif
39 
40 #include "fsp0fsp.h"
41 #include "mach0data.h"
42 #include "trx0undo.h"
43 #include "mtr0log.h"
44 #ifndef UNIV_HOTBACKUP
45 #include "dict0dict.h"
46 #include "ut0mem.h"
47 #include "read0read.h"
48 #include "row0ext.h"
49 #include "row0upd.h"
50 #include "que0que.h"
51 #include "trx0purge.h"
52 #include "trx0rseg.h"
53 #include "row0row.h"
54 #include "fsp0sysspace.h"
55 #include "row0mysql.h"
56 
57 /*=========== UNDO LOG RECORD CREATION AND DECODING ====================*/
58 
59 /**********************************************************************//**
60 Writes the mtr log entry of the inserted undo log record on the undo log
61 page. */
62 UNIV_INLINE
63 void
64 trx_undof_page_add_undo_rec_log(
65 /*============================*/
66 	page_t* undo_page,	/*!< in: undo log page */
67 	ulint	old_free,	/*!< in: start offset of the inserted entry */
68 	ulint	new_free,	/*!< in: end offset of the entry */
69 	mtr_t*	mtr)		/*!< in: mtr */
70 {
71 	byte*		log_ptr;
72 	const byte*	log_end;
73 	ulint		len;
74 
75 	log_ptr = mlog_open(mtr, 11 + 13 + MLOG_BUF_MARGIN);
76 
77 	if (log_ptr == NULL) {
78 
79 		return;
80 	}
81 
82 	log_end = &log_ptr[11 + 13 + MLOG_BUF_MARGIN];
83 	log_ptr = mlog_write_initial_log_record_fast(
84 		undo_page, MLOG_UNDO_INSERT, log_ptr, mtr);
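	/* The redo payload is only the undo record body: the 2-byte
	next-record pointer at old_free and the 2-byte previous-record
	pointer at the end of the record are rebuilt by
	trx_undo_parse_add_undo_rec(), hence the "- 4" below and the
	"+ 2" offset in the copy. */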
85 	len = new_free - old_free - 4;
86 
87 	mach_write_to_2(log_ptr, len);
88 	log_ptr += 2;
89 
90 	if (log_ptr + len <= log_end) {
91 		memcpy(log_ptr, undo_page + old_free + 2, len);
92 		mlog_close(mtr, log_ptr + len);
93 	} else {
94 		mlog_close(mtr, log_ptr);
95 		mlog_catenate_string(mtr, undo_page + old_free + 2, len);
96 	}
97 }
98 #endif /* !UNIV_HOTBACKUP */
99 
100 /***********************************************************//**
101 Parses a redo log record of adding an undo log record.
102 @return end of log record or NULL */
103 byte*
104 trx_undo_parse_add_undo_rec(
105 /*========================*/
106 	byte*	ptr,	/*!< in: buffer */
107 	byte*	end_ptr,/*!< in: buffer end */
108 	page_t*	page)	/*!< in: page or NULL */
109 {
110 	ulint	len;
111 	byte*	rec;
112 	ulint	first_free;
113 
114 	if (end_ptr < ptr + 2) {
115 
116 		return(NULL);
117 	}
118 
119 	len = mach_read_from_2(ptr);
120 	ptr += 2;
121 
122 	if (end_ptr < ptr + len) {
123 
124 		return(NULL);
125 	}
126 
127 	if (page == NULL) {
128 
129 		return(ptr + len);
130 	}
131 
132 	first_free = mach_read_from_2(page + TRX_UNDO_PAGE_HDR
133 				      + TRX_UNDO_PAGE_FREE);
134 	rec = page + first_free;
135 
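	/* Rebuild the list pointers that were not part of the redo
	payload: the next-record offset at the start of the record and
	the previous-record offset right after the record data; the
	page's first-free offset is advanced to the end of the new
	record. */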
136 	mach_write_to_2(rec, first_free + 4 + len);
137 	mach_write_to_2(rec + 2 + len, first_free);
138 
139 	mach_write_to_2(page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
140 			first_free + 4 + len);
141 	ut_memcpy(rec + 2, ptr, len);
142 
143 	return(ptr + len);
144 }
145 
146 #ifndef UNIV_HOTBACKUP
147 /**********************************************************************//**
148 Calculates the free space left for extending an undo log record.
149 @return bytes left */
150 UNIV_INLINE
151 ulint
152 trx_undo_left(
153 /*==========*/
154 	const page_t*	page,	/*!< in: undo log page */
155 	const byte*	ptr)	/*!< in: pointer to page */
156 {
157 	/* The '- 10' is a safety margin, in case we have some small
158 	calculation error below */
159 
160 	return(UNIV_PAGE_SIZE - (ptr - page) - 10 - FIL_PAGE_DATA_END);
161 }
162 
163 /**********************************************************************//**
164 Set the next and previous pointers in the undo page for the undo record
165 that was written to ptr. Update the first free value by the number of bytes
166 written for this undo record.
167 @return offset of the inserted entry on the page on success, 0 on failure */
168 static
169 ulint
170 trx_undo_page_set_next_prev_and_add(
171 /*================================*/
172 	page_t*		undo_page,	/*!< in/out: undo log page */
173 	byte*		ptr,		/*!< in: ptr up to where data has been
174 					written on this undo page. */
175 	mtr_t*		mtr)		/*!< in: mtr */
176 {
177 	ulint		first_free;	/*!< offset within undo_page */
178 	ulint		end_of_rec;	/*!< offset within undo_page */
179 	byte*		ptr_to_first_free;
180 					/* pointer within undo_page
181 					that points to the next free
182 					offset value within undo_page.*/
183 
184 	ut_ad(ptr > undo_page);
185 	ut_ad(ptr < undo_page + UNIV_PAGE_SIZE);
186 
187 	if (UNIV_UNLIKELY(trx_undo_left(undo_page, ptr) < 2)) {
188 
189 		return(0);
190 	}
191 
192 	ptr_to_first_free = undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE;
193 
194 	first_free = mach_read_from_2(ptr_to_first_free);
195 
196 	/* Write offset of the previous undo log record */
197 	mach_write_to_2(ptr, first_free);
198 	ptr += 2;
199 
200 	end_of_rec = ptr - undo_page;
201 
202 	/* Write offset of the next undo log record */
203 	mach_write_to_2(undo_page + first_free, end_of_rec);
204 
205 	/* Update the offset to first free undo record */
206 	mach_write_to_2(ptr_to_first_free, end_of_rec);
207 
208 	/* Write this log entry to the UNDO log */
209 	trx_undof_page_add_undo_rec_log(undo_page, first_free,
210 					end_of_rec, mtr);
211 
212 	return(first_free);
213 }
214 
215 /** Virtual column undo log version. To distinguish it from a length value
216 in 5.7.8 undo log, it starts with 0xF1 */
217 static const ulint VIRTUAL_COL_UNDO_FORMAT_1 = 0xF1;
218 
219 /** Write virtual column index info (index id and column position in index)
220 to the undo log
221 @param[in,out]	undo_page	undo log page
222 @param[in]	table           the table
223 @param[in]	pos		the virtual column position
224 @param[in]      ptr             undo log record being written
225 @param[in]	first_v_col	whether this is the first virtual column
226 				which could start with a version marker
227 @return new undo log pointer */
228 static
229 byte*
230 trx_undo_log_v_idx(
231 	page_t*			undo_page,
232 	const dict_table_t*	table,
233 	ulint			pos,
234 	byte*			ptr,
235 	bool			first_v_col)
236 {
237 	ut_ad(pos < table->n_v_def);
238 	dict_v_col_t*	vcol = dict_table_get_nth_v_col(table, pos);
239 
240 	ulint		n_idx = vcol->v_indexes->size();
241 	byte*		old_ptr;
242 
243 	ut_ad(n_idx > 0);
244 
245 	/* Size to reserve: at most 5 bytes for each index id and position,
246 	plus 5 bytes for the number of indexes, 2 bytes for the total length,
247 	and 1 byte for the undo log record format version marker */
248 	ulint		size = n_idx * (5 + 5) + 5 + 2 + (first_v_col ? 1 : 0);
249 
250 	if (trx_undo_left(undo_page, ptr) < size) {
251 		return(NULL);
252 	}
253 
254 	if (first_v_col) {
255 		/* write the version marker */
256 		mach_write_to_1(ptr, VIRTUAL_COL_UNDO_FORMAT_1);
257 
258 		ptr += 1;
259 	}
260 
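	/* Reserve 2 bytes for the total length of this info (written at
	the end), then store the number of indexes followed by an
	<index id, field position> pair for each index that contains the
	virtual column. */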
261 	old_ptr = ptr;
262 
263 	ptr += 2;
264 
265 	ptr += mach_write_compressed(ptr, n_idx);
266 
267 	dict_v_idx_list::iterator       it;
268 
269 	for (it = vcol->v_indexes->begin();
270 	     it != vcol->v_indexes->end(); ++it) {
271 		dict_v_idx_t	v_index = *it;
272 
273 		ptr += mach_write_compressed(
274 			ptr, static_cast<ulint>(v_index.index->id));
275 
276 		ptr += mach_write_compressed(ptr, v_index.nth_field);
277 	}
278 
279 	mach_write_to_2(old_ptr, ptr - old_ptr);
280 
281 	return(ptr);
282 }
283 
284 /** Read virtual column index from undo log, and verify the column is still
285 indexed, and return its position
286 @param[in]	table		the table
287 @param[in]	ptr		undo log pointer
288 @param[out]	col_pos		the column number or ULINT_UNDEFINED
289 				if the column is not indexed any more
290 @return remaining part of undo log record after reading these values */
291 static
292 const byte*
293 trx_undo_read_v_idx_low(
294 	const dict_table_t*	table,
295 	const byte*		ptr,
296 	ulint*			col_pos)
297 {
298 	ulint		len = mach_read_from_2(ptr);
299 	const byte*	old_ptr = ptr;
300 
301 	*col_pos = ULINT_UNDEFINED;
302 
303 	ptr += 2;
304 
305 	ulint	num_idx = mach_read_next_compressed(&ptr);
306 
307 	ut_ad(num_idx > 0);
308 
309 	dict_index_t*	clust_index = dict_table_get_first_index(table);
310 
311 	for (ulint i = 0; i < num_idx; i++) {
312 		index_id_t	id = mach_read_next_compressed(&ptr);
313 		ulint		pos = mach_read_next_compressed(&ptr);
314 		dict_index_t*	index = dict_table_get_next_index(clust_index);
315 
316 		while (index != NULL) {
317 			/* Return if we find a matching index.
318 			TODO: in the future, it might be worth adding
319 			checks on other indexes */
320 			if (index->id == id) {
321 				const dict_col_t* col = dict_index_get_nth_col(
322 					index, pos);
323 				ut_ad(dict_col_is_virtual(col));
324 				const dict_v_col_t*	vcol = reinterpret_cast<
325 					const dict_v_col_t*>(col);
326 				*col_pos = vcol->v_pos;
327 				return(old_ptr + len);
328 			}
329 
330 			index = dict_table_get_next_index(index);
331 		}
332 	}
333 
334 	return(old_ptr + len);
335 }
336 
337 /** Read virtual column index from undo log or online log if the log
338 contains such info, and in the undo log case, verify the column is
339 still indexed, and output its position
340 @param[in]	table		the table
341 @param[in]	ptr		undo log pointer
342 @param[in]	first_v_col	if this is the first virtual column, which
343 				has the version marker
344 @param[in,out]	is_undo_log	this function is used to parse both the undo
345 				log and the online log for virtual columns,
346 				so check whether this is the undo log. When
347 				first_v_col is true, is_undo_log is an output;
348 				when first_v_col is false, it is an input
349 @param[in,out]	field_no	the column number
350 @return remaining part of undo log record after reading these values */
351 const byte*
352 trx_undo_read_v_idx(
353 	const dict_table_t*	table,
354 	const byte*		ptr,
355 	bool			first_v_col,
356 	bool*			is_undo_log,
357 	ulint*			field_no)
358 {
359 	/* Version marker only put on the first virtual column */
360 	if (first_v_col) {
361 		/* Undo log has the virtual undo log marker */
362 		*is_undo_log = (mach_read_from_1(ptr)
363 				== VIRTUAL_COL_UNDO_FORMAT_1);
364 
365 		if (*is_undo_log) {
366 			ptr += 1;
367 		}
368 	}
369 
370 	if (*is_undo_log) {
371 		ptr = trx_undo_read_v_idx_low(table, ptr, field_no);
372 	} else {
373 		*field_no -= REC_MAX_N_FIELDS;
374 	}
375 
376 	return(ptr);
377 }
378 
379 /** Reports in the undo log an insert of virtual columns.
380 @param[in]	undo_page	undo log page
381 @param[in]	table		the table
382 @param[in]	row		dtuple contains the virtual columns
383 @param[in,out]	ptr		log ptr
384 @return true if write goes well, false if out of space */
385 static
386 bool
387 trx_undo_report_insert_virtual(
388 	page_t*		undo_page,
389 	dict_table_t*	table,
390 	const dtuple_t*	row,
391 	byte**		ptr)
392 {
393 	byte*	start = *ptr;
394 	bool	first_v_col = true;
395 
396 	if (trx_undo_left(undo_page, *ptr) < 2) {
397 		return(false);
398 	}
399 
400 	/* Reserve 2 bytes to write the number
401 	of bytes the stored fields take in this
402 	undo record */
403 	*ptr += 2;
404 
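	/* Each indexed virtual column is then stored as its position
	(column number + REC_MAX_N_FIELDS), the index info written by
	trx_undo_log_v_idx(), and the column value, possibly truncated
	to the maximum length that can be stored in the undo log. */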
405 	for (ulint col_no = 0; col_no < dict_table_get_n_v_cols(table);
406 	     col_no++) {
407 		dfield_t*       vfield = NULL;
408 
409 		const dict_v_col_t*     col
410 			= dict_table_get_nth_v_col(table, col_no);
411 
412 		if (col->m_col.ord_part) {
413 
414 			/* make sure there is enough space to write the length */
415 			if (trx_undo_left(undo_page, *ptr) < 5) {
416 				return(false);
417 			}
418 
419 			ulint   pos = col_no;
420 			pos += REC_MAX_N_FIELDS;
421 			*ptr += mach_write_compressed(*ptr, pos);
422 
423 			*ptr = trx_undo_log_v_idx(undo_page, table,
424 						  col_no, *ptr, first_v_col);
425 			first_v_col = false;
426 
427 			if (*ptr == NULL) {
428 				return(false);
429 			}
430 
431 			vfield = dtuple_get_nth_v_field(row, col->v_pos);
432 			ulint	flen = vfield->len;
433 
434 			if (flen != UNIV_SQL_NULL) {
435 				ulint	max_len
436 					= dict_max_v_field_len_store_undo(
437 						table, col_no);
438 
439 				if (flen > max_len) {
440 					flen = max_len;
441 				}
442 
443 				if (trx_undo_left(undo_page, *ptr) < flen + 5) {
444 
445 					return(false);
446 				}
447 				*ptr += mach_write_compressed(*ptr, flen);
448 
449 				ut_memcpy(*ptr, vfield->data, flen);
450 				*ptr += flen;
451 			} else {
452 				if (trx_undo_left(undo_page, *ptr) < 5) {
453 
454 					return(false);
455 				}
456 
457 				*ptr += mach_write_compressed(*ptr, flen);
458 			}
459 		}
460 	}
461 
462 	/* Always mark the end of the log with 2 bytes length field */
463 	mach_write_to_2(start, *ptr - start);
464 
465 	return(true);
466 }
467 
468 /**********************************************************************//**
469 Reports in the undo log an insert of a clustered index record.
470 @return offset of the inserted entry on the page on success, 0 on failure */
471 static
472 ulint
473 trx_undo_page_report_insert(
474 /*========================*/
475 	page_t*		undo_page,	/*!< in: undo log page */
476 	trx_t*		trx,		/*!< in: transaction */
477 	dict_index_t*	index,		/*!< in: clustered index */
478 	const dtuple_t*	clust_entry,	/*!< in: index entry which will be
479 					inserted to the clustered index */
480 	mtr_t*		mtr)		/*!< in: mtr */
481 {
482 	ulint		first_free;
483 	byte*		ptr;
484 	ulint		i;
485 
486 	ut_ad(dict_index_is_clust(index));
487 	ut_ad(mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
488 			       + TRX_UNDO_PAGE_TYPE) == TRX_UNDO_INSERT);
489 
490 	first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
491 				      + TRX_UNDO_PAGE_FREE);
492 	ptr = undo_page + first_free;
493 
494 	ut_ad(first_free <= UNIV_PAGE_SIZE);
495 
496 	if (trx_undo_left(undo_page, ptr) < 2 + 1 + 11 + 11) {
497 
498 		/* Not enough space for writing the general parameters */
499 
500 		return(0);
501 	}
502 
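	/* The insert undo record is assembled as: a 2-byte next-record
	pointer (reserved here, filled in later by
	trx_undo_page_set_next_prev_and_add()), the type byte
	TRX_UNDO_INSERT_REC, the undo number and table id in compressed
	format, a <length, data> pair for each unique column of the
	clustered index, and, if the table has virtual columns, the info
	written by trx_undo_report_insert_virtual(). */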
503 	/* Reserve 2 bytes for the pointer to the next undo log record */
504 	ptr += 2;
505 
506 	/* Store first some general parameters to the undo log */
507 	*ptr++ = TRX_UNDO_INSERT_REC;
508 	ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
509 	ptr += mach_u64_write_much_compressed(ptr, index->table->id);
510 	/*----------------------------------------*/
511 	/* Store then the fields required to uniquely determine the record
512 	to be inserted in the clustered index */
513 
514 	for (i = 0; i < dict_index_get_n_unique(index); i++) {
515 
516 		const dfield_t*	field	= dtuple_get_nth_field(clust_entry, i);
517 		ulint		flen	= dfield_get_len(field);
518 
519 		if (trx_undo_left(undo_page, ptr) < 5) {
520 
521 			return(0);
522 		}
523 
524 		ptr += mach_write_compressed(ptr, flen);
525 
526 		if (flen != UNIV_SQL_NULL) {
527 			if (trx_undo_left(undo_page, ptr) < flen) {
528 
529 				return(0);
530 			}
531 
532 			ut_memcpy(ptr, dfield_get_data(field), flen);
533 			ptr += flen;
534 		}
535 	}
536 
537 	if (index->table->n_v_cols) {
538 		if (!trx_undo_report_insert_virtual(
539 			undo_page, index->table, clust_entry, &ptr)) {
540 			return(0);
541 		}
542 	}
543 
544 	return(trx_undo_page_set_next_prev_and_add(undo_page, ptr, mtr));
545 }
546 
547 /**********************************************************************//**
548 Reads from an undo log record the general parameters.
549 @return remaining part of undo log record after reading these values */
550 byte*
551 trx_undo_rec_get_pars(
552 /*==================*/
553 	trx_undo_rec_t*	undo_rec,	/*!< in: undo log record */
554 	ulint*		type,		/*!< out: undo record type:
555 					TRX_UNDO_INSERT_REC, ... */
556 	ulint*		cmpl_info,	/*!< out: compiler info, relevant only
557 					for update type records */
558 	bool*		updated_extern,	/*!< out: true if we updated an
559 					externally stored field */
560 	undo_no_t*	undo_no,	/*!< out: undo log record number */
561 	table_id_t*	table_id)	/*!< out: table id */
562 {
563 	const byte*	ptr;
564 	ulint		type_cmpl;
565 
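	/* Skip the 2-byte next-record pointer at the start of the
	record; the combined type and compilation info byte follows it. */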
566 	ptr = undo_rec + 2;
567 
568 	type_cmpl = mach_read_from_1(ptr);
569 	ptr++;
570 
571 	*updated_extern = !!(type_cmpl & TRX_UNDO_UPD_EXTERN);
572 	type_cmpl &= ~TRX_UNDO_UPD_EXTERN;
573 
574 	*type = type_cmpl & (TRX_UNDO_CMPL_INFO_MULT - 1);
575 	*cmpl_info = type_cmpl / TRX_UNDO_CMPL_INFO_MULT;
576 
577 	*undo_no = mach_read_next_much_compressed(&ptr);
578 	*table_id = mach_read_next_much_compressed(&ptr);
579 
580 	return(const_cast<byte*>(ptr));
581 }
582 
583 /** Read from an undo log record a non-virtual column value.
584 @param[in,out]	ptr		pointer to remaining part of the undo record
585 @param[in,out]	field		stored field
586 @param[in,out]	len		length of the field, or UNIV_SQL_NULL
587 @param[in,out]	orig_len	original length of the locally stored part
588 of an externally stored column, or 0
589 @return remaining part of undo log record after reading these values */
590 byte*
591 trx_undo_rec_get_col_val(
592 	const byte*	ptr,
593 	const byte**	field,
594 	ulint*		len,
595 	ulint*		orig_len)
596 {
597 	*len = mach_read_next_compressed(&ptr);
598 	*orig_len = 0;
599 
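	/* The compressed length value encodes three cases: UNIV_SQL_NULL
	for a NULL value; UNIV_EXTERN_STORAGE_FIELD as a marker, followed
	by the original (local) length and the real length, for a column
	prefix fetched from an externally stored field; or an ordinary
	length, which may itself have UNIV_EXTERN_STORAGE_FIELD and a
	spatial status added to it. */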
600 	switch (*len) {
601 	case UNIV_SQL_NULL:
602 		*field = NULL;
603 		break;
604 	case UNIV_EXTERN_STORAGE_FIELD:
605 		*orig_len = mach_read_next_compressed(&ptr);
606 		*len = mach_read_next_compressed(&ptr);
607 		*field = ptr;
608 		ptr += *len & ~SPATIAL_STATUS_MASK;
609 
610 		ut_ad(*orig_len >= BTR_EXTERN_FIELD_REF_SIZE);
611 		ut_ad(*len > *orig_len);
612 		/* @see dtuple_convert_big_rec() */
613 		ut_ad(*len >= BTR_EXTERN_FIELD_REF_SIZE);
614 
615 		/* we do not have access to index->table here
616 		ut_ad(dict_table_get_format(index->table) >= UNIV_FORMAT_B
617 		      || *len >= col->max_prefix
618 		      + BTR_EXTERN_FIELD_REF_SIZE);
619 		*/
620 
621 		*len += UNIV_EXTERN_STORAGE_FIELD;
622 		break;
623 	default:
624 		*field = ptr;
625 		if (*len >= UNIV_EXTERN_STORAGE_FIELD) {
626 			ptr += (*len - UNIV_EXTERN_STORAGE_FIELD)
627 				& ~SPATIAL_STATUS_MASK;
628 		} else {
629 			ptr += *len;
630 		}
631 	}
632 
633 	return(const_cast<byte*>(ptr));
634 }
635 
636 /*******************************************************************//**
637 Builds a row reference from an undo log record.
638 @return pointer to remaining part of undo record */
639 byte*
640 trx_undo_rec_get_row_ref(
641 /*=====================*/
642 	byte*		ptr,	/*!< in: remaining part of a copy of an undo log
643 				record, at the start of the row reference;
644 				NOTE that this copy of the undo log record must
645 				be preserved as long as the row reference is
646 				used, as we do NOT copy the data in the
647 				record! */
648 	dict_index_t*	index,	/*!< in: clustered index */
649 	dtuple_t**	ref,	/*!< out, own: row reference */
650 	mem_heap_t*	heap)	/*!< in: memory heap from which the memory
651 				needed is allocated */
652 {
653 	ulint		ref_len;
654 	ulint		i;
655 
656 	ut_ad(index && ptr && ref && heap);
657 	ut_a(dict_index_is_clust(index));
658 
659 	ref_len = dict_index_get_n_unique(index);
660 
661 	*ref = dtuple_create(heap, ref_len);
662 
663 	dict_index_copy_types(*ref, index, ref_len);
664 
665 	for (i = 0; i < ref_len; i++) {
666 		dfield_t*	dfield;
667 		const byte*	field;
668 		ulint		len;
669 		ulint		orig_len;
670 
671 		dfield = dtuple_get_nth_field(*ref, i);
672 
673 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
674 
675 		dfield_set_data(dfield, field, len);
676 	}
677 
678 	return(ptr);
679 }
680 
681 /*******************************************************************//**
682 Skips a row reference from an undo log record.
683 @return pointer to remaining part of undo record */
684 byte*
685 trx_undo_rec_skip_row_ref(
686 /*======================*/
687 	byte*		ptr,	/*!< in: remaining part in update undo log
688 				record, at the start of the row reference */
689 	dict_index_t*	index)	/*!< in: clustered index */
690 {
691 	ulint	ref_len;
692 	ulint	i;
693 
694 	ut_ad(index && ptr);
695 	ut_a(dict_index_is_clust(index));
696 
697 	ref_len = dict_index_get_n_unique(index);
698 
699 	for (i = 0; i < ref_len; i++) {
700 		const byte*	field;
701 		ulint		len;
702 		ulint		orig_len;
703 
704 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
705 	}
706 
707 	return(ptr);
708 }
709 
710 /** Fetch a prefix of an externally stored column, for writing to the undo
711 log of an update or delete marking of a clustered index record.
712 @param[out]	ext_buf		buffer to hold the prefix data and BLOB pointer
713 @param[in]	prefix_len	prefix size to store in the undo log
714 @param[in]	page_size	page size
715 @param[in]	field		an externally stored column
716 @param[in,out]	len		input: length of field; output: used length of
717 ext_buf
718 @return ext_buf */
719 static
720 byte*
721 trx_undo_page_fetch_ext(
722 	byte*			ext_buf,
723 	ulint			prefix_len,
724 	const page_size_t&	page_size,
725 	const byte*		field,
726 	ulint*			len)
727 {
728 	/* Fetch the BLOB. */
729 	ulint	ext_len = btr_copy_externally_stored_field_prefix(
730 		ext_buf, prefix_len, page_size, field, *len);
731 	/* BLOBs should always be nonempty. */
732 	ut_a(ext_len);
733 	/* Append the BLOB pointer to the prefix. */
734 	memcpy(ext_buf + ext_len,
735 	       field + *len - BTR_EXTERN_FIELD_REF_SIZE,
736 	       BTR_EXTERN_FIELD_REF_SIZE);
737 	*len = ext_len + BTR_EXTERN_FIELD_REF_SIZE;
738 	return(ext_buf);
739 }
740 
741 /** Writes to the undo log a prefix of an externally stored column.
742 @param[out]	ptr		undo log position, at least 15 bytes must be
743 available
744 @param[out]	ext_buf		a buffer of DICT_MAX_FIELD_LEN_BY_FORMAT()
745 				size, or NULL when should not fetch a longer
746 				prefix
747 @param[in]	prefix_len	prefix size to store in the undo log
748 @param[in]	page_size	page size
749 @param[in,out]	field		the locally stored part of the externally
750 stored column
751 @param[in,out]	len		length of field, in bytes
752 @param[in]	spatial_status	whether the column is used by spatial index or
753 				regular index
754 @return undo log position */
755 static
756 byte*
757 trx_undo_page_report_modify_ext(
758 	byte*			ptr,
759 	byte*			ext_buf,
760 	ulint			prefix_len,
761 	const page_size_t&	page_size,
762 	const byte**		field,
763 	ulint*			len,
764 	spatial_status_t	spatial_status)
765 {
766 	ulint	spatial_len= 0;
767 
768 	switch (spatial_status) {
769 	case SPATIAL_UNKNOWN:
770 	case SPATIAL_NONE:
771 		break;
772 
773 	case SPATIAL_MIXED:
774 	case SPATIAL_ONLY:
775 		spatial_len = DATA_MBR_LEN;
776 		break;
777 	}
778 
779 	/* Encode spatial status into length. */
780 	spatial_len |= spatial_status << SPATIAL_STATUS_SHIFT;
781 
782 	if (spatial_status == SPATIAL_ONLY) {
783 		/* If the column is only used by a spatial index,
784 		logging its MBR is enough. */
785 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
786 					     + spatial_len);
787 
788 		return(ptr);
789 	}
790 
791 	if (ext_buf) {
792 		ut_a(prefix_len > 0);
793 
794 		/* If an ordering column is externally stored, we will
795 		have to store a longer prefix of the field.  In this
796 		case, write to the log a marker followed by the
797 		original length and the real length of the field. */
798 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD);
799 
800 		ptr += mach_write_compressed(ptr, *len);
801 
802 		*field = trx_undo_page_fetch_ext(ext_buf, prefix_len,
803 						 page_size, *field, len);
804 
805 		ptr += mach_write_compressed(ptr, *len + spatial_len);
806 	} else {
807 		ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD
808 					     + *len + spatial_len);
809 	}
810 
811 	return(ptr);
812 }
813 
814 /** Get the MBR from a geometry column stored externally
815 @param[out]	mbr		MBR to fill
816 @param[in]	page_size	table page size
817 @param[in]	field		field containing the geometry data
818 @param[in,out]	len		length of field, in bytes
819 */
820 static
821 void
822 trx_undo_get_mbr_from_ext(
823 /*======================*/
824 	double*			mbr,
825 	const page_size_t&      page_size,
826 	const byte*             field,
827 	ulint*			len)
828 {
829 	uchar*		dptr = NULL;
830 	ulint		dlen;
831 	mem_heap_t*	heap = mem_heap_create(100);
832 
833 	dptr = btr_copy_externally_stored_field(
834 		&dlen, field, page_size, *len, heap);
835 
836 	if (dlen <= GEO_DATA_HEADER_SIZE) {
837 		for (uint i = 0; i < SPDIMS; ++i) {
838 			mbr[i * 2] = DBL_MAX;
839 			mbr[i * 2 + 1] = -DBL_MAX;
840 		}
841 	} else {
842 		rtree_mbr_from_wkb(dptr + GEO_DATA_HEADER_SIZE,
843 				   static_cast<uint>(dlen
844 				   - GEO_DATA_HEADER_SIZE), SPDIMS, mbr);
845 	}
846 
847 	mem_heap_free(heap);
848 }
849 
850 /**********************************************************************//**
851 Reports in the undo log an update or delete marking of a clustered index
852 record.
853 @return byte offset of the inserted undo log entry on the page on
854 success, 0 on failure */
855 static
856 ulint
857 trx_undo_page_report_modify(
858 /*========================*/
859 	page_t*		undo_page,	/*!< in: undo log page */
860 	trx_t*		trx,		/*!< in: transaction */
861 	dict_index_t*	index,		/*!< in: clustered index where update or
862 					delete marking is done */
863 	const rec_t*	rec,		/*!< in: clustered index record which
864 					has NOT yet been modified */
865 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec, index) */
866 	const upd_t*	update,		/*!< in: update vector which tells the
867 					columns to be updated; in the case of
868 					a delete, this should be set to NULL */
869 	ulint		cmpl_info,	/*!< in: compiler info on secondary
870 					index updates */
871 	const dtuple_t*	row,		/*!< in: clustered index row contains
872 					virtual column info */
873 	mtr_t*		mtr)		/*!< in: mtr */
874 {
875 	dict_table_t*	table;
876 	ulint		first_free;
877 	byte*		ptr;
878 	const byte*	field;
879 	ulint		flen;
880 	ulint		col_no;
881 	ulint		type_cmpl;
882 	byte*		type_cmpl_ptr;
883 	ulint		i;
884 	trx_id_t	trx_id;
885 	trx_undo_ptr_t*	undo_ptr;
886 	ibool		ignore_prefix = FALSE;
887 	byte		ext_buf[REC_VERSION_56_MAX_INDEX_COL_LEN
888 				+ BTR_EXTERN_FIELD_REF_SIZE];
889 	bool		first_v_col = true;
890 
891 	ut_a(dict_index_is_clust(index));
892 	ut_ad(rec_offs_validate(rec, index, offsets));
893 	ut_ad(mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
894 			       + TRX_UNDO_PAGE_TYPE) == TRX_UNDO_UPDATE);
895 	table = index->table;
896 
897 	/* If the table instance is temporary, select the no-redo rollback
898 	segment: changes to its undo logs need no redo logging, because the
899 	object does not exist after restart and is never restored. */
900 	undo_ptr = dict_table_is_temporary(index->table)
901 		   ? &trx->rsegs.m_noredo : &trx->rsegs.m_redo;
902 
903 	first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
904 				      + TRX_UNDO_PAGE_FREE);
905 	ptr = undo_page + first_free;
906 
907 	ut_ad(first_free <= UNIV_PAGE_SIZE);
908 
909 	if (trx_undo_left(undo_page, ptr) < 50) {
910 
911 		/* NOTE: the value 50 must be big enough so that the general
912 		fields written below fit on the undo log page */
913 
914 		return(0);
915 	}
916 
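	/* The update/delete-mark undo record is assembled as: a 2-byte
	next-record pointer, a combined type and compilation info byte,
	the undo number and table id, the record info bits, the old
	DB_TRX_ID and DB_ROLL_PTR, the unique columns of the clustered
	index, for an update the old values of the updated columns, and
	finally (for a delete marking, or when some ordering field
	changes) the values of all columns that are ordering fields in
	some index. */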
917 	/* Reserve 2 bytes for the pointer to the next undo log record */
918 	ptr += 2;
919 
920 	/* Store first some general parameters to the undo log */
921 
922 	if (!update) {
923 		ut_ad(!rec_get_deleted_flag(rec, dict_table_is_comp(table)));
924 		type_cmpl = TRX_UNDO_DEL_MARK_REC;
925 	} else if (rec_get_deleted_flag(rec, dict_table_is_comp(table))) {
926 		type_cmpl = TRX_UNDO_UPD_DEL_REC;
927 		/* We are about to update a delete marked record.
928 		We don't typically need the prefix in this case unless
929 		the delete marking is done by the same transaction
930 		(which we check below). */
931 		ignore_prefix = TRUE;
932 	} else {
933 		type_cmpl = TRX_UNDO_UPD_EXIST_REC;
934 	}
935 
936 	type_cmpl |= cmpl_info * TRX_UNDO_CMPL_INFO_MULT;
937 	type_cmpl_ptr = ptr;
938 
939 	*ptr++ = (byte) type_cmpl;
940 	ptr += mach_u64_write_much_compressed(ptr, trx->undo_no);
941 
942 	ptr += mach_u64_write_much_compressed(ptr, table->id);
943 
944 	/*----------------------------------------*/
945 	/* Store the state of the info bits */
946 
947 	*ptr++ = (byte) rec_get_info_bits(rec, dict_table_is_comp(table));
948 
949 	/* Store the values of the system columns */
950 	field = rec_get_nth_field(rec, offsets,
951 				  dict_index_get_sys_col_pos(
952 					  index, DATA_TRX_ID), &flen);
953 	ut_ad(flen == DATA_TRX_ID_LEN);
954 
955 	trx_id = trx_read_trx_id(field);
956 
957 	/* If it is an update of a delete marked record, then we are
958 	allowed to ignore blob prefixes if the delete marking was done
959 	by some other trx as it must have committed by now for us to
960 	allow an over-write. */
961 	if (ignore_prefix) {
962 		ignore_prefix = (trx_id != trx->id);
963 	}
964 	ptr += mach_u64_write_compressed(ptr, trx_id);
965 
966 	field = rec_get_nth_field(rec, offsets,
967 				  dict_index_get_sys_col_pos(
968 					  index, DATA_ROLL_PTR), &flen);
969 	ut_ad(flen == DATA_ROLL_PTR_LEN);
970 
971 	ptr += mach_u64_write_compressed(ptr, trx_read_roll_ptr(field));
972 
973 	/*----------------------------------------*/
974 	/* Store then the fields required to uniquely determine the
975 	record which will be modified in the clustered index */
976 
977 	for (i = 0; i < dict_index_get_n_unique(index); i++) {
978 
979 		field = rec_get_nth_field(rec, offsets, i, &flen);
980 
981 		/* The ordering columns must not be stored externally. */
982 		ut_ad(!rec_offs_nth_extern(offsets, i));
983 		ut_ad(dict_index_get_nth_col(index, i)->ord_part);
984 
985 		if (trx_undo_left(undo_page, ptr) < 5) {
986 
987 			return(0);
988 		}
989 
990 		ptr += mach_write_compressed(ptr, flen);
991 
992 		if (flen != UNIV_SQL_NULL) {
993 			if (trx_undo_left(undo_page, ptr) < flen) {
994 
995 				return(0);
996 			}
997 
998 			ut_memcpy(ptr, field, flen);
999 			ptr += flen;
1000 		}
1001 	}
1002 
1003 	/*----------------------------------------*/
1004 	/* Save to the undo log the old values of the columns to be updated. */
1005 
1006 	if (update) {
1007 		if (trx_undo_left(undo_page, ptr) < 5) {
1008 
1009 			return(0);
1010 		}
1011 
1012 		ulint	n_updated = upd_get_n_fields(update);
1013 
1014 		/* If this is an online update while an in-place ALTER TABLE
1015 		is in progress and the table has virtual columns, we need to
1016 		double check whether any non-indexed columns are registered
1017 		in the update vector, in case they will be indexed in the
1018 		new table */
1019 		if (dict_index_is_online_ddl(index)
1020 		    && index->table->n_v_cols > 0) {
1021 			for (i = 0; i < upd_get_n_fields(update); i++) {
1022 				upd_field_t*	fld = upd_get_nth_field(
1023 					update, i);
1024 				ulint		pos = fld->field_no;
1025 
1026 				/* These columns must not have an index
1027 				on them */
1028 				if (upd_fld_is_virtual_col(fld)
1029 				    && dict_table_get_nth_v_col(
1030 					table, pos)->v_indexes->empty()) {
1031 					n_updated--;
1032 				}
1033 			}
1034 		}
1035 
1036 		ptr += mach_write_compressed(ptr, n_updated);
1037 
1038 		for (i = 0; i < upd_get_n_fields(update); i++) {
1039 			upd_field_t*	fld = upd_get_nth_field(update, i);
1040 
1041 			bool	is_virtual = upd_fld_is_virtual_col(fld);
1042 			ulint	max_v_log_len = 0;
1043 
1044 			ulint	pos = fld->field_no;
1045 
1046 			/* Write field number to undo log */
1047 			if (trx_undo_left(undo_page, ptr) < 5) {
1048 
1049 				return(0);
1050 			}
1051 
1052 			if (is_virtual) {
1053 				/* Skip the non-indexed column, during
1054 				an online alter table */
1055 				if (dict_index_is_online_ddl(index)
1056 				    && dict_table_get_nth_v_col(
1057 					table, pos)->v_indexes->empty()) {
1058 					continue;
1059 				}
1060 
1061 				/* add REC_MAX_N_FIELDS to mark this
1062 				is a virtual col */
1063 				pos += REC_MAX_N_FIELDS;
1064 			}
1065 
1066 			ptr += mach_write_compressed(ptr, pos);
1067 
1068 			/* Save the old value of field */
1069 			if (is_virtual) {
1070 				ut_ad(fld->field_no < table->n_v_def);
1071 
1072 				ptr = trx_undo_log_v_idx(undo_page, table,
1073 							 fld->field_no, ptr,
1074 							 first_v_col);
1075 				if (ptr == NULL) {
1076 					 return(0);
1077 				}
1078 				first_v_col = false;
1079 
1080 				max_v_log_len
1081 					= dict_max_v_field_len_store_undo(
1082 						table, fld->field_no);
1083 
1084 				field = static_cast<byte*>(
1085 					fld->old_v_val->data);
1086 				flen = fld->old_v_val->len;
1087 
1088 				/* Only log sufficient bytes for index
1089 				record update */
1090 				if (flen != UNIV_SQL_NULL) {
1091 					flen = ut_min(
1092 						flen, max_v_log_len);
1093 				}
1094 			} else {
1095 				field = rec_get_nth_field(rec, offsets,
1096 							  pos, &flen);
1097 			}
1098 
1099 			if (trx_undo_left(undo_page, ptr) < 15) {
1100 
1101 				return(0);
1102 			}
1103 
1104 			if (!is_virtual && rec_offs_nth_extern(offsets, pos)) {
1105 				const dict_col_t*	col
1106 					= dict_index_get_nth_col(index, pos);
1107 				ulint			prefix_len
1108 					= dict_max_field_len_store_undo(
1109 						table, col);
1110 
1111 				ut_ad(prefix_len + BTR_EXTERN_FIELD_REF_SIZE
1112 				      <= sizeof ext_buf);
1113 
1114 				ptr = trx_undo_page_report_modify_ext(
1115 					ptr,
1116 					col->ord_part
1117 					&& !ignore_prefix
1118 					&& flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1119 					? ext_buf : NULL, prefix_len,
1120 					dict_table_page_size(table),
1121 					&field, &flen, SPATIAL_UNKNOWN);
1122 
1123 				/* Notify purge that it eventually has to
1124 				free the old externally stored field */
1125 
1126 				undo_ptr->update_undo->del_marks = TRUE;
1127 
1128 				*type_cmpl_ptr |= TRX_UNDO_UPD_EXTERN;
1129 			} else {
1130 				ptr += mach_write_compressed(ptr, flen);
1131 			}
1132 
1133 			if (flen != UNIV_SQL_NULL) {
1134 				if (trx_undo_left(undo_page, ptr) < flen) {
1135 
1136 					return(0);
1137 				}
1138 
1139 				ut_memcpy(ptr, field, flen);
1140 				ptr += flen;
1141 			}
1142 
1143 			/* Also record the new value for virtual column */
1144 			if (is_virtual) {
1145 				field = static_cast<byte*>(fld->new_val.data);
1146 				flen = fld->new_val.len;
1147 				if (flen != UNIV_SQL_NULL) {
1148 					flen = ut_min(
1149 						flen, max_v_log_len);
1150 				}
1151 
1152 				if (trx_undo_left(undo_page, ptr) < 15) {
1153 
1154 					return(0);
1155 				}
1156 
1157 				ptr += mach_write_compressed(ptr, flen);
1158 
1159 				if (flen != UNIV_SQL_NULL) {
1160 					if (trx_undo_left(undo_page, ptr) < flen) {
1161 
1162 						return(0);
1163 					}
1164 
1165 					ut_memcpy(ptr, field, flen);
1166 					ptr += flen;
1167 				}
1168 			}
1169 		}
1170 	}
1171 
1172 	/* Reset first_v_col, so that the virtual column undo version
1173 	marker is written again when we log all the indexed columns */
1174 	first_v_col = true;
1175 
1176 	/*----------------------------------------*/
1177 	/* In the case of a delete marking, and also in the case of an update
1178 	where any ordering field of any index changes, store the values of all
1179 	columns which occur as ordering fields in any index. This info is used
1180 	in the purge of old versions where we use it to build and search the
1181 	delete marked index records, to look if we can remove them from the
1182 	index tree. Note that starting from 4.0.14 also externally stored
1183 	fields can be ordering in some index. Starting from 5.2, we no longer
1184 	store REC_MAX_INDEX_COL_LEN first bytes to the undo log record,
1185 	but we can construct the column prefix fields in the index by
1186 	fetching the first page of the BLOB that is pointed to by the
1187 	clustered index. This works also in crash recovery, because all pages
1188 	(including BLOBs) are recovered before anything is rolled back. */
1189 
1190 	if (!update || !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
1191 		byte*		old_ptr = ptr;
1192 		double		mbr[SPDIMS * 2];
1193 		mem_heap_t*	row_heap = NULL;
1194 
1195 		undo_ptr->update_undo->del_marks = TRUE;
1196 
1197 		if (trx_undo_left(undo_page, ptr) < 5) {
1198 
1199 			return(0);
1200 		}
1201 
1202 		/* Reserve 2 bytes to write the number of bytes the stored
1203 		fields take in this undo record */
1204 
1205 		ptr += 2;
1206 
1207 		for (col_no = 0; col_no < dict_table_get_n_cols(table);
1208 		     col_no++) {
1209 
1210 			const dict_col_t*	col
1211 				= dict_table_get_nth_col(table, col_no);
1212 
1213 			if (col->ord_part) {
1214 				ulint			pos;
1215 				spatial_status_t	spatial_status;
1216 
1217 				spatial_status = SPATIAL_NONE;
1218 
1219 				/* Write field number to undo log */
1220 				if (trx_undo_left(undo_page, ptr) < 5 + 15) {
1221 
1222 					return(0);
1223 				}
1224 
1225 				pos = dict_index_get_nth_col_pos(index,
1226 								 col_no);
1227 				ptr += mach_write_compressed(ptr, pos);
1228 
1229 				/* Save the old value of field */
1230 				field = rec_get_nth_field(rec, offsets, pos,
1231 							  &flen);
1232 
1233 				if (rec_offs_nth_extern(offsets, pos)) {
1234 					const dict_col_t*	col =
1235 						dict_index_get_nth_col(
1236 							index, pos);
1237 					ulint			prefix_len =
1238 						dict_max_field_len_store_undo(
1239 							table, col);
1240 
1241 					ut_a(prefix_len < sizeof ext_buf);
1242 
1243 
1244 					spatial_status =
1245 						dict_col_get_spatial_status(
1246 							col);
1247 
1248 					/* If there is a spatial index on it,
1249 					log its MBR */
1250 					if (spatial_status != SPATIAL_NONE) {
1251 						ut_ad(DATA_GEOMETRY_MTYPE(
1252 								col->mtype));
1253 
1254 						trx_undo_get_mbr_from_ext(
1255 							mbr,
1256 							dict_table_page_size(
1257 								table),
1258 							field, &flen);
1259 					}
1260 
1261 					ptr = trx_undo_page_report_modify_ext(
1262 						ptr,
1263 						flen < REC_ANTELOPE_MAX_INDEX_COL_LEN
1264 						&& !ignore_prefix
1265 						? ext_buf : NULL, prefix_len,
1266 						dict_table_page_size(table),
1267 						&field, &flen,
1268 						spatial_status);
1269 				} else {
1270 					ptr += mach_write_compressed(
1271 						ptr, flen);
1272 				}
1273 
1274 				if (flen != UNIV_SQL_NULL
1275 				    && spatial_status != SPATIAL_ONLY) {
1276 					if (trx_undo_left(undo_page, ptr)
1277 					    < flen) {
1278 
1279 						return(0);
1280 					}
1281 
1282 					ut_memcpy(ptr, field, flen);
1283 					ptr += flen;
1284 				}
1285 
1286 				if (spatial_status != SPATIAL_NONE) {
1287 					if (trx_undo_left(undo_page, ptr)
1288 					    < DATA_MBR_LEN) {
1289 						return(0);
1290 					}
1291 
1292 					for (int i = 0; i < SPDIMS * 2;
1293 					     i++) {
1294 						mach_double_write(
1295 							ptr, mbr[i]);
1296 						ptr +=  sizeof(double);
1297 					}
1298 				}
1299 			}
1300 		}
1301 
1302 		for (col_no = 0; col_no < dict_table_get_n_v_cols(table);
1303 		     col_no++) {
1304 			dfield_t*	vfield = NULL;
1305 
1306 			const dict_v_col_t*     col
1307 				= dict_table_get_nth_v_col(table, col_no);
1308 
1309 			if (col->m_col.ord_part) {
1310 				ulint   pos = col_no;
1311 				ulint	max_v_log_len
1312 					= dict_max_v_field_len_store_undo(
1313 						table, pos);
1314 
1315 				/* Write field number to undo log.
1316 				Make sure there is enough space in the log */
1317 				if (trx_undo_left(undo_page, ptr) < 5) {
1318 
1319 					return(0);
1320 				}
1321 
1322 				pos += REC_MAX_N_FIELDS;
1323 				ptr += mach_write_compressed(ptr, pos);
1324 
1325 				ut_ad(col_no < table->n_v_def);
1326 				ptr = trx_undo_log_v_idx(undo_page, table,
1327 							 col_no, ptr,
1328 							 first_v_col);
1329 				first_v_col = false;
1330 
1331 				if (!ptr) {
1332 					 return(0);
1333 				}
1334 
1335 				if (update) {
1336 					ut_ad(!row);
1337 					if (update->old_vrow == NULL) {
1338 						flen = UNIV_SQL_NULL;
1339 					} else {
1340 						vfield = dtuple_get_nth_v_field(
1341 							update->old_vrow,
1342 							col->v_pos);
1343 					}
1344 				} else if (row) {
1345 					vfield = dtuple_get_nth_v_field(
1346 						row, col->v_pos);
1347 				} else {
1348 					ut_ad(0);
1349 				}
1350 
1351 				if (vfield) {
1352 					field = static_cast<byte*>(vfield->data);
1353 					flen = vfield->len;
1354 				} else {
1355 					ut_ad(flen == UNIV_SQL_NULL);
1356 				}
1357 
1358 				if (flen != UNIV_SQL_NULL) {
1359 					flen = ut_min(
1360 						flen, max_v_log_len);
1361 				}
1362 
1363 				ptr += mach_write_compressed(ptr, flen);
1364 
1365 				if (flen != UNIV_SQL_NULL) {
1366 					if (trx_undo_left(undo_page, ptr)
1367 					    < flen) {
1368 
1369 						return(0);
1370 					}
1371 
1372 					ut_memcpy(ptr, field, flen);
1373 					ptr += flen;
1374 				}
1375 			}
1376 		}
1377 
1378 		mach_write_to_2(old_ptr, ptr - old_ptr);
1379 
1380 		if (row_heap) {
1381 			mem_heap_free(row_heap);
1382 		}
1383 	}
1384 
1385 	/*----------------------------------------*/
1386 	/* Write pointers to the previous and the next undo log records */
1387 	if (trx_undo_left(undo_page, ptr) < 2) {
1388 
1389 		return(0);
1390 	}
1391 
1392 	mach_write_to_2(ptr, first_free);
1393 	ptr += 2;
1394 	mach_write_to_2(undo_page + first_free, ptr - undo_page);
1395 
1396 	mach_write_to_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE,
1397 			ptr - undo_page);
1398 
1399 	/* Write to the REDO log about this change in the UNDO log */
1400 
1401 	trx_undof_page_add_undo_rec_log(undo_page, first_free,
1402 					ptr - undo_page, mtr);
1403 	return(first_free);
1404 }
1405 
1406 /**********************************************************************//**
1407 Reads from an undo log update record the system field values of the old
1408 version.
1409 @return remaining part of undo log record after reading these values */
1410 byte*
1411 trx_undo_update_rec_get_sys_cols(
1412 /*=============================*/
1413 	const byte*	ptr,		/*!< in: remaining part of undo
1414 					log record after reading
1415 					general parameters */
1416 	trx_id_t*	trx_id,		/*!< out: trx id */
1417 	roll_ptr_t*	roll_ptr,	/*!< out: roll ptr */
1418 	ulint*		info_bits)	/*!< out: info bits state */
1419 {
1420 	/* Read the state of the info bits */
1421 	*info_bits = mach_read_from_1(ptr);
1422 	ptr += 1;
1423 
1424 	/* Read the values of the system columns */
1425 
1426 	*trx_id = mach_u64_read_next_compressed(&ptr);
1427 	*roll_ptr = mach_u64_read_next_compressed(&ptr);
1428 
1429 	return(const_cast<byte*>(ptr));
1430 }
1431 
1432 /*******************************************************************//**
1433 Builds an update vector based on a remaining part of an undo log record.
1434 @return remaining part of the record, NULL if an error is detected, which
1435 means that the record is corrupted */
1436 byte*
1437 trx_undo_update_rec_get_update(
1438 /*===========================*/
1439 	const byte*	ptr,	/*!< in: remaining part in update undo log
1440 				record, after reading the row reference
1441 				NOTE that this copy of the undo log record must
1442 				be preserved as long as the update vector is
1443 				used, as we do NOT copy the data in the
1444 				record! */
1445 	dict_index_t*	index,	/*!< in: clustered index */
1446 	ulint		type,	/*!< in: TRX_UNDO_UPD_EXIST_REC,
1447 				TRX_UNDO_UPD_DEL_REC, or
1448 				TRX_UNDO_DEL_MARK_REC; in the last case,
1449 				only trx id and roll ptr fields are added to
1450 				the update vector */
1451 	trx_id_t	trx_id,	/*!< in: transaction id from this undo record */
1452 	roll_ptr_t	roll_ptr,/*!< in: roll pointer from this undo record */
1453 	ulint		info_bits,/*!< in: info bits from this undo record */
1454 	trx_t*		trx,	/*!< in: transaction */
1455 	mem_heap_t*	heap,	/*!< in: memory heap from which the memory
1456 				needed is allocated */
1457 	upd_t**		upd)	/*!< out, own: update vector */
1458 {
1459 	upd_field_t*	upd_field;
1460 	upd_t*		update;
1461 	ulint		n_fields;
1462 	byte*		buf;
1463 	ulint		i;
1464 	bool		first_v_col = true;
1465 	bool		is_undo_log = true;
1466 	ulint		n_skip_field = 0;
1467 
1468 	ut_a(dict_index_is_clust(index));
1469 
1470 	if (type != TRX_UNDO_DEL_MARK_REC) {
1471 		n_fields = mach_read_next_compressed(&ptr);
1472 	} else {
1473 		n_fields = 0;
1474 	}
1475 
1476 	update = upd_create(n_fields + 2, heap);
1477 
1478 	update->info_bits = info_bits;
1479 
1480 	/* Store first trx id and roll ptr to update vector */
1481 
1482 	upd_field = upd_get_nth_field(update, n_fields);
1483 
1484 	buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_TRX_ID_LEN));
1485 
1486 	trx_write_trx_id(buf, trx_id);
1487 
1488 	upd_field_set_field_no(upd_field,
1489 			       dict_index_get_sys_col_pos(index, DATA_TRX_ID),
1490 			       index, trx);
1491 	dfield_set_data(&(upd_field->new_val), buf, DATA_TRX_ID_LEN);
1492 
1493 	upd_field = upd_get_nth_field(update, n_fields + 1);
1494 
1495 	buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_ROLL_PTR_LEN));
1496 
1497 	trx_write_roll_ptr(buf, roll_ptr);
1498 
1499 	upd_field_set_field_no(
1500 		upd_field, dict_index_get_sys_col_pos(index, DATA_ROLL_PTR),
1501 		index, trx);
1502 	dfield_set_data(&(upd_field->new_val), buf, DATA_ROLL_PTR_LEN);
1503 
1504 	/* Store then the updated ordinary columns to the update vector */
1505 
1506 	for (i = 0; i < n_fields; i++) {
1507 
1508 		const byte*	field;
1509 		ulint		len;
1510 		ulint		field_no;
1511 		ulint		orig_len;
1512 		bool		is_virtual;
1513 
1514 		field_no = mach_read_next_compressed(&ptr);
1515 
1516 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
1517 
1518 		if (is_virtual) {
1519 			/* If this is the new format, we need to check the index
1520 			list to figure out the correct virtual column position */
1521 			ptr = trx_undo_read_v_idx(
1522 				index->table, ptr, first_v_col, &is_undo_log,
1523 				&field_no);
1524 			first_v_col = false;
1525 		} else if (field_no >= dict_index_get_n_fields(index)) {
1526 			ib::error() << "Trying to access update undo rec"
1527 				" field " << field_no
1528 				<< " in index " << index->name
1529 				<< " of table " << index->table->name
1530 				<< " but index has only "
1531 				<< dict_index_get_n_fields(index)
1532 				<< " fields " << BUG_REPORT_MSG
1533 				<< ". Run also CHECK TABLE "
1534 				<< index->table->name << "."
1535 				" n_fields = " << n_fields << ", i = " << i
1536 				<< ", ptr " << ptr;
1537 
1538 			ut_ad(0);
1539 			*upd = NULL;
1540 			return(NULL);
1541 		}
1542 
1543 		upd_field = upd_get_nth_field(update, i);
1544 
1545 		if (is_virtual) {
1546 			/* This column could be dropped or no longer indexed */
1547 			if (field_no == ULINT_UNDEFINED) {
1548 				/* Mark this is no longer needed */
1549 				upd_field->field_no = REC_MAX_N_FIELDS;
1550 
1551 				ptr = trx_undo_rec_get_col_val(
1552 					ptr, &field, &len, &orig_len);
1553 				ptr = trx_undo_rec_get_col_val(
1554 					ptr, &field, &len, &orig_len);
1555 				n_skip_field++;
1556 				continue;
1557 			}
1558 
1559 			upd_field_set_v_field_no(
1560 				upd_field, field_no, index);
1561 		} else {
1562 			upd_field_set_field_no(upd_field, field_no, index, trx);
1563 		}
1564 
1565 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1566 
1567 		upd_field->orig_len = orig_len;
1568 
1569 		if (len == UNIV_SQL_NULL) {
1570 			dfield_set_null(&upd_field->new_val);
1571 		} else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1572 			dfield_set_data(&upd_field->new_val, field, len);
1573 		} else {
1574 			len -= UNIV_EXTERN_STORAGE_FIELD;
1575 
1576 			dfield_set_data(&upd_field->new_val, field, len);
1577 			dfield_set_ext(&upd_field->new_val);
1578 		}
1579 
1580 		if (is_virtual) {
1581 			upd_field->old_v_val = static_cast<dfield_t*>(
1582 				mem_heap_alloc(
1583 					heap, sizeof *upd_field->old_v_val));
1584 			ptr = trx_undo_rec_get_col_val(
1585 				ptr, &field, &len, &orig_len);
1586 			if (len == UNIV_SQL_NULL) {
1587 				dfield_set_null(upd_field->old_v_val);
1588 			} else if (len < UNIV_EXTERN_STORAGE_FIELD) {
1589 				dfield_set_data(
1590 					upd_field->old_v_val, field, len);
1591 			} else {
1592 				ut_ad(0);
1593 			}
1594 		}
1595 	}
1596 
1597 	/* In a rare scenario we could have skipped virtual columns (because
1598 	they were dropped). We will regenerate an update vector without them */
1599 	if (n_skip_field > 0) {
1600 		ulint	n = 0;
1601 		ut_ad(n_skip_field <= n_fields);
1602 
1603 		upd_t*	new_update = upd_create(
1604 			n_fields + 2 - n_skip_field, heap);
1605 
1606 		for (i = 0; i < n_fields + 2; i++) {
1607 			upd_field = upd_get_nth_field(update, i);
1608 
1609 			if (upd_field->field_no == REC_MAX_N_FIELDS) {
1610 				continue;
1611 			}
1612 
1613 			upd_field_t*	new_upd_field
1614 				 = upd_get_nth_field(new_update, n);
1615 			*new_upd_field = *upd_field;
1616 			n++;
1617 		}
1618 		ut_ad(n == n_fields + 2 - n_skip_field);
1619 		*upd = new_update;
1620 	} else {
1621 		*upd = update;
1622 	}
1623 
1624 	return(const_cast<byte*>(ptr));
1625 }
1626 
1627 /*******************************************************************//**
1628 Builds a partial row from an update undo log record, for purge.
1629 It contains the columns which occur as ordering in any index of the table.
1630 Any missing columns are indicated by col->mtype == DATA_MISSING.
1631 @return pointer to remaining part of undo record */
1632 byte*
1633 trx_undo_rec_get_partial_row(
1634 /*=========================*/
1635 	const byte*	ptr,	/*!< in: remaining part in update undo log
1636 				record of a suitable type, at the start of
1637 				the stored index columns;
1638 				NOTE that this copy of the undo log record must
1639 				be preserved as long as the partial row is
1640 				used, as we do NOT copy the data in the
1641 				record! */
1642 	dict_index_t*	index,	/*!< in: clustered index */
1643 	dtuple_t**	row,	/*!< out, own: partial row */
1644 	ibool		ignore_prefix, /*!< in: flag to indicate if we
1645 				expect blob prefixes in undo. Used
1646 				only in the assertion. */
1647 	mem_heap_t*	heap)	/*!< in: memory heap from which the memory
1648 				needed is allocated */
1649 {
1650 	const byte*	end_ptr;
1651 	bool		first_v_col = true;
1652 	bool		is_undo_log = true;
1653 
1654 	ut_ad(index);
1655 	ut_ad(ptr);
1656 	ut_ad(row);
1657 	ut_ad(heap);
1658 	ut_ad(dict_index_is_clust(index));
1659 
1660 	*row = dtuple_create_with_vcol(
1661 		heap, dict_table_get_n_cols(index->table),
1662 		dict_table_get_n_v_cols(index->table));
1663 
1664 	/* Mark all columns in the row uninitialized, so that
1665 	we can distinguish missing fields from fields that are SQL NULL. */
1666 	for (ulint i = 0; i < dict_table_get_n_cols(index->table); i++) {
1667 		dfield_get_type(dtuple_get_nth_field(*row, i))
1668 			->mtype = DATA_MISSING;
1669 	}
1670 
1671 	dtuple_init_v_fld(*row);
1672 
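	/* The stored ordering columns are preceded by a 2-byte total
	length. Each column is stored as a field number (with
	REC_MAX_N_FIELDS added for virtual columns), for virtual columns
	the index info, and then the column value. */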
1673 	end_ptr = ptr + mach_read_from_2(ptr);
1674 	ptr += 2;
1675 
1676 	while (ptr != end_ptr) {
1677 		dfield_t*	dfield;
1678 		const byte*	field;
1679 		ulint		field_no;
1680 		const dict_col_t* col;
1681 		ulint		col_no;
1682 		ulint		len;
1683 		ulint		orig_len;
1684 		bool		is_virtual;
1685 
1686 		field_no = mach_read_next_compressed(&ptr);
1687 
1688 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
1689 
1690 		if (is_virtual) {
1691 			ptr = trx_undo_read_v_idx(
1692 				index->table, ptr, first_v_col, &is_undo_log,
1693 				&field_no);
1694 			first_v_col = false;
1695 		}
1696 
1697 		ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
1698 
1699 		/* This column could be dropped or no longer indexed */
1700 		if (field_no == ULINT_UNDEFINED) {
1701 			ut_ad(is_virtual);
1702 			continue;
1703 		}
1704 
1705 		if (is_virtual) {
1706 			dict_v_col_t* vcol = dict_table_get_nth_v_col(
1707 						index->table, field_no);
1708 			col = &vcol->m_col;
1709 			col_no = dict_col_get_no(col);
1710 			dfield = dtuple_get_nth_v_field(*row, vcol->v_pos);
1711 			dict_col_copy_type(
1712 				&vcol->m_col,
1713 				dfield_get_type(dfield));
1714 		} else {
1715 			col = dict_index_get_nth_col(index, field_no);
1716 			col_no = dict_col_get_no(col);
1717 			dfield = dtuple_get_nth_field(*row, col_no);
1718 			dict_col_copy_type(
1719 				dict_table_get_nth_col(index->table, col_no),
1720 				dfield_get_type(dfield));
1721 		}
1722 
1723 		dfield_set_data(dfield, field, len);
1724 
1725 		if (len != UNIV_SQL_NULL
1726 		    && len >= UNIV_EXTERN_STORAGE_FIELD) {
1727 			spatial_status_t	spatial_status;
1728 
1729 			/* Decode spatial status. */
1730 			spatial_status = static_cast<spatial_status_t>(
1731 				(len & SPATIAL_STATUS_MASK)
1732 				>> SPATIAL_STATUS_SHIFT);
1733 			len &= ~SPATIAL_STATUS_MASK;
1734 
1735 			/* Keep compatible with 5.7.9 format. */
1736 			if (spatial_status == SPATIAL_UNKNOWN) {
1737 				spatial_status =
1738 					dict_col_get_spatial_status(col);
1739 			}
1740 
1741 			switch (spatial_status) {
1742 			case SPATIAL_ONLY:
1743 				ut_ad(len - UNIV_EXTERN_STORAGE_FIELD
1744 				      == DATA_MBR_LEN);
1745 				dfield_set_len(
1746 					dfield,
1747 					len - UNIV_EXTERN_STORAGE_FIELD);
1748 				break;
1749 
1750 			case SPATIAL_MIXED:
1751 				dfield_set_len(
1752 					dfield,
1753 					len - UNIV_EXTERN_STORAGE_FIELD
1754 					- DATA_MBR_LEN);
1755 				break;
1756 
1757 			case SPATIAL_NONE:
1758 				dfield_set_len(
1759 					dfield,
1760 					len - UNIV_EXTERN_STORAGE_FIELD);
1761 				break;
1762 
1763 			case SPATIAL_UNKNOWN:
1764 				ut_ad(0);
1765 				break;
1766 			}
1767 
1768 			dfield_set_ext(dfield);
1769 			dfield_set_spatial_status(dfield, spatial_status);
1770 
1771 			/* If the prefix of this column is indexed,
1772 			ensure that enough prefix is stored in the
1773 			undo log record. */
1774 			if (!ignore_prefix && col->ord_part
1775 			    && spatial_status != SPATIAL_ONLY) {
1776 				ut_a(dfield_get_len(dfield)
1777 				     >= BTR_EXTERN_FIELD_REF_SIZE);
1778 				ut_a(dict_table_get_format(index->table)
1779 				     >= UNIV_FORMAT_B
1780 				     || dfield_get_len(dfield)
1781 				     >= REC_ANTELOPE_MAX_INDEX_COL_LEN
1782 				     + BTR_EXTERN_FIELD_REF_SIZE);
1783 			}
1784 		}
1785 	}
1786 
1787 	return(const_cast<byte*>(ptr));
1788 }
1789 #endif /* !UNIV_HOTBACKUP */
1790 
1791 /***********************************************************************//**
1792 Erases the unused undo log page end.
1793 @return TRUE if the page contained something, FALSE if it was empty */
1794 static MY_ATTRIBUTE((nonnull))
1795 ibool
1796 trx_undo_erase_page_end(
1797 /*====================*/
1798 	page_t*	undo_page,	/*!< in/out: undo page whose end to erase */
1799 	mtr_t*	mtr)		/*!< in/out: mini-transaction */
1800 {
1801 	ulint	first_free;
1802 
1803 	first_free = mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR
1804 				      + TRX_UNDO_PAGE_FREE);
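	/* Overwrite everything between the first free offset and the end of
	the usable data area, so that a page rebuilt from the redo log during
	recovery matches this page byte for byte. */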
1805 	memset(undo_page + first_free, 0xff,
1806 	       (UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) - first_free);
1807 
1808 	mlog_write_initial_log_record(undo_page, MLOG_UNDO_ERASE_END, mtr);
1809 	return(first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
1810 }
1811 
1812 /***********************************************************//**
1813 Parses a redo log record of erasing of an undo page end.
1814 @return end of log record or NULL */
1815 byte*
1816 trx_undo_parse_erase_page_end(
1817 /*==========================*/
1818 	byte*	ptr,	/*!< in: buffer */
1819 	byte*	end_ptr MY_ATTRIBUTE((unused)), /*!< in: buffer end */
1820 	page_t*	page,	/*!< in: page or NULL */
1821 	mtr_t*	mtr)	/*!< in: mtr or NULL */
1822 {
1823 	ut_ad(ptr != NULL);
1824 	ut_ad(end_ptr != NULL);
1825 
1826 	if (page == NULL) {
1827 
1828 		return(ptr);
1829 	}
1830 
1831 	trx_undo_erase_page_end(page, mtr);
1832 
1833 	return(ptr);
1834 }
1835 
1836 #ifndef UNIV_HOTBACKUP
1837 /***********************************************************************//**
1838 Writes information to an undo log about an insert, update, or a delete marking
1839 of a clustered index record. This information is used in a rollback of the
1840 transaction and in consistent reads that must look to the history of this
1841 transaction.
1842 @return DB_SUCCESS or error code */
1843 dberr_t
1844 trx_undo_report_row_operation(
1845 /*==========================*/
1846 	ulint		flags,		/*!< in: if BTR_NO_UNDO_LOG_FLAG bit is
1847 					set, does nothing */
1848 	ulint		op_type,	/*!< in: TRX_UNDO_INSERT_OP or
1849 					TRX_UNDO_MODIFY_OP */
1850 	que_thr_t*	thr,		/*!< in: query thread */
1851 	dict_index_t*	index,		/*!< in: clustered index */
1852 	const dtuple_t*	clust_entry,	/*!< in: in the case of an insert,
1853 					index entry to insert into the
1854 					clustered index, otherwise NULL */
1855 	const upd_t*	update,		/*!< in: in the case of an update,
1856 					the update vector, otherwise NULL */
1857 	ulint		cmpl_info,	/*!< in: compiler info on secondary
1858 					index updates */
1859 	const rec_t*	rec,		/*!< in: in case of an update or delete
1860 					marking, the record in the clustered
1861 					index, otherwise NULL */
1862 	const ulint*	offsets,	/*!< in: rec_get_offsets(rec) */
1863 	roll_ptr_t*	roll_ptr)	/*!< out: rollback pointer to the
1864 					inserted undo log record,
1865 					0 if BTR_NO_UNDO_LOG
1866 					flag was specified */
1867 {
1868 	trx_t*		trx;
1869 	trx_undo_t*	undo;
1870 	ulint		page_no;
1871 	buf_block_t*	undo_block;
1872 	trx_undo_ptr_t*	undo_ptr;
1873 	mtr_t		mtr;
1874 	dberr_t		err		= DB_SUCCESS;
1875 #ifdef UNIV_DEBUG
1876 	int		loop_count	= 0;
1877 #endif /* UNIV_DEBUG */
1878 
1879 	ut_a(dict_index_is_clust(index));
1880 	ut_ad(!rec || rec_offs_validate(rec, index, offsets));
1881 
1882 	if (flags & BTR_NO_UNDO_LOG_FLAG) {
1883 
1884 		*roll_ptr = 0;
1885 
1886 		return(DB_SUCCESS);
1887 	}
1888 
1889 	ut_ad(thr);
1890 	ut_ad(!srv_read_only_mode);
1891 	ut_ad((op_type != TRX_UNDO_INSERT_OP)
1892 	      || (clust_entry && !update && !rec));
1893 
1894 	trx = thr_get_trx(thr);
1895 
1896 	bool	is_temp_table = dict_table_is_temporary(index->table);
1897 
1898 	/* Temporary tables do not go into INFORMATION_SCHEMA.TABLES,
1899 	so do not bother adding them to the transaction's list of
1900 	modified tables; that list is only used for maintaining
1901 	INFORMATION_SCHEMA.TABLES.UPDATE_TIME. */
1902 	if (!is_temp_table) {
1903 		trx->mod_tables.insert(index->table);
1904 	}
1905 
1906 	/* If the trx is read-only, then only temp-tables can be written.
1907 	If the trx is read-write and the operation involves a temp-table,
1908 	we assign a temporary rseg. */
1909 	if (trx->read_only || is_temp_table) {
1910 
1911 		ut_ad(!srv_read_only_mode || is_temp_table);
1912 
1913 		/* MySQL should block writes to non-temporary tables. */
1914 		ut_a(is_temp_table);
1915 
1916 		if (trx->rsegs.m_noredo.rseg == 0) {
1917 			trx_assign_rseg(trx);
1918 		}
1919 	}
1920 
1921 	/* If the object is temporary, disable the REDO logging that tracks
1922 	changes to its UNDO logs. This is safe because temporary tables are
1923 	not restored on restart. */
1924 	mtr_start(&mtr);
1925 	dict_disable_redo_if_temporary(index->table, &mtr);
1926 	mutex_enter(&trx->undo_mutex);
1927 
1928 	/* If the object is a temp-table, select the noredo rseg: changes to
1929 	its undo logs need no REDO logging, because the object does not exist
1930 	after a restart and so is never restored. */
1931 	undo_ptr = is_temp_table ? &trx->rsegs.m_noredo : &trx->rsegs.m_redo;
1932 
1933 	switch (op_type) {
1934 	case TRX_UNDO_INSERT_OP:
1935 		undo = undo_ptr->insert_undo;
1936 
1937 		if (undo == NULL) {
1938 
1939 			err = trx_undo_assign_undo(
1940 				trx, undo_ptr, TRX_UNDO_INSERT);
1941 			undo = undo_ptr->insert_undo;
1942 
1943 			if (undo == NULL) {
1944 				/* Did not succeed */
1945 				ut_ad(err != DB_SUCCESS);
1946 				goto err_exit;
1947 			}
1948 
1949 			ut_ad(err == DB_SUCCESS);
1950 		}
1951 		break;
1952 	default:
1953 		ut_ad(op_type == TRX_UNDO_MODIFY_OP);
1954 
1955 		undo = undo_ptr->update_undo;
1956 
1957 		if (undo == NULL) {
1958 			err = trx_undo_assign_undo(
1959 				trx, undo_ptr, TRX_UNDO_UPDATE);
1960 			undo = undo_ptr->update_undo;
1961 
1962 			if (undo == NULL) {
1963 				/* Did not succeed */
1964 				ut_ad(err != DB_SUCCESS);
1965 				goto err_exit;
1966 			}
1967 		}
1968 
1969 		ut_ad(err == DB_SUCCESS);
1970 	}
1971 
1972 	page_no = undo->last_page_no;
1973 
1974 	undo_block = buf_page_get_gen(
1975 		page_id_t(undo->space, page_no), undo->page_size, RW_X_LATCH,
1976 		undo->guess_block, BUF_GET, __FILE__, __LINE__, &mtr);
1977 
1978 	buf_block_dbg_add_level(undo_block, SYNC_TRX_UNDO_PAGE);
1979 
1980 	do {
1981 		page_t*		undo_page;
1982 		ulint		offset;
1983 
1984 		undo_page = buf_block_get_frame(undo_block);
1985 		ut_ad(page_no == undo_block->page.id.page_no());
1986 
1987 		switch (op_type) {
1988 		case TRX_UNDO_INSERT_OP:
1989 			offset = trx_undo_page_report_insert(
1990 				undo_page, trx, index, clust_entry, &mtr);
1991 			break;
1992 		default:
1993 			ut_ad(op_type == TRX_UNDO_MODIFY_OP);
1994 			offset = trx_undo_page_report_modify(
1995 				undo_page, trx, index, rec, offsets, update,
1996 				cmpl_info, clust_entry, &mtr);
1997 		}
1998 
1999 		if (UNIV_UNLIKELY(offset == 0)) {
2000 			/* The record did not fit on the page. We erase the
2001 			end segment of the undo log page and write a redo log
2002 			record of it: this ensures that, in the debug version,
2003 			the replica page constructed from the log records
2004 			stays identical to the original page. */
2005 
2006 			if (!trx_undo_erase_page_end(undo_page, &mtr)) {
2007 				/* The record did not fit on an empty
2008 				undo page. Discard the freshly allocated
2009 				page and return an error. */
2010 
2011 				/* When we remove a page from an undo
2012 				log, this is analogous to a
2013 				pessimistic insert in a B-tree, and we
2014 				must reserve the counterpart of the
2015 				tree latch, which is the rseg
2016 				mutex. We must commit the mini-transaction
2017 				first, because it may be holding lower-level
2018 				latches, such as SYNC_FSP and SYNC_FSP_PAGE. */
2019 
2020 				mtr_commit(&mtr);
2021 				mtr_start(&mtr);
2022 				dict_disable_redo_if_temporary(
2023 					index->table, &mtr);
2024 
2025 				mutex_enter(&undo_ptr->rseg->mutex);
2026 				trx_undo_free_last_page(trx, undo, &mtr);
2027 				mutex_exit(&undo_ptr->rseg->mutex);
2028 
2029 				err = DB_UNDO_RECORD_TOO_BIG;
2030 				goto err_exit;
2031 			}
2032 
2033 			mtr_commit(&mtr);
2034 		} else {
2035 			/* Success */
2036 			undo->guess_block = undo_block;
2037 			mtr_commit(&mtr);
2038 
2039 			undo->empty = FALSE;
2040 			undo->top_page_no = page_no;
2041 			undo->top_offset  = offset;
2042 			undo->top_undo_no = trx->undo_no;
2043 
2044 			trx->undo_no++;
2045 			trx->undo_rseg_space = undo_ptr->rseg->space;
2046 
2047 			mutex_exit(&trx->undo_mutex);
2048 
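			/* Build the roll pointer that the caller stores in
			the record's DB_ROLL_PTR system column.  As a sketch,
			the 7-byte value packs

			    roll_ptr = (is_insert << 55) | (rseg_id << 48)
			               | (page_no << 16) | offset

			(see trx_undo_build_roll_ptr() and
			trx_undo_decode_roll_ptr() for the authoritative
			encoding). */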
2049 			*roll_ptr = trx_undo_build_roll_ptr(
2050 				op_type == TRX_UNDO_INSERT_OP,
2051 				undo_ptr->rseg->id, page_no, offset);
2052 			return(DB_SUCCESS);
2053 		}
2054 
2055 		ut_ad(page_no == undo->last_page_no);
2056 
2057 		/* We have to extend the undo log by one page */
2058 
2059 		ut_ad(++loop_count < 2);
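		/* In a debug build, assert that we go around this loop at
		most twice: a record that does not fit even on a freshly
		allocated empty page was already rejected above with
		DB_UNDO_RECORD_TOO_BIG. */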
2060 		mtr_start(&mtr);
2061 		dict_disable_redo_if_temporary(index->table, &mtr);
2062 
2063 		/* When we add a page to an undo log, this is analogous to
2064 		a pessimistic insert in a B-tree, and we must reserve the
2065 		counterpart of the tree latch, which is the rseg mutex. */
2066 
2067 		mutex_enter(&undo_ptr->rseg->mutex);
2068 		undo_block = trx_undo_add_page(trx, undo, undo_ptr, &mtr);
2069 		mutex_exit(&undo_ptr->rseg->mutex);
2070 
2071 		page_no = undo->last_page_no;
2072 
2073 		DBUG_EXECUTE_IF("ib_err_ins_undo_page_add_failure",
2074 				undo_block = NULL;);
2075 	} while (undo_block != NULL);
2076 
2077 	ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
2078 		ER_INNODB_UNDO_LOG_FULL,
2079 		"No more space left in the %s tablespace for allocating UNDO"
2080 		" log pages. Please add a new data file to the tablespace,"
2081 		" check whether the filesystem is full, or enable"
2082 		" auto-extension for the tablespace",
2083 		((undo->space == srv_sys_space.space_id())
2084 		? "system" :
2085 		  ((fsp_is_system_temporary(undo->space))
2086 		   ? "temporary" : "undo")));
2087 
2088 	/* Did not succeed: out of space */
2089 	err = DB_OUT_OF_FILE_SPACE;
2090 
2091 err_exit:
2092 	mutex_exit(&trx->undo_mutex);
2093 	mtr_commit(&mtr);
2094 	return(err);
2095 }
2096 
2097 /*============== BUILDING PREVIOUS VERSION OF A RECORD ===============*/
2098 
2099 /******************************************************************//**
2100 Copies an undo record to heap. This function can be called if we know that
2101 the undo log record exists.
2102 @return own: copy of the record */
2103 trx_undo_rec_t*
2104 trx_undo_get_undo_rec_low(
2105 /*======================*/
2106 	roll_ptr_t	roll_ptr,	/*!< in: roll pointer to record */
2107 	mem_heap_t*	heap,		/*!< in: memory heap where copied */
2108 	bool		is_redo_rseg)	/*!< in: true if redo rseg. */
2109 {
2110 	trx_undo_rec_t*	undo_rec;
2111 	ulint		rseg_id;
2112 	ulint		page_no;
2113 	ulint		offset;
2114 	const page_t*	undo_page;
2115 	trx_rseg_t*	rseg;
2116 	ibool		is_insert;
2117 	mtr_t		mtr;
2118 
2119 	trx_undo_decode_roll_ptr(roll_ptr, &is_insert, &rseg_id, &page_no,
2120 				 &offset);
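	/* The decoded fields locate the undo record: is_insert selects the
	insert or update undo log, rseg_id picks the rollback segment, and
	(page_no, offset) address the record within that segment. */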
2121 	rseg = trx_rseg_get_on_id(rseg_id, is_redo_rseg);
2122 
2123 	mtr_start(&mtr);
2124 
2125 	undo_page = trx_undo_page_get_s_latched(
2126 		page_id_t(rseg->space, page_no), rseg->page_size,
2127 		&mtr);
2128 
2129 	undo_rec = trx_undo_rec_copy(undo_page, offset, heap);
2130 
2131 	mtr_commit(&mtr);
2132 
2133 	return(undo_rec);
2134 }
2135 
2136 /******************************************************************//**
2137 Copies an undo record to heap.
2138 @param[in]	roll_ptr	roll pointer to record
2139 @param[in]	trx_id		id of the trx that generated
2140 				the roll pointer: it points to an
2141 				undo log of this transaction
2142 @param[in]	heap		memory heap where copied
2143 @param[in]	is_redo_rseg	true if redo rseg.
2144 @param[in]	name		table name
2145 @param[out]	undo_rec	own: copy of the record
2146 @retval true if the undo log has been
2147 truncated and we cannot fetch the old version
2148 @retval false if the undo log record is available
2149 NOTE: the caller must have latches on the clustered index page. */
2150 static MY_ATTRIBUTE((warn_unused_result))
2151 bool
2152 trx_undo_get_undo_rec(
2153 /*==================*/
2154 	roll_ptr_t		roll_ptr,
2155 	trx_id_t		trx_id,
2156 	mem_heap_t*		heap,
2157 	bool			is_redo_rseg,
2158 	const table_name_t&	name,
2159 	trx_undo_rec_t**	undo_rec)
2160 {
2161 	bool		missing_history;
2162 
2163 	rw_lock_s_lock(&purge_sys->latch);
2164 
2165 	missing_history = purge_sys->view.changes_visible(trx_id, name);
2166 	if (!missing_history) {
2167 		*undo_rec = trx_undo_get_undo_rec_low(
2168 			roll_ptr, heap, is_redo_rseg);
2169 	}
2170 
2171 	rw_lock_s_unlock(&purge_sys->latch);
2172 
2173 	return(missing_history);
2174 }
2175 
2176 #ifdef UNIV_DEBUG
2177 #define ATTRIB_USED_ONLY_IN_DEBUG
2178 #else /* UNIV_DEBUG */
2179 #define ATTRIB_USED_ONLY_IN_DEBUG	MY_ATTRIBUTE((unused))
2180 #endif /* UNIV_DEBUG */
2181 
2182 /*******************************************************************//**
2183 Build a previous version of a clustered index record. The caller must
2184 hold a latch on the index page of the clustered index record.
2185 @retval true if previous version was built, or if it was an insert
2186 or the table has been rebuilt
2187 @retval false if the previous version is earlier than purge_view,
2188 or being purged, which means that it may have been removed */
2189 bool
2190 trx_undo_prev_version_build(
2191 /*========================*/
2192 	const rec_t*	index_rec ATTRIB_USED_ONLY_IN_DEBUG,
2193 				/*!< in: clustered index record in the
2194 				index tree */
2195 	mtr_t*		index_mtr ATTRIB_USED_ONLY_IN_DEBUG,
2196 				/*!< in: mtr which contains the latch to
2197 				index_rec page and purge_view */
2198 	const rec_t*	rec,	/*!< in: version of a clustered index record */
2199 	dict_index_t*	index,	/*!< in: clustered index */
2200 	ulint*		offsets,/*!< in/out: rec_get_offsets(rec, index) */
2201 	mem_heap_t*	heap,	/*!< in: memory heap from which the memory
2202 				needed is allocated */
2203 	rec_t**		old_vers,/*!< out, own: previous version, or NULL if
2204 				rec is the first inserted version, or if
2205 				history data has been deleted (an error),
2206 				or if the purge COULD have removed the version
2207 				though it has not yet done so */
2208 	mem_heap_t*	v_heap,	/*!< in: memory heap used to create the
2209 				vrow dtuple if it is not yet created. This
2210 				heap differs from "heap" above in that it
2211 				could be prebuilt->old_vers_heap for selection */
2212 	const dtuple_t**vrow,	/*!< out: virtual column info, if any */
2213 	ulint		v_status)
2214 				/*!< in: status that tells whether this
2215 				function is invoked by the purge thread, and
2216 				whether we read the "after image" of the
2217 				undo log record */
2219 {
2220 	trx_undo_rec_t*	undo_rec	= NULL;
2221 	dtuple_t*	entry;
2222 	trx_id_t	rec_trx_id;
2223 	ulint		type;
2224 	undo_no_t	undo_no;
2225 	table_id_t	table_id;
2226 	trx_id_t	trx_id;
2227 	roll_ptr_t	roll_ptr;
2228 	upd_t*		update;
2229 	byte*		ptr;
2230 	ulint		info_bits;
2231 	ulint		cmpl_info;
2232 	bool		dummy_extern;
2233 	byte*		buf;
2234 
2235 	ut_ad(!rw_lock_own(&purge_sys->latch, RW_LOCK_S));
2236 	ut_ad(mtr_memo_contains_page(index_mtr, index_rec, MTR_MEMO_PAGE_S_FIX)
2237 	      || mtr_memo_contains_page(index_mtr, index_rec,
2238 					MTR_MEMO_PAGE_X_FIX));
2239 	ut_ad(rec_offs_validate(rec, index, offsets));
2240 	ut_a(dict_index_is_clust(index));
2241 
2242 	roll_ptr = row_get_rec_roll_ptr(rec, index, offsets);
2243 
2244 	*old_vers = NULL;
2245 
2246 	if (trx_undo_roll_ptr_is_insert(roll_ptr)) {
2247 		/* The record rec is the first inserted version */
2248 		return(true);
2249 	}
2250 
2251 	rec_trx_id = row_get_rec_trx_id(rec, index, offsets);
2252 
2253 	/* REDO rollback segments are used only for non-temporary objects;
2254 	for temporary objects, NON-REDO rollback segments are used. */
2255 	bool is_redo_rseg = !dict_table_is_temporary(index->table);
2257 	if (trx_undo_get_undo_rec(
2258 		roll_ptr, rec_trx_id, heap, is_redo_rseg,
2259 		index->table->name, &undo_rec)) {
2260 		if (v_status & TRX_UNDO_PREV_IN_PURGE) {
2261 			/* We are fetching the record being purged */
2262 			undo_rec = trx_undo_get_undo_rec_low(
2263 				roll_ptr, heap, is_redo_rseg);
2264 		} else {
2265 			/* The undo record may already have been purged,
2266 			during purge or semi-consistent read. */
2267 			return(false);
2268 		}
2269 	}
2270 
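	/* Parse the undo record header: the record type, compiler info,
	whether any column was stored externally, the undo number and the
	table id that the record belongs to. */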
2271 	ptr = trx_undo_rec_get_pars(undo_rec, &type, &cmpl_info,
2272 				    &dummy_extern, &undo_no, &table_id);
2273 
2274 	if (table_id != index->table->id) {
2275 		/* The table should have been rebuilt, but purge has
2276 		not yet removed the undo log records for the
2277 		now-dropped old table (table_id). */
2278 		return(true);
2279 	}
2280 
2281 	ptr = trx_undo_update_rec_get_sys_cols(ptr, &trx_id, &roll_ptr,
2282 					       &info_bits);
2283 
2284 	/* (a) If a clustered index record version is such that the
2285 	trx id stamp in it is bigger than purge_sys->view, then the
2286 	BLOBs in that version are known to exist (the purge has not
2287 	progressed that far);
2288 
2289 	(b) if the version is the first version such that trx id in it
2290 	is less than purge_sys->view, and it is not delete-marked,
2291 	then the BLOBs in that version are known to exist (the purge
2292 	cannot have purged the BLOBs referenced by that version
2293 	yet).
2294 
2295 	This function does not fetch any BLOBs.  The callers might, by
2296 	possibly invoking row_ext_create() via row_build().  However,
2297 	they should have all needed information in the *old_vers
2298 	returned by this function.  This is because *old_vers is based
2299 	on the transaction undo log records.  The function
2300 	trx_undo_page_fetch_ext() will write BLOB prefixes to the
2301 	transaction undo log that are at least as long as the longest
2302 	possible column prefix in a secondary index.  Thus, secondary
2303 	index entries for *old_vers can be constructed without
2304 	dereferencing any BLOB pointers. */
2305 
2306 	ptr = trx_undo_rec_skip_row_ref(ptr, index);
2307 
2308 	ptr = trx_undo_update_rec_get_update(ptr, index, type, trx_id,
2309 					     roll_ptr, info_bits,
2310 					     NULL, heap, &update);
2311 	ut_a(ptr);
2312 
2313 	if (row_upd_changes_field_size_or_external(index, offsets, update)) {
2314 		ulint	n_ext;
2315 
2316 		/* We should confirm the existence of disowned external data,
2317 		if the previous version record is delete marked. If the trx_id
2318 		of the previous record is seen by purge view, we should treat
2319 		it as missing history, because the disowned external data
2320 		might be purged already.
2321 
2322 		The inherited external data (BLOBs) can be freed (purged)
2323 		after trx_id was committed, provided that no view was started
2324 		before trx_id. If the purge view can see the committed
2325 		delete-marked record by trx_id, no transactions need to access
2326 		the BLOB. */
2327 
2328 		/* The row_upd_changes_disowned_external(update) check could
2329 		be omitted, but the synchronization on purge_sys->latch that
2330 		it avoids is likely more expensive than the check itself. */
2331 
2332 		if ((update->info_bits & REC_INFO_DELETED_FLAG)
2333 		    && row_upd_changes_disowned_external(update)) {
2334 			bool	missing_extern;
2335 
2336 			rw_lock_s_lock(&purge_sys->latch);
2337 
2338 			missing_extern = purge_sys->view.changes_visible(
2339 				trx_id,	index->table->name);
2340 
2341 			rw_lock_s_unlock(&purge_sys->latch);
2342 
2343 			if (missing_extern) {
2344 				/* treat as a fresh insert, not to
2345 				cause assertion error at the caller. */
2346 				return(true);
2347 			}
2348 		}
2349 
2350 		/* We have to set the appropriate extern storage bits in the
2351 		old version of the record: the extern bits in rec for those
2352 		fields that update does NOT update, as well as the bits for
2353 		those fields that update updates to become externally stored
2354 		fields. Store the info: */
2355 
2356 		entry = row_rec_to_index_entry(
2357 			rec, index, offsets, &n_ext, heap);
2358 		/* The page containing the clustered index record
2359 		corresponding to entry is latched in mtr.  Thus the
2360 		following call is safe. */
2361 		row_upd_index_replace_new_col_vals(entry, index, update, heap);
2362 
2363 		/* Get number of externally stored columns in updated record */
2364 		n_ext = entry->get_n_ext();
2365 
2366 		buf = static_cast<byte*>(mem_heap_alloc(
2367 			heap, rec_get_converted_size(index, entry, n_ext)));
2368 
2369 		*old_vers = rec_convert_dtuple_to_rec(buf, index,
2370 						      entry, n_ext);
2371 	} else {
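		/* No field changes its stored size and no column becomes
		externally stored, so the previous version can be built by
		copying the record and applying the update vector in place. */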
2372 		buf = static_cast<byte*>(mem_heap_alloc(
2373 			heap, rec_offs_size(offsets)));
2374 
2375 		*old_vers = rec_copy(buf, rec, offsets);
2376 		rec_offs_make_valid(*old_vers, index, offsets);
2377 		row_upd_rec_in_place(*old_vers, index, offsets, update, NULL);
2378 	}
2379 
2380 	/* Set the old values (that is, the after image of the update) from
2381 	the update vector into the dtuple vrow. */
2382 	if (v_status & TRX_UNDO_GET_OLD_V_VALUE) {
2383 		row_upd_replace_vcol((dtuple_t*)*vrow, index->table, update,
2384 				     false, NULL, NULL);
2385 	}
2386 
2387 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
2388 	ut_a(!rec_offs_any_null_extern(
2389 		*old_vers, rec_get_offsets(
2390 			*old_vers, index, NULL, ULINT_UNDEFINED, &heap)));
2391 #endif /* defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG */
2392 
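	/* If the caller asked for virtual column values and the update may
	affect ordering fields, read them from the remainder of the undo
	record into *vrow. */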
2393 	if (vrow && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
2394 		if (!(*vrow)) {
2395 			*vrow = dtuple_create_with_vcol(
2396 				v_heap ? v_heap : heap,
2397 				dict_table_get_n_cols(index->table),
2398 				dict_table_get_n_v_cols(index->table));
2399 			dtuple_init_v_fld(*vrow);
2400 		}
2401 
2402 		ut_ad(index->table->n_v_cols);
2403 		trx_undo_read_v_cols(index->table, ptr, *vrow,
2404 				     v_status & TRX_UNDO_PREV_IN_PURGE, NULL);
2405 	}
2406 
2407 	return(true);
2408 }
2409 
2410 /** Read virtual column value from undo log
2411 @param[in]	table		the table
2412 @param[in]	ptr		undo log pointer
2413 @param[in,out]	row		the row struct to fill
2414 @param[in]	in_purge	called by purge thread
2415 @param[in]	col_map		online rebuild column map */
2416 void
2417 trx_undo_read_v_cols(
2418 	const dict_table_t*	table,
2419 	const byte*		ptr,
2420 	const dtuple_t*		row,
2421 	bool			in_purge,
2422 	const ulint*		col_map)
2423 {
2424 	const byte*     end_ptr;
2425 	bool		first_v_col = true;
2426 	bool		is_undo_log = true;
2427 
2428 	end_ptr = ptr + mach_read_from_2(ptr);
2429 	ptr += 2;
2430 	while (ptr < end_ptr) {
2431 		dfield_t*	dfield;
2432 		const byte*	field;
2433 		ulint		field_no;
2434 		ulint		len;
2435 		ulint		orig_len;
2436 		bool		is_virtual;
2437 
2438 		field_no = mach_read_next_compressed(
2439 				const_cast<const byte**>(&ptr));
2440 
2441 		is_virtual = (field_no >= REC_MAX_N_FIELDS);
2442 
2443 		if (is_virtual) {
2444 			ptr = trx_undo_read_v_idx(
2445 				table, ptr, first_v_col, &is_undo_log,
2446 				&field_no);
2447 			first_v_col = false;
2448 		}
2449 
2450 		ptr = trx_undo_rec_get_col_val(
2451 			ptr, &field, &len, &orig_len);
2452 
2453 		/* The virtual column is no longer indexed or does not exist.
2454 		This check must come after trx_undo_rec_get_col_val() so that
2455 		the undo ptr still advances. */
2456 		if (field_no == ULINT_UNDEFINED) {
2457 			ut_ad(is_virtual);
2458 			continue;
2459 		}
2460 
2461 		if (is_virtual) {
2462 			ulint		col_no;
2463 			dict_v_col_t*	vcol = dict_table_get_nth_v_col(
2464 				table, field_no);
2465 
2466 			if (!col_map) {
2467 				col_no = vcol->v_pos;
2468 			} else {
2469 				col_no = col_map[vcol->v_pos];
2470 			}
2471 
2472 			if (col_no == ULINT_UNDEFINED) {
2473 				continue;
2474 			}
2475 
2476 			dfield = dtuple_get_nth_v_field(row, col_no);
2477 
2478 			if (!in_purge
2479 			    || dfield_get_type(dfield)->mtype == DATA_MISSING) {
2480 				dict_col_copy_type(
2481 					&vcol->m_col,
2482 					dfield_get_type(dfield));
2483 				dfield_set_data(dfield, field, len);
2484 			}
2485 		}
2486 	}
2487 
2488 	ut_ad(ptr == end_ptr);
2489 }
2490 #endif /* !UNIV_HOTBACKUP */
2491