1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2017, 2021, MariaDB Corporation.
6 
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10 
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18 
19 *****************************************************************************/
20 
21 /**************************************************//**
22 @file page/page0page.cc
23 Index page routines
24 
25 Created 2/2/1994 Heikki Tuuri
26 *******************************************************/
27 
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "buf0buf.h"
32 #include "buf0checksum.h"
33 #include "btr0btr.h"
34 #include "srv0srv.h"
35 #include "lock0lock.h"
36 #include "fut0lst.h"
37 #include "btr0sea.h"
38 #include "trx0sys.h"
39 #include <algorithm>
40 
41 /*			THE INDEX PAGE
42 			==============
43 
44 The index page consists of a page header which contains the page's
45 id and other information. On top of it are the index records
46 in a heap linked into a one way linear list according to alphabetic order.
47 
48 Just below page end is an array of pointers which we call page directory,
49 to about every sixth record in the list. The pointers are placed in
50 the directory in the alphabetical order of the records pointed to,
51 enabling us to make binary search using the array. Each slot n:o I
52 in the directory points to a record, where a 4-bit field contains a count
53 of those records which are in the linear list between pointer I and
54 the pointer I - 1 in the directory, including the record
55 pointed to by pointer I and not including the record pointed to by I - 1.
56 We say that the record pointed to by slot I, or that slot I, owns
these records. The count is always kept in the range 4 to 8, with
the exception that it is 1 for the first slot, and 1--8 for the last slot.
59 
60 An essentially binary search can be performed in the list of index
61 records, like we could do if we had pointer to every record in the
62 page directory. The data structure is, however, more efficient when
63 we are doing inserts, because most inserts are just pushed on a heap.
64 Only every 8th insert requires block move in the directory pointer
65 table, which itself is quite small. A record is deleted from the page
66 by just taking it off the linear list and updating the number of owned
67 records-field of the record which owns it, and updating the page directory,
68 if necessary. A special case is the one when the record owns itself.
69 Because the overhead of inserts is so small, we may also increase the
70 page size from the projected default of 8 kB to 64 kB without too
much loss of efficiency in inserts. A bigger page becomes worthwhile
when the disk transfer rate rises relative to seek and latency time.
73 On the present system, the page size is set so that the page transfer
74 time (3 ms) is 20 % of the disk random access time (15 ms).
75 
76 When the page is split, merged, or becomes full but contains deleted
77 records, we have to reorganize the page.
78 
79 Assuming a page size of 8 kB, a typical index page of a secondary
80 index contains 300 index entries, and the size of the page directory
81 is 50 x 4 bytes = 200 bytes. */
82 
83 /***************************************************************//**
84 Looks for the directory slot which owns the given record.
85 @return the directory slot number */
86 ulint
page_dir_find_owner_slot(const rec_t * rec)87 page_dir_find_owner_slot(
88 /*=====================*/
89 	const rec_t*	rec)	/*!< in: the physical record */
90 {
91 	ut_ad(page_rec_check(rec));
92 
93 	const page_t* page = page_align(rec);
94 	const page_dir_slot_t* first_slot = page_dir_get_nth_slot(page, 0);
95 	const page_dir_slot_t* slot = page_dir_get_nth_slot(
96 		page, ulint(page_dir_get_n_slots(page)) - 1);
97 	const rec_t*		r = rec;
98 
99 	if (page_is_comp(page)) {
100 		while (rec_get_n_owned_new(r) == 0) {
101 			r = rec_get_next_ptr_const(r, TRUE);
102 			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
103 			ut_ad(r < page + (srv_page_size - PAGE_DIR));
104 		}
105 	} else {
106 		while (rec_get_n_owned_old(r) == 0) {
107 			r = rec_get_next_ptr_const(r, FALSE);
108 			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
109 			ut_ad(r < page + (srv_page_size - PAGE_DIR));
110 		}
111 	}
112 
113 	uint16 rec_offs_bytes = mach_encode_2(ulint(r - page));
114 
115 	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {
116 
117 		if (UNIV_UNLIKELY(slot == first_slot)) {
118 			ib::error() << "Probable data corruption on page "
119 				<< page_get_page_no(page)
120 				<< ". Original record on that page;";
121 
122 			if (page_is_comp(page)) {
123 				fputs("(compact record)", stderr);
124 			} else {
125 				rec_print_old(stderr, rec);
126 			}
127 
128 			ib::error() << "Cannot find the dir slot for this"
129 				" record on that page;";
130 
131 			if (page_is_comp(page)) {
132 				fputs("(compact record)", stderr);
133 			} else {
134 				rec_print_old(stderr, page
135 					      + mach_decode_2(rec_offs_bytes));
136 			}
137 
138 			ut_error;
139 		}
140 
141 		slot += PAGE_DIR_SLOT_SIZE;
142 	}
143 
144 	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
145 }
146 
147 /**************************************************************//**
148 Used to check the consistency of a directory slot.
149 @return TRUE if succeed */
150 static
151 ibool
page_dir_slot_check(const page_dir_slot_t * slot)152 page_dir_slot_check(
153 /*================*/
154 	const page_dir_slot_t*	slot)	/*!< in: slot */
155 {
156 	const page_t*	page;
157 	ulint		n_slots;
158 	ulint		n_owned;
159 
160 	ut_a(slot);
161 
162 	page = page_align(slot);
163 
164 	n_slots = page_dir_get_n_slots(page);
165 
166 	ut_a(slot <= page_dir_get_nth_slot(page, 0));
167 	ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));
168 
169 	ut_a(page_rec_check(page_dir_slot_get_rec(slot)));
170 
171 	if (page_is_comp(page)) {
172 		n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
173 	} else {
174 		n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
175 	}
176 
177 	if (slot == page_dir_get_nth_slot(page, 0)) {
178 		ut_a(n_owned == 1);
179 	} else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
180 		ut_a(n_owned >= 1);
181 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
182 	} else {
183 		ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
184 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
185 	}
186 
187 	return(TRUE);
188 }
189 
190 /*************************************************************//**
191 Sets the max trx id field value. */
192 void
page_set_max_trx_id(buf_block_t * block,page_zip_des_t * page_zip,trx_id_t trx_id,mtr_t * mtr)193 page_set_max_trx_id(
194 /*================*/
195 	buf_block_t*	block,	/*!< in/out: page */
196 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
197 	trx_id_t	trx_id,	/*!< in: transaction id */
198 	mtr_t*		mtr)	/*!< in/out: mini-transaction, or NULL */
199 {
200   ut_ad(!mtr || mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
201   ut_ad(!page_zip || page_zip == &block->page.zip);
202   static_assert((PAGE_HEADER + PAGE_MAX_TRX_ID) % 8 == 0, "alignment");
203   byte *max_trx_id= my_assume_aligned<8>(PAGE_MAX_TRX_ID +
204                                          PAGE_HEADER + block->frame);
205 
206   mtr->write<8>(*block, max_trx_id, trx_id);
207   if (UNIV_LIKELY_NULL(page_zip))
208     memcpy_aligned<8>(&page_zip->data[PAGE_MAX_TRX_ID + PAGE_HEADER],
209                       max_trx_id, 8);
210 }
211 
212 /** Persist the AUTO_INCREMENT value on a clustered index root page.
213 @param[in,out]	block	clustered index root page
214 @param[in]	index	clustered index
215 @param[in]	autoinc	next available AUTO_INCREMENT value
216 @param[in,out]	mtr	mini-transaction
217 @param[in]	reset	whether to reset the AUTO_INCREMENT
218 			to a possibly smaller value than currently
219 			exists in the page */
220 void
page_set_autoinc(buf_block_t * block,ib_uint64_t autoinc,mtr_t * mtr,bool reset)221 page_set_autoinc(
222 	buf_block_t*		block,
223 	ib_uint64_t		autoinc,
224 	mtr_t*			mtr,
225 	bool			reset)
226 {
227   ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX |
228                                    MTR_MEMO_PAGE_SX_FIX));
229 
230   byte *field= my_assume_aligned<8>(PAGE_HEADER + PAGE_ROOT_AUTO_INC +
231                                     block->frame);
232   ib_uint64_t old= mach_read_from_8(field);
233   if (old == autoinc || (old > autoinc && !reset))
234     return; /* nothing to update */
235 
236   mtr->write<8>(*block, field, autoinc);
237   if (UNIV_LIKELY_NULL(block->page.zip.data))
238     memcpy_aligned<8>(PAGE_HEADER + PAGE_ROOT_AUTO_INC + block->page.zip.data,
239                       field, 8);
240 }
241 
/** The page infimum and supremum of an empty page in ROW_FORMAT=REDUNDANT.
page_create_low() copies this image to offset PAGE_DATA of a new page.
Each record consists of its header bytes followed by its NUL-terminated
name. */
static const byte infimum_supremum_redundant[] = {
	/* the infimum record */
	0x08/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x00/*heap_no=0*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x74/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x09/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x08/*heap_no=1*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm', 0
};

/** The page infimum and supremum of an empty page in ROW_FORMAT=COMPACT.
page_create_low() copies this image to offset PAGE_DATA of a new page.
Unlike in the REDUNDANT image, the "supremum" name carries no
terminating NUL byte here. */
static const byte infimum_supremum_compact[] = {
	/* the infimum record */
	0x01/*n_owned=1*/,
	0x00, 0x02/* heap_no=0, REC_STATUS_INFIMUM */,
	0x00, 0x0d/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x01/*n_owned=1*/,
	0x00, 0x0b/* heap_no=1, REC_STATUS_SUPREMUM */,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
};
273 
274 /** Create an index page.
275 @param[in,out]	block	buffer block
276 @param[in]	comp	nonzero=compact page format */
page_create_low(const buf_block_t * block,bool comp)277 void page_create_low(const buf_block_t* block, bool comp)
278 {
279 	page_t*		page;
280 
281 	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE
282 			    <= PAGE_DATA);
283 	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE
284 			    <= PAGE_DATA);
285 
286 	page = block->frame;
287 
288 	fil_page_set_type(page, FIL_PAGE_INDEX);
289 
290 	memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
291 	page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
292 	page[PAGE_HEADER + PAGE_INSTANT] = 0;
293 	page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
294 
295 	if (comp) {
296 		page[PAGE_HEADER + PAGE_N_HEAP] = 0x80;/*page_is_comp()*/
297 		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
298 		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_NEW_SUPREMUM_END;
299 		memcpy(page + PAGE_DATA, infimum_supremum_compact,
300 		       sizeof infimum_supremum_compact);
301 		memset(page
302 		       + PAGE_NEW_SUPREMUM_END, 0,
303 		       srv_page_size - PAGE_DIR - PAGE_NEW_SUPREMUM_END);
304 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
305 			= PAGE_NEW_SUPREMUM;
306 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
307 			= PAGE_NEW_INFIMUM;
308 	} else {
309 		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
310 		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_OLD_SUPREMUM_END;
311 		memcpy(page + PAGE_DATA, infimum_supremum_redundant,
312 		       sizeof infimum_supremum_redundant);
313 		memset(page
314 		       + PAGE_OLD_SUPREMUM_END, 0,
315 		       srv_page_size - PAGE_DIR - PAGE_OLD_SUPREMUM_END);
316 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
317 			= PAGE_OLD_SUPREMUM;
318 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
319 			= PAGE_OLD_INFIMUM;
320 	}
321 }
322 
/** Create an uncompressed index page.
Invokes mtr->page_create() for the block, advances the block's modify
clock, and then initializes the frame with page_create_low().
@param[in,out]	block	buffer block
@param[in,out]	mtr	mini-transaction
@param[in]	comp	set unless ROW_FORMAT=REDUNDANT */
void page_create(buf_block_t *block, mtr_t *mtr, bool comp)
{
  /* NOTE(review): presumably this emits the redo log record for the
  page creation; confirm in mtr_t::page_create(). */
  mtr->page_create(*block, comp);
  /* The page contents are about to be replaced. */
  buf_block_modify_clock_inc(block);
  page_create_low(block, comp);
}
333 
/**********************************************************//**
Create a compressed B-tree index page.
The uncompressed frame is initialized as an empty ROW_FORMAT=COMPACT
page by page_create_low(); for spatial indexes the page type is changed
to FIL_PAGE_RTREE and the split sequence number is zeroed in both the
frame and the compressed image. PAGE_LEVEL and PAGE_MAX_TRX_ID are then
written directly into the frame, and the page is compressed with
page_zip_compress(). A compression failure is treated as fatal. */
void
page_create_zip(
/*============*/
	buf_block_t*		block,		/*!< in/out: a buffer frame
						where the page is created */
	dict_index_t*		index,		/*!< in: the index of the
						page */
	ulint			level,		/*!< in: the B-tree level
						of the page */
	trx_id_t		max_trx_id,	/*!< in: PAGE_MAX_TRX_ID */
	mtr_t*			mtr)		/*!< in/out: mini-transaction
						handle */
{
	ut_ad(block);
	ut_ad(buf_block_get_page_zip(block));
	ut_ad(dict_table_is_comp(index->table));

	/* PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC are always 0 for
	temporary tables. */
	ut_ad(max_trx_id == 0 || !index->table->is_temporary());
	/* In secondary indexes and the change buffer, PAGE_MAX_TRX_ID
	must be zero on non-leaf pages. max_trx_id can be 0 when the
	index consists of an empty root (leaf) page. */
	ut_ad(max_trx_id == 0
	      || level == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());
	/* In the clustered index, PAGE_ROOT_AUTOINC or
	PAGE_MAX_TRX_ID must be 0 on other pages than the root. */
	/* NOTE(review): the condition below is logically identical to
	the previous assertion (same operands, different order), so it
	does not appear to check the clustered-index rule described
	above. Verify the intended condition. */
	ut_ad(level == 0 || max_trx_id == 0
	      || !dict_index_is_sec_or_ibuf(index)
	      || index->table->is_temporary());

	buf_block_modify_clock_inc(block);
	page_create_low(block, true);

	if (index->is_spatial()) {
		/* R-tree pages carry their own page type, and an
		8-byte split sequence number that starts at zero. */
		mach_write_to_2(FIL_PAGE_TYPE + block->frame, FIL_PAGE_RTREE);
		memset(block->frame + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
		memset(block->page.zip.data + FIL_RTREE_SPLIT_SEQ_NUM, 0, 8);
	}

	/* These header fields are written straight into the frame,
	before the page is compressed. */
	mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + block->frame, level);
	mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + block->frame,
			max_trx_id);

	if (!page_zip_compress(block, index, page_zip_level, mtr)) {
		/* The compression of a newly created
		page should always succeed. */
		ut_error;
	}
}
388 
389 /**********************************************************//**
390 Empty a previously created B-tree index page. */
391 void
page_create_empty(buf_block_t * block,dict_index_t * index,mtr_t * mtr)392 page_create_empty(
393 /*==============*/
394 	buf_block_t*	block,	/*!< in/out: B-tree block */
395 	dict_index_t*	index,	/*!< in: the index of the page */
396 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
397 {
398 	trx_id_t	max_trx_id;
399 	page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
400 
401 	ut_ad(fil_page_index_page_check(block->frame));
402 	ut_ad(!index->is_dummy);
403 	ut_ad(block->page.id().space() == index->table->space->id);
404 
405 	/* Multiple transactions cannot simultaneously operate on the
406 	same temp-table in parallel.
407 	max_trx_id is ignored for temp tables because it not required
408 	for MVCC. */
409 	if (dict_index_is_sec_or_ibuf(index)
410 	    && !index->table->is_temporary()
411 	    && page_is_leaf(block->frame)) {
412 		max_trx_id = page_get_max_trx_id(block->frame);
413 		ut_ad(max_trx_id);
414 	} else if (block->page.id().page_no() == index->page) {
415 		/* Preserve PAGE_ROOT_AUTO_INC. */
416 		max_trx_id = page_get_max_trx_id(block->frame);
417 	} else {
418 		max_trx_id = 0;
419 	}
420 
421 	if (page_zip) {
422 		ut_ad(!index->table->is_temporary());
423 		page_create_zip(block, index,
424 				page_header_get_field(block->frame,
425 						      PAGE_LEVEL),
426 				max_trx_id, mtr);
427 	} else {
428 		page_create(block, mtr, index->table->not_redundant());
429 		if (index->is_spatial()) {
430 			static_assert(((FIL_PAGE_INDEX & 0xff00)
431 				       | byte(FIL_PAGE_RTREE))
432 				      == FIL_PAGE_RTREE, "compatibility");
433 			mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
434 				      byte(FIL_PAGE_RTREE));
435 			if (mach_read_from_8(block->frame
436 					     + FIL_RTREE_SPLIT_SEQ_NUM)) {
437 				mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
438 					    8, 0);
439 			}
440 		}
441 
442 		if (max_trx_id) {
443 			mtr->write<8>(*block, PAGE_HEADER + PAGE_MAX_TRX_ID
444 				      + block->frame, max_trx_id);
445 		}
446 	}
447 }
448 
449 /*************************************************************//**
450 Differs from page_copy_rec_list_end, because this function does not
451 touch the lock table and max trx id on page or compress the page.
452 
453 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
454 if new_block is a compressed leaf page in a secondary index.
455 This has to be done either within the same mini-transaction,
456 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
457 void
page_copy_rec_list_end_no_locks(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mtr_t * mtr)458 page_copy_rec_list_end_no_locks(
459 /*============================*/
460 	buf_block_t*	new_block,	/*!< in: index page to copy to */
461 	buf_block_t*	block,		/*!< in: index page of rec */
462 	rec_t*		rec,		/*!< in: record on page */
463 	dict_index_t*	index,		/*!< in: record descriptor */
464 	mtr_t*		mtr)		/*!< in: mtr */
465 {
466 	page_t*		new_page	= buf_block_get_frame(new_block);
467 	page_cur_t	cur1;
468 	page_cur_t	cur2;
469 	mem_heap_t*	heap		= NULL;
470 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
471 	rec_offs*	offsets		= offsets_;
472 	rec_offs_init(offsets_);
473 
474 	page_cur_position(rec, block, &cur1);
475 
476 	if (page_cur_is_before_first(&cur1)) {
477 
478 		page_cur_move_to_next(&cur1);
479 	}
480 
481 	btr_assert_not_corrupted(new_block, index);
482 	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
483 	ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
484 	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
485 	const ulint n_core = page_is_leaf(block->frame)
486 		? index->n_core_fields : 0;
487 
488 	page_cur_set_before_first(new_block, &cur2);
489 
490 	/* Copy records from the original page to the new page */
491 
492 	while (!page_cur_is_after_last(&cur1)) {
493 		rec_t*	ins_rec;
494 		offsets = rec_get_offsets(cur1.rec, index, offsets, n_core,
495 					  ULINT_UNDEFINED, &heap);
496 		ins_rec = page_cur_insert_rec_low(&cur2, index,
497 						  cur1.rec, offsets, mtr);
498 		if (UNIV_UNLIKELY(!ins_rec)) {
499 			ib::fatal() << "Rec offset " << page_offset(rec)
500 				<< ", cur1 offset " << page_offset(cur1.rec)
501 				<< ", cur2 offset " << page_offset(cur2.rec);
502 		}
503 
504 		page_cur_move_to_next(&cur1);
505 		ut_ad(!(rec_get_info_bits(cur1.rec, page_is_comp(new_page))
506 			& REC_INFO_MIN_REC_FLAG));
507 		cur2.rec = ins_rec;
508 	}
509 
510 	if (UNIV_LIKELY_NULL(heap)) {
511 		mem_heap_free(heap);
512 	}
513 }
514 
/*************************************************************//**
Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page.

If new_block is a compressed page, redo logging is switched off while
the records are copied, and the page is compressed afterwards. If that
fails, a reorganize is attempted; if that fails too, the uncompressed
frame is restored from the compressed image and NULL is returned.
On success, the lock table and the adaptive hash index are updated.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original successor of the infimum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_end(
/*===================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_t*		page		= block->frame;
	rec_t*		ret		= page_rec_get_next(
		page_get_infimum_rec(new_page));
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	mem_heap_t*	heap		= NULL;
	ut_ad(page_align(rec) == page);

#ifdef UNIV_ZIP_DEBUG
	if (new_page_zip) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
		ut_a(page_zip);

		/* Strict page_zip_validate() may fail here.
		Furthermore, btr_compress() may set FIL_PAGE_PREV to
		FIL_NULL on new_page while leaving it intact on
		new_page_zip.  So, we cannot validate new_page_zip. */
		ut_a(page_zip_validate_low(page_zip, page, index, TRUE));
	}
#endif /* UNIV_ZIP_DEBUG */
	ut_ad(buf_block_get_frame(block) == page);
	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
	ut_ad(page_is_comp(page) == page_is_comp(new_page));
	/* Here, "ret" may be pointing to a user record or the
	predefined supremum record. */

	/* For a compressed destination, suppress logging while copying;
	the previous mode is restored before page_zip_compress() below. */
	const mtr_log_t log_mode = new_page_zip
		? mtr->set_log_mode(MTR_LOG_NONE) : MTR_LOG_NONE;
	const bool was_empty = page_dir_get_n_heap(new_page)
		== PAGE_HEAP_NO_USER_LOW;
	/* Snapshot of the header bytes from PAGE_LAST_INSERT up to and
	including PAGE_N_DIRECTION, taken before any record is inserted. */
	alignas(2) byte h[PAGE_N_DIRECTION + 2 - PAGE_LAST_INSERT];
	memcpy_aligned<2>(h, PAGE_HEADER + PAGE_LAST_INSERT + new_page,
			  sizeof h);

	if (index->is_spatial()) {
		ulint	max_to_move = page_get_n_recs(
			buf_block_get_frame(block));
		heap = mem_heap_create(256);

		/* One rec_move entry per record that might be moved;
		used below to move the locks. */
		rec_move = static_cast<rtr_rec_move_t*>(
			mem_heap_alloc(heap, max_to_move * sizeof *rec_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_end_no_locks(new_block,
						    block, rec, index,
						    heap, rec_move,
						    max_to_move,
						    &num_moved,
						    mtr);
	} else {
		page_copy_rec_list_end_no_locks(new_block, block, rec,
						index, mtr);
		if (was_empty) {
			/* Put back the header bytes saved in h, undoing
			what the inserts changed in that range. */
			mtr->memcpy<mtr_t::MAYBE_NOP>(*new_block, PAGE_HEADER
						      + PAGE_LAST_INSERT
						      + new_page, h, sizeof h);
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !index->table->is_temporary()) {
		ut_ad(!was_empty || page_dir_get_n_heap(new_page)
		      == PAGE_HEAP_NO_USER_LOW
		      + page_header_get_field(new_page, PAGE_N_RECS));
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page), mtr);
	}

	if (new_page_zip) {
		/* Restore the log mode that was saved above. */
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_block, index,
				       page_zip_level, mtr)) {
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the successor of
			the predefined infimum record.  It must still
			have at least one predecessor (the predefined
			infimum record, or a freshly copied record
			that is smaller than "ret"). */
			ut_a(ret_pos > 0);

			if (!page_zip_reorganize(new_block, index,
						 page_zip_level, mtr)) {

				/* Zip overflow: rebuild the uncompressed
				frame from the compressed image. */
				if (!page_zip_decompress(new_page_zip,
							 new_page, FALSE)) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (heap) {
					mem_heap_free(heap);
				}

				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = page_rec_get_nth(new_page, ret_pos);
			}
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
		/* No locks to move. */
	} else if (rec_move && dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_end(new_block, block, rec);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
670 
/*************************************************************//**
Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page.

If rec is the page infimum, nothing is copied. If new_block is a
compressed page, redo logging is switched off while the records are
copied, and the page is compressed afterwards. If that fails, a
reorganize is attempted; if that fails too, the uncompressed frame is
restored from the compressed image and NULL is returned.
On success, the lock table and the adaptive hash index are updated.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original predecessor of the supremum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_start(
/*=====================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ut_ad(page_align(rec) == block->frame);

	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_cur_t	cur1;
	page_cur_t	cur2;
	mem_heap_t*	heap		= NULL;
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	rec_t*		ret
		= page_rec_get_prev(page_get_supremum_rec(new_page));
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	/* Here, "ret" may be pointing to a user record or the
	predefined infimum record. */

	if (page_rec_is_infimum(rec)) {
		/* There are no records before the infimum to copy. */
		return(ret);
	}

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress logging while copying; the previous mode is
		restored before page_zip_compress() below. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	/* cur1: the first user record of the source page. */
	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	/* cur2: the current last record of the destination page. */
	page_cur_position(ret, new_block, &cur2);

	const ulint n_core = page_rec_is_leaf(rec) ? index->n_core_fields : 0;

	/* Copy records from the original page to the new page */
	if (index->is_spatial()) {
		ut_ad(!index->is_instant());
		ulint		max_to_move = page_get_n_recs(
						buf_block_get_frame(block));
		heap = mem_heap_create(256);

		/* One rec_move entry per record that might be moved;
		used below to move the locks. */
		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
					heap,
					sizeof (*rec_move) * max_to_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_start_no_locks(new_block,
						      block, rec, index, heap,
						      rec_move, max_to_move,
						      &num_moved, mtr);
	} else {
		while (page_cur_get_rec(&cur1) != rec) {
			offsets = rec_get_offsets(cur1.rec, index, offsets,
						  n_core,
						  ULINT_UNDEFINED, &heap);
			/* Insert after cur2 and advance cur2 to the
			newly inserted record. */
			cur2.rec = page_cur_insert_rec_low(&cur2, index,
							   cur1.rec, offsets,
							   mtr);
			ut_a(cur2.rec);

			page_cur_move_to_next(&cur1);
			ut_ad(!(rec_get_info_bits(cur1.rec,
						  page_is_comp(new_page))
				& REC_INFO_MIN_REC_FLAG));
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not required
	for MVCC. */
	if (n_core && dict_index_is_sec_or_ibuf(index)
	    && !index->table->is_temporary()) {
		page_update_max_trx_id(new_block,
				       new_page_zip,
				       page_get_max_trx_id(block->frame),
				       mtr);
	}

	if (new_page_zip) {
		/* Restore the log mode that was saved above. */
		mtr_set_log_mode(mtr, log_mode);

		/* Debug injection point: jump straight to the
		reorganize path as if compression had failed. */
		DBUG_EXECUTE_IF("page_copy_rec_list_start_compress_fail",
				goto zip_reorganize;);

		if (!page_zip_compress(new_block, index,
				       page_zip_level, mtr)) {
			ulint	ret_pos;
#ifndef DBUG_OFF
zip_reorganize:
#endif /* DBUG_OFF */
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index,
						  page_zip_level, mtr))) {

				/* Zip overflow: rebuild the uncompressed
				frame from the compressed image. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (UNIV_LIKELY_NULL(heap)) {
					mem_heap_free(heap);
				}

				return(NULL);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_table_is_locking_disabled(index->table)) {
		/* No locks to move. */
	} else if (dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else {
		lock_move_rec_list_start(new_block, block, rec, ret);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block);

	return(ret);
}
837 
/*************************************************************//**
Deletes records from a page from a given record onward, including that record.
The infimum and supremum records are not deleted.

Paths taken by the code below:
- rec is the page supremum: nothing is deleted;
- rec is the page infimum or the record right after it, or n_recs equals
  the number of records on the page: the page is re-created empty with
  page_create_empty();
- block->page.zip.data is set: the records are removed one at a time
  with page_cur_delete_rec();
- otherwise: the chain from rec up to the last record is prepended to the
  PAGE_FREE list, and the page header, the page directory and the
  next-record links are adjusted in place through mtr. */
void
page_delete_rec_list_end(
/*=====================*/
	rec_t*		rec,	/*!< in: pointer to record on page */
	buf_block_t*	block,	/*!< in: buffer block of the page */
	dict_index_t*	index,	/*!< in: record descriptor */
	ulint		n_recs,	/*!< in: number of records to delete,
				or ULINT_UNDEFINED if not known */
	ulint		size,	/*!< in: the sum of the sizes of the
				records in the end of the chain to
				delete, or ULINT_UNDEFINED if not known */
	mtr_t*		mtr)	/*!< in: mtr */
{
  ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
  ut_ad(page_align(rec) == block->frame);
  ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
#ifdef UNIV_ZIP_DEBUG
  ut_a(!block->page.zip.data ||
       page_zip_validate(&block->page.zip, block->frame, index));
#endif /* UNIV_ZIP_DEBUG */

  if (page_rec_is_supremum(rec))
  {
    ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
    /* Nothing to do, there are no records bigger than the page supremum. */
    return;
  }

  /* rec is the infimum, or the record that directly follows the infimum,
  or the caller-supplied count covers every record on the page. */
  if (page_rec_is_infimum(rec) || n_recs == page_get_n_recs(block->frame) ||
      rec == (page_is_comp(block->frame)
              ? page_rec_get_next_low(block->frame + PAGE_NEW_INFIMUM, 1)
              : page_rec_get_next_low(block->frame + PAGE_OLD_INFIMUM, 0)))
  {
    /* We are deleting all records. */
    page_create_empty(block, index, mtr);
    return;
  }

#if 0 // FIXME: consider deleting the last record as a special case
  if (page_rec_is_last(rec))
  {
    page_cur_t cursor= { index, rec, offsets, block };
    page_cur_delete_rec(&cursor, index, offsets, mtr);
    return;
  }
#endif

  /* The page becomes invalid for optimistic searches */
  buf_block_modify_clock_inc(block);

  /* Passed to rec_get_offsets(): index->n_core_fields on a leaf page,
  0 otherwise. */
  const ulint n_core= page_is_leaf(block->frame) ? index->n_core_fields : 0;
  mem_heap_t *heap= nullptr;
  rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
  rec_offs *offsets= offsets_;
  rec_offs_init(offsets_);

#if 1 // FIXME: remove this, and write minimal amount of log! */
  if (UNIV_LIKELY_NULL(block->page.zip.data))
  {
    /* The block has a compressed page image: delete the records one
    by one through a page cursor until the supremum is reached. */
    ut_ad(page_is_comp(block->frame));
    do
    {
      page_cur_t cur;
      page_cur_position(rec, block, &cur);
      offsets= rec_get_offsets(rec, index, offsets, n_core,
			       ULINT_UNDEFINED, &heap);
      /* Advance to the successor before the current record is deleted
      through the cursor. */
      rec= rec_get_next_ptr(rec, TRUE);
#ifdef UNIV_ZIP_DEBUG
      ut_a(page_zip_validate(&block->page.zip, block->frame, index));
#endif /* UNIV_ZIP_DEBUG */
      page_cur_delete_rec(&cur, index, offsets, mtr);
    }
    while (page_offset(rec) != PAGE_NEW_SUPREMUM);

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
    return;
  }
#endif

  /* prev_rec is the predecessor of rec; its next-record link is redirected
  to the supremum at the end of this function.
  last_rec is the predecessor of the supremum, i.e. the tail of the
  chain that is being deleted; it gets linked to the old free list. */
  byte *prev_rec= page_rec_get_prev(rec);
  byte *last_rec= page_rec_get_prev(page_get_supremum_rec(block->frame));

  // FIXME: consider a special case of shrinking PAGE_HEAP_TOP

  /* Walk the chain if either total is unknown, or if the record payload
  has to be zero-filled; in that case both size and n_recs are recomputed
  from scratch, overriding any caller-supplied values. */
  const bool scrub= srv_immediate_scrub_data_uncompressed;
  if (scrub || size == ULINT_UNDEFINED || n_recs == ULINT_UNDEFINED)
  {
    rec_t *rec2= rec;
    /* Calculate the sum of sizes and the number of records */
    size= 0;
    n_recs= 0;

    do
    {
      offsets = rec_get_offsets(rec2, index, offsets, n_core,
                                ULINT_UNDEFINED, &heap);
      ulint s= rec_offs_size(offsets);
      ut_ad(ulint(rec2 - block->frame) + s - rec_offs_extra_size(offsets) <
            srv_page_size);
      ut_ad(size + s < srv_page_size);
      size+= s;
      n_recs++;

      /* Write rec_offs_data_size() zero bytes starting at the record
      origin. */
      if (scrub)
        mtr->memset(block, page_offset(rec2), rec_offs_data_size(offsets), 0);

      rec2 = page_rec_get_next(rec2);
    }
    while (!page_rec_is_supremum(rec2));

    if (UNIV_LIKELY_NULL(heap))
      mem_heap_free(heap);
  }

  ut_ad(size < srv_page_size);

  /* Find the record with a nonzero n_owned at or after rec (the owner of
  the directory slot that covers rec), counting the steps taken from rec.
  n_owned is then reduced by that step count; the result is stored as the
  n_owned of the supremum further below. */
  ulint slot_index, n_owned;
  {
    const rec_t *owner_rec= rec;
    ulint count= 0;

    if (page_is_comp(block->frame))
      while (!(n_owned= rec_get_n_owned_new(owner_rec)))
      {
        count++;
	owner_rec= rec_get_next_ptr_const(owner_rec, TRUE);
      }
    else
      while (!(n_owned= rec_get_n_owned_old(owner_rec)))
      {
        count++;
	owner_rec= rec_get_next_ptr_const(owner_rec, FALSE);
      }

    ut_ad(n_owned > count);
    n_owned-= count;
    slot_index= page_dir_find_owner_slot(owner_rec);
    ut_ad(slot_index > 0);
  }

  /* The slot found above becomes the last one: PAGE_N_DIR_SLOTS is set
  to slot_index + 1. */
  mtr->write<2,mtr_t::MAYBE_NOP>(*block, my_assume_aligned<2>
                                 (PAGE_N_DIR_SLOTS + PAGE_HEADER +
                                  block->frame), slot_index + 1);
  /* Reset PAGE_LAST_INSERT to 0. */
  mtr->write<2,mtr_t::MAYBE_NOP>(*block, my_assume_aligned<2>
                                 (PAGE_LAST_INSERT + PAGE_HEADER +
                                  block->frame), 0U);
  /* Catenate the deleted chain segment to the page free list */
  alignas(4) byte page_header[4];
  byte *page_free= my_assume_aligned<4>(PAGE_HEADER + PAGE_FREE +
                                        block->frame);
  /* Remember the old head of the free list before it is overwritten. */
  const uint16_t free= page_header_get_field(block->frame, PAGE_FREE);
  /* PAGE_FREE and PAGE_GARBAGE are adjacent 2-byte fields, so both are
  prepared in page_header[] and written with a single 4-byte memcpy. */
  static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");

  /* New PAGE_FREE: the first deleted record.
  New PAGE_GARBAGE: the old value plus the size of the deleted chain. */
  mach_write_to_2(page_header, page_offset(rec));
  mach_write_to_2(my_assume_aligned<2>(page_header + 2),
                  mach_read_from_2(my_assume_aligned<2>(page_free + 2)) +
                  size);
  mtr->memcpy(*block, page_free, page_header, 4);

  /* Subtract the number of deleted records from PAGE_N_RECS. */
  byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
                                          block->frame);
  mtr->write<2>(*block, page_n_recs,
                ulint{mach_read_from_2(page_n_recs)} - n_recs);

  /* Update the page directory; there is no need to balance the number
  of the records owned by the supremum record, as it is allowed to be
  less than PAGE_DIR_SLOT_MIN_N_OWNED */
  page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, slot_index);

  if (page_is_comp(block->frame))
  {
    /* Point the (now last) slot to the supremum and store the reduced
    n_owned in the n_owned bits of the supremum header byte. */
    mtr->write<2,mtr_t::MAYBE_NOP>(*block, slot, PAGE_NEW_SUPREMUM);
    byte *owned= PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED + block->frame;
    byte new_owned= static_cast<byte>((*owned & ~REC_N_OWNED_MASK) |
                                      n_owned << REC_N_OWNED_SHIFT);
#if 0 // FIXME: implement minimal logging for ROW_FORMAT=COMPRESSED
    if (UNIV_LIKELY_NULL(block->page.zip.data))
    {
      *owned= new_owned;
      memcpy_aligned<2>(PAGE_N_DIR_SLOTS + PAGE_HEADER + block->page.zip.data,
                        PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame,
			PAGE_N_RECS + 2 - PAGE_N_DIR_SLOTS);
      // TODO: the equivalent of page_zip_dir_delete() for all records
      mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
		      (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
      mach_write_to_2(last_rec - REC_NEXT, free
                    ? static_cast<uint16_t>(free - page_offset(last_rec))
                    : 0U);
      return;
    }
#endif
    mtr->write<1,mtr_t::MAYBE_NOP>(*block, owned, new_owned);
    /* In this branch the next-record links are written as differences
    from the offset of the record that holds the link.
    prev_rec now links to the supremum; last_rec links to the old head of
    the free list, or holds 0 if the free list was empty. */
    mtr->write<2>(*block, prev_rec - REC_NEXT, static_cast<uint16_t>
                  (PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
    mtr->write<2>(*block, last_rec - REC_NEXT, free
                  ? static_cast<uint16_t>(free - page_offset(last_rec))
                  : 0U);
  }
  else
  {
    /* Same updates as above, using the PAGE_OLD_SUPREMUM offset; here the
    next-record links are written as plain page offsets. */
    mtr->write<2,mtr_t::MAYBE_NOP>(*block, slot, PAGE_OLD_SUPREMUM);
    byte *owned= PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED + block->frame;
    byte new_owned= static_cast<byte>((*owned & ~REC_N_OWNED_MASK) |
                                      n_owned << REC_N_OWNED_SHIFT);
    mtr->write<1,mtr_t::MAYBE_NOP>(*block, owned, new_owned);
    mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
    mtr->write<2>(*block, last_rec - REC_NEXT, free);
  }
}
1051 
1052 /*************************************************************//**
1053 Deletes records from page, up to the given record, NOT including
1054 that record. Infimum and supremum records are not deleted. */
1055 void
page_delete_rec_list_start(rec_t * rec,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1056 page_delete_rec_list_start(
1057 /*=======================*/
1058 	rec_t*		rec,	/*!< in: record on page */
1059 	buf_block_t*	block,	/*!< in: buffer block of the page */
1060 	dict_index_t*	index,	/*!< in: record descriptor */
1061 	mtr_t*		mtr)	/*!< in: mtr */
1062 {
1063 	page_cur_t	cur1;
1064 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1065 	rec_offs*	offsets		= offsets_;
1066 	mem_heap_t*	heap		= NULL;
1067 
1068 	rec_offs_init(offsets_);
1069 
1070 	ut_ad(page_align(rec) == block->frame);
1071 	ut_ad((ibool) !!page_rec_is_comp(rec)
1072 	      == dict_table_is_comp(index->table));
1073 #ifdef UNIV_ZIP_DEBUG
1074 	{
1075 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
1076 		page_t*		page	= buf_block_get_frame(block);
1077 
1078 		/* page_zip_validate() would detect a min_rec_mark mismatch
1079 		in btr_page_split_and_insert()
1080 		between btr_attach_half_pages() and insert_page = ...
1081 		when btr_page_get_split_rec_to_left() holds
1082 		(direction == FSP_DOWN). */
1083 		ut_a(!page_zip
1084 		     || page_zip_validate_low(page_zip, page, index, TRUE));
1085 	}
1086 #endif /* UNIV_ZIP_DEBUG */
1087 
1088 	if (page_rec_is_infimum(rec)) {
1089 		return;
1090 	}
1091 
1092 	if (page_rec_is_supremum(rec)) {
1093 		/* We are deleting all records. */
1094 		page_create_empty(block, index, mtr);
1095 		return;
1096 	}
1097 
1098 	page_cur_set_before_first(block, &cur1);
1099 	page_cur_move_to_next(&cur1);
1100 
1101 	const ulint	n_core = page_rec_is_leaf(rec)
1102 		? index->n_core_fields : 0;
1103 
1104 	while (page_cur_get_rec(&cur1) != rec) {
1105 		offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
1106 					  offsets, n_core,
1107 					  ULINT_UNDEFINED, &heap);
1108 		page_cur_delete_rec(&cur1, index, offsets, mtr);
1109 	}
1110 
1111 	if (UNIV_LIKELY_NULL(heap)) {
1112 		mem_heap_free(heap);
1113 	}
1114 }
1115 
1116 /*************************************************************//**
1117 Moves record list end to another page. Moved records include
1118 split_rec.
1119 
1120 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1121 if new_block is a compressed leaf page in a secondary index.
1122 This has to be done either within the same mini-transaction,
1123 or by invoking ibuf_reset_free_bits() before mtr_commit().
1124 
1125 @return TRUE on success; FALSE on compression failure (new_block will
1126 be decompressed) */
1127 ibool
page_move_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1128 page_move_rec_list_end(
1129 /*===================*/
1130 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1131 	buf_block_t*	block,		/*!< in: index page from where to move */
1132 	rec_t*		split_rec,	/*!< in: first record to move */
1133 	dict_index_t*	index,		/*!< in: record descriptor */
1134 	mtr_t*		mtr)		/*!< in: mtr */
1135 {
1136 	page_t*		new_page	= buf_block_get_frame(new_block);
1137 	ulint		old_data_size;
1138 	ulint		new_data_size;
1139 	ulint		old_n_recs;
1140 	ulint		new_n_recs;
1141 
1142 	ut_ad(!dict_index_is_spatial(index));
1143 
1144 	old_data_size = page_get_data_size(new_page);
1145 	old_n_recs = page_get_n_recs(new_page);
1146 #ifdef UNIV_ZIP_DEBUG
1147 	{
1148 		page_zip_des_t*	new_page_zip
1149 			= buf_block_get_page_zip(new_block);
1150 		page_zip_des_t*	page_zip
1151 			= buf_block_get_page_zip(block);
1152 		ut_a(!new_page_zip == !page_zip);
1153 		ut_a(!new_page_zip
1154 		     || page_zip_validate(new_page_zip, new_page, index));
1155 		ut_a(!page_zip
1156 		     || page_zip_validate(page_zip, page_align(split_rec),
1157 					  index));
1158 	}
1159 #endif /* UNIV_ZIP_DEBUG */
1160 
1161 	if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_block, block,
1162 						  split_rec, index, mtr))) {
1163 		return(FALSE);
1164 	}
1165 
1166 	new_data_size = page_get_data_size(new_page);
1167 	new_n_recs = page_get_n_recs(new_page);
1168 
1169 	ut_ad(new_data_size >= old_data_size);
1170 
1171 	page_delete_rec_list_end(split_rec, block, index,
1172 				 new_n_recs - old_n_recs,
1173 				 new_data_size - old_data_size, mtr);
1174 
1175 	return(TRUE);
1176 }
1177 
1178 /*************************************************************//**
1179 Moves record list start to another page. Moved records do not include
1180 split_rec.
1181 
1182 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1183 if new_block is a compressed leaf page in a secondary index.
1184 This has to be done either within the same mini-transaction,
1185 or by invoking ibuf_reset_free_bits() before mtr_commit().
1186 
1187 @return TRUE on success; FALSE on compression failure */
1188 ibool
page_move_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1189 page_move_rec_list_start(
1190 /*=====================*/
1191 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1192 	buf_block_t*	block,		/*!< in/out: page containing split_rec */
1193 	rec_t*		split_rec,	/*!< in: first record not to move */
1194 	dict_index_t*	index,		/*!< in: record descriptor */
1195 	mtr_t*		mtr)		/*!< in: mtr */
1196 {
1197 	if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_block, block,
1198 						    split_rec, index, mtr))) {
1199 		return(FALSE);
1200 	}
1201 
1202 	page_delete_rec_list_start(split_rec, block, index, mtr);
1203 
1204 	return(TRUE);
1205 }
1206 
1207 /************************************************************//**
1208 Returns the nth record of the record list.
1209 This is the inverse function of page_rec_get_n_recs_before().
1210 @return nth record */
1211 const rec_t*
page_rec_get_nth_const(const page_t * page,ulint nth)1212 page_rec_get_nth_const(
1213 /*===================*/
1214 	const page_t*	page,	/*!< in: page */
1215 	ulint		nth)	/*!< in: nth record */
1216 {
1217 	const page_dir_slot_t*	slot;
1218 	ulint			i;
1219 	ulint			n_owned;
1220 	const rec_t*		rec;
1221 
1222 	if (nth == 0) {
1223 		return(page_get_infimum_rec(page));
1224 	}
1225 
1226 	ut_ad(nth < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1227 
1228 	for (i = 0;; i++) {
1229 
1230 		slot = page_dir_get_nth_slot(page, i);
1231 		n_owned = page_dir_slot_get_n_owned(slot);
1232 
1233 		if (n_owned > nth) {
1234 			break;
1235 		} else {
1236 			nth -= n_owned;
1237 		}
1238 	}
1239 
1240 	ut_ad(i > 0);
1241 	slot = page_dir_get_nth_slot(page, i - 1);
1242 	rec = page_dir_slot_get_rec(slot);
1243 
1244 	if (page_is_comp(page)) {
1245 		do {
1246 			rec = page_rec_get_next_low(rec, TRUE);
1247 			ut_ad(rec);
1248 		} while (nth--);
1249 	} else {
1250 		do {
1251 			rec = page_rec_get_next_low(rec, FALSE);
1252 			ut_ad(rec);
1253 		} while (nth--);
1254 	}
1255 
1256 	return(rec);
1257 }
1258 
1259 /***************************************************************//**
1260 Returns the number of records before the given record in chain.
1261 The number includes infimum and supremum records.
1262 @return number of records */
1263 ulint
page_rec_get_n_recs_before(const rec_t * rec)1264 page_rec_get_n_recs_before(
1265 /*=======================*/
1266 	const rec_t*	rec)	/*!< in: the physical record */
1267 {
1268 	const page_dir_slot_t*	slot;
1269 	const rec_t*		slot_rec;
1270 	const page_t*		page;
1271 	ulint			i;
1272 	lint			n	= 0;
1273 
1274 	ut_ad(page_rec_check(rec));
1275 
1276 	page = page_align(rec);
1277 	if (page_is_comp(page)) {
1278 		while (rec_get_n_owned_new(rec) == 0) {
1279 
1280 			rec = rec_get_next_ptr_const(rec, TRUE);
1281 			n--;
1282 		}
1283 
1284 		for (i = 0; ; i++) {
1285 			slot = page_dir_get_nth_slot(page, i);
1286 			slot_rec = page_dir_slot_get_rec(slot);
1287 
1288 			n += lint(rec_get_n_owned_new(slot_rec));
1289 
1290 			if (rec == slot_rec) {
1291 
1292 				break;
1293 			}
1294 		}
1295 	} else {
1296 		while (rec_get_n_owned_old(rec) == 0) {
1297 
1298 			rec = rec_get_next_ptr_const(rec, FALSE);
1299 			n--;
1300 		}
1301 
1302 		for (i = 0; ; i++) {
1303 			slot = page_dir_get_nth_slot(page, i);
1304 			slot_rec = page_dir_slot_get_rec(slot);
1305 
1306 			n += lint(rec_get_n_owned_old(slot_rec));
1307 
1308 			if (rec == slot_rec) {
1309 
1310 				break;
1311 			}
1312 		}
1313 	}
1314 
1315 	n--;
1316 
1317 	ut_ad(n >= 0);
1318 	ut_ad((ulong) n < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1319 
1320 	return((ulint) n);
1321 }
1322 
1323 /************************************************************//**
1324 Prints record contents including the data relevant only in
1325 the index page context. */
1326 void
page_rec_print(const rec_t * rec,const rec_offs * offsets)1327 page_rec_print(
1328 /*===========*/
1329 	const rec_t*	rec,	/*!< in: physical record */
1330 	const rec_offs*	offsets)/*!< in: record descriptor */
1331 {
1332 	ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
1333 	rec_print_new(stderr, rec, offsets);
1334 	if (page_rec_is_comp(rec)) {
1335 		ib::info() << "n_owned: " << rec_get_n_owned_new(rec)
1336 			<< "; heap_no: " << rec_get_heap_no_new(rec)
1337 			<< "; next rec: " << rec_get_next_offs(rec, TRUE);
1338 	} else {
1339 		ib::info() << "n_owned: " << rec_get_n_owned_old(rec)
1340 			<< "; heap_no: " << rec_get_heap_no_old(rec)
1341 			<< "; next rec: " << rec_get_next_offs(rec, FALSE);
1342 	}
1343 
1344 	page_rec_check(rec);
1345 	rec_validate(rec, offsets);
1346 }
1347 
1348 #ifdef UNIV_BTR_PRINT
1349 /***************************************************************//**
1350 This is used to print the contents of the directory for
1351 debugging purposes. */
1352 void
page_dir_print(page_t * page,ulint pr_n)1353 page_dir_print(
1354 /*===========*/
1355 	page_t*	page,	/*!< in: index page */
1356 	ulint	pr_n)	/*!< in: print n first and n last entries */
1357 {
1358 	ulint			n;
1359 	ulint			i;
1360 	page_dir_slot_t*	slot;
1361 
1362 	n = page_dir_get_n_slots(page);
1363 
1364 	fprintf(stderr, "--------------------------------\n"
1365 		"PAGE DIRECTORY\n"
1366 		"Page address %p\n"
1367 		"Directory stack top at offs: %lu; number of slots: %lu\n",
1368 		page, (ulong) page_offset(page_dir_get_nth_slot(page, n - 1)),
1369 		(ulong) n);
1370 	for (i = 0; i < n; i++) {
1371 		slot = page_dir_get_nth_slot(page, i);
1372 		if ((i == pr_n) && (i < n - pr_n)) {
1373 			fputs("    ...   \n", stderr);
1374 		}
1375 		if ((i < pr_n) || (i >= n - pr_n)) {
1376 			fprintf(stderr,
1377 				"Contents of slot: %lu: n_owned: %lu,"
1378 				" rec offs: %lu\n",
1379 				(ulong) i,
1380 				(ulong) page_dir_slot_get_n_owned(slot),
1381 				(ulong)
1382 				page_offset(page_dir_slot_get_rec(slot)));
1383 		}
1384 	}
1385 	fprintf(stderr, "Total of %lu records\n"
1386 		"--------------------------------\n",
1387 		(ulong) (PAGE_HEAP_NO_USER_LOW + page_get_n_recs(page)));
1388 }
1389 
1390 /***************************************************************//**
1391 This is used to print the contents of the page record list for
1392 debugging purposes. */
1393 void
page_print_list(buf_block_t * block,dict_index_t * index,ulint pr_n)1394 page_print_list(
1395 /*============*/
1396 	buf_block_t*	block,	/*!< in: index page */
1397 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1398 	ulint		pr_n)	/*!< in: print n first and n last entries */
1399 {
1400 	page_t*		page		= block->frame;
1401 	page_cur_t	cur;
1402 	ulint		count;
1403 	ulint		n_recs;
1404 	mem_heap_t*	heap		= NULL;
1405 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1406 	rec_offs*	offsets		= offsets_;
1407 	rec_offs_init(offsets_);
1408 
1409 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1410 
1411 	fprint(stderr,
1412 		"--------------------------------\n"
1413 		"PAGE RECORD LIST\n"
1414 		"Page address %p\n", page);
1415 
1416 	n_recs = page_get_n_recs(page);
1417 
1418 	page_cur_set_before_first(block, &cur);
1419 	count = 0;
1420 	for (;;) {
1421 		offsets = rec_get_offsets(cur.rec, index, offsets,
1422 					  page_rec_is_leaf(cur.rec),
1423 					  ULINT_UNDEFINED, &heap);
1424 		page_rec_print(cur.rec, offsets);
1425 
1426 		if (count == pr_n) {
1427 			break;
1428 		}
1429 		if (page_cur_is_after_last(&cur)) {
1430 			break;
1431 		}
1432 		page_cur_move_to_next(&cur);
1433 		count++;
1434 	}
1435 
1436 	if (n_recs > 2 * pr_n) {
1437 		fputs(" ... \n", stderr);
1438 	}
1439 
1440 	while (!page_cur_is_after_last(&cur)) {
1441 		page_cur_move_to_next(&cur);
1442 
1443 		if (count + pr_n >= n_recs) {
1444 			offsets = rec_get_offsets(cur.rec, index, offsets,
1445 						  page_rec_is_leaf(cur.rec),
1446 						  ULINT_UNDEFINED, &heap);
1447 			page_rec_print(cur.rec, offsets);
1448 		}
1449 		count++;
1450 	}
1451 
1452 	fprintf(stderr,
1453 		"Total of %lu records \n"
1454 		"--------------------------------\n",
1455 		(ulong) (count + 1));
1456 
1457 	if (UNIV_LIKELY_NULL(heap)) {
1458 		mem_heap_free(heap);
1459 	}
1460 }
1461 
1462 /***************************************************************//**
1463 Prints the info in a page header. */
1464 void
page_header_print(const page_t * page)1465 page_header_print(
1466 /*==============*/
1467 	const page_t*	page)
1468 {
1469 	fprintf(stderr,
1470 		"--------------------------------\n"
1471 		"PAGE HEADER INFO\n"
1472 		"Page address %p, n records %u (%s)\n"
1473 		"n dir slots %u, heap top %u\n"
1474 		"Page n heap %u, free %u, garbage %u\n"
1475 		"Page last insert %u, direction %u, n direction %u\n",
1476 		page, page_header_get_field(page, PAGE_N_RECS),
1477 		page_is_comp(page) ? "compact format" : "original format",
1478 		page_header_get_field(page, PAGE_N_DIR_SLOTS),
1479 		page_header_get_field(page, PAGE_HEAP_TOP),
1480 		page_dir_get_n_heap(page),
1481 		page_header_get_field(page, PAGE_FREE),
1482 		page_header_get_field(page, PAGE_GARBAGE),
1483 		page_header_get_field(page, PAGE_LAST_INSERT),
1484 		page_get_direction(page),
1485 		page_header_get_field(page, PAGE_N_DIRECTION));
1486 }
1487 
1488 /***************************************************************//**
1489 This is used to print the contents of the page for
1490 debugging purposes. */
1491 void
page_print(buf_block_t * block,dict_index_t * index,ulint dn,ulint rn)1492 page_print(
1493 /*=======*/
1494 	buf_block_t*	block,	/*!< in: index page */
1495 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1496 	ulint		dn,	/*!< in: print dn first and last entries
1497 				in directory */
1498 	ulint		rn)	/*!< in: print rn first and last records
1499 				in directory */
1500 {
1501 	page_t*	page = block->frame;
1502 
1503 	page_header_print(page);
1504 	page_dir_print(page, dn);
1505 	page_print_list(block, index, rn);
1506 }
1507 #endif /* UNIV_BTR_PRINT */
1508 
1509 /***************************************************************//**
1510 The following is used to validate a record on a page. This function
1511 differs from rec_validate as it can also check the n_owned field and
1512 the heap_no field.
1513 @return TRUE if ok */
1514 ibool
page_rec_validate(const rec_t * rec,const rec_offs * offsets)1515 page_rec_validate(
1516 /*==============*/
1517 	const rec_t*	rec,	/*!< in: physical record */
1518 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
1519 {
1520 	ulint		n_owned;
1521 	ulint		heap_no;
1522 	const page_t*	page;
1523 
1524 	page = page_align(rec);
1525 	ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
1526 
1527 	page_rec_check(rec);
1528 	rec_validate(rec, offsets);
1529 
1530 	if (page_rec_is_comp(rec)) {
1531 		n_owned = rec_get_n_owned_new(rec);
1532 		heap_no = rec_get_heap_no_new(rec);
1533 	} else {
1534 		n_owned = rec_get_n_owned_old(rec);
1535 		heap_no = rec_get_heap_no_old(rec);
1536 	}
1537 
1538 	if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
1539 		ib::warn() << "Dir slot of rec " << page_offset(rec)
1540 			<< ", n owned too big " << n_owned;
1541 		return(FALSE);
1542 	}
1543 
1544 	if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
1545 		ib::warn() << "Heap no of rec " << page_offset(rec)
1546 			<< " too big " << heap_no << " "
1547 			<< page_dir_get_n_heap(page);
1548 		return(FALSE);
1549 	}
1550 
1551 	return(TRUE);
1552 }
1553 
1554 #ifdef UNIV_DEBUG
1555 /***************************************************************//**
1556 Checks that the first directory slot points to the infimum record and
1557 the last to the supremum. This function is intended to track if the
1558 bug fixed in 4.0.14 has caused corruption to users' databases. */
1559 void
page_check_dir(const page_t * page)1560 page_check_dir(
1561 /*===========*/
1562 	const page_t*	page)	/*!< in: index page */
1563 {
1564 	ulint	n_slots;
1565 	ulint	infimum_offs;
1566 	ulint	supremum_offs;
1567 
1568 	n_slots = page_dir_get_n_slots(page);
1569 	infimum_offs = mach_read_from_2(page_dir_get_nth_slot(page, 0));
1570 	supremum_offs = mach_read_from_2(page_dir_get_nth_slot(page,
1571 							       n_slots - 1));
1572 
1573 	if (UNIV_UNLIKELY(!page_rec_is_infimum_low(infimum_offs))) {
1574 
1575 		ib::fatal() << "Page directory corruption: infimum not"
1576 			" pointed to";
1577 	}
1578 
1579 	if (UNIV_UNLIKELY(!page_rec_is_supremum_low(supremum_offs))) {
1580 
1581 		ib::fatal() << "Page directory corruption: supremum not"
1582 			" pointed to";
1583 	}
1584 }
1585 #endif /* UNIV_DEBUG */
1586 
1587 /***************************************************************//**
1588 This function checks the consistency of an index page when we do not
1589 know the index. This is also resilient so that this should never crash
1590 even if the page is total garbage.
1591 @return TRUE if ok */
1592 ibool
page_simple_validate_old(const page_t * page)1593 page_simple_validate_old(
1594 /*=====================*/
1595 	const page_t*	page)	/*!< in: index page in ROW_FORMAT=REDUNDANT */
1596 {
1597 	const page_dir_slot_t*	slot;
1598 	ulint			slot_no;
1599 	ulint			n_slots;
1600 	const rec_t*		rec;
1601 	const byte*		rec_heap_top;
1602 	ulint			count;
1603 	ulint			own_count;
1604 	ibool			ret	= FALSE;
1605 
1606 	ut_a(!page_is_comp(page));
1607 
1608 	/* Check first that the record heap and the directory do not
1609 	overlap. */
1610 
1611 	n_slots = page_dir_get_n_slots(page);
1612 
1613 	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
1614 		ib::error() << "Nonsensical number of page dir slots: "
1615 			    << n_slots;
1616 		goto func_exit;
1617 	}
1618 
1619 	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
1620 
1621 	if (UNIV_UNLIKELY(rec_heap_top
1622 			  > page_dir_get_nth_slot(page, n_slots - 1))) {
1623 		ib::error()
1624 			<< "Record heap and dir overlap on a page, heap top "
1625 			<< page_header_get_field(page, PAGE_HEAP_TOP)
1626 			<< ", dir "
1627 			<< page_offset(page_dir_get_nth_slot(page,
1628 							     n_slots - 1));
1629 
1630 		goto func_exit;
1631 	}
1632 
1633 	/* Validate the record list in a loop checking also that it is
1634 	consistent with the page record directory. */
1635 
1636 	count = 0;
1637 	own_count = 1;
1638 	slot_no = 0;
1639 	slot = page_dir_get_nth_slot(page, slot_no);
1640 
1641 	rec = page_get_infimum_rec(page);
1642 
1643 	for (;;) {
1644 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
1645 			ib::error() << "Record " << (rec - page)
1646 				<< " is above rec heap top "
1647 				<< (rec_heap_top - page);
1648 
1649 			goto func_exit;
1650 		}
1651 
1652 		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) != 0)) {
1653 			/* This is a record pointed to by a dir slot */
1654 			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
1655 					  != own_count)) {
1656 
1657 				ib::error() << "Wrong owned count "
1658 					<< rec_get_n_owned_old(rec)
1659 					<< ", " << own_count << ", rec "
1660 					<< (rec - page);
1661 
1662 				goto func_exit;
1663 			}
1664 
1665 			if (UNIV_UNLIKELY
1666 			    (page_dir_slot_get_rec(slot) != rec)) {
1667 				ib::error() << "Dir slot does not point"
1668 					" to right rec " << (rec - page);
1669 
1670 				goto func_exit;
1671 			}
1672 
1673 			own_count = 0;
1674 
1675 			if (!page_rec_is_supremum(rec)) {
1676 				slot_no++;
1677 				slot = page_dir_get_nth_slot(page, slot_no);
1678 			}
1679 		}
1680 
1681 		if (page_rec_is_supremum(rec)) {
1682 
1683 			break;
1684 		}
1685 
1686 		if (UNIV_UNLIKELY
1687 		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
1688 		     || rec_get_next_offs(rec, FALSE) >= srv_page_size)) {
1689 
1690 			ib::error() << "Next record offset nonsensical "
1691 				<< rec_get_next_offs(rec, FALSE) << " for rec "
1692 				<< (rec - page);
1693 
1694 			goto func_exit;
1695 		}
1696 
1697 		count++;
1698 
1699 		if (UNIV_UNLIKELY(count > srv_page_size)) {
1700 			ib::error() << "Page record list appears"
1701 				" to be circular " << count;
1702 			goto func_exit;
1703 		}
1704 
1705 		rec = page_rec_get_next_const(rec);
1706 		own_count++;
1707 	}
1708 
1709 	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
1710 		ib::error() << "n owned is zero in a supremum rec";
1711 
1712 		goto func_exit;
1713 	}
1714 
1715 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
1716 		ib::error() <<  "n slots wrong "
1717 			<< slot_no << ", " << (n_slots - 1);
1718 		goto func_exit;
1719 	}
1720 
1721 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
1722 			  + PAGE_HEAP_NO_USER_LOW
1723 			  != count + 1)) {
1724 		ib::error() <<  "n recs wrong "
1725 			<< page_header_get_field(page, PAGE_N_RECS)
1726 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
1727 
1728 		goto func_exit;
1729 	}
1730 
1731 	/* Check then the free list */
1732 	rec = page_header_get_ptr(page, PAGE_FREE);
1733 
1734 	while (rec != NULL) {
1735 		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
1736 				  || rec >= page + srv_page_size)) {
1737 			ib::error() << "Free list record has"
1738 				" a nonsensical offset " << (rec - page);
1739 
1740 			goto func_exit;
1741 		}
1742 
1743 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
1744 			ib::error() << "Free list record " << (rec - page)
1745 				<< " is above rec heap top "
1746 				<< (rec_heap_top - page);
1747 
1748 			goto func_exit;
1749 		}
1750 
1751 		count++;
1752 
1753 		if (UNIV_UNLIKELY(count > srv_page_size)) {
1754 			ib::error() << "Page free list appears"
1755 				" to be circular " << count;
1756 			goto func_exit;
1757 		}
1758 
1759 		ulint offs = rec_get_next_offs(rec, FALSE);
1760 		if (!offs) {
1761 			break;
1762 		}
1763 		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
1764 				  || offs >= srv_page_size)) {
1765 			ib::error() << "Page free list is corrupted " << count;
1766 			goto func_exit;
1767 		}
1768 
1769 		rec = page + offs;
1770 	}
1771 
1772 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
1773 
1774 		ib::error() <<  "N heap is wrong "
1775 			<< page_dir_get_n_heap(page) << ", " << (count + 1);
1776 
1777 		goto func_exit;
1778 	}
1779 
1780 	ret = TRUE;
1781 
1782 func_exit:
1783 	return(ret);
1784 }
1785 
1786 /***************************************************************//**
1787 This function checks the consistency of an index page when we do not
1788 know the index. This is also resilient so that this should never crash
1789 even if the page is total garbage.
1790 @return TRUE if ok */
1791 ibool
page_simple_validate_new(const page_t * page)1792 page_simple_validate_new(
1793 /*=====================*/
1794 	const page_t*	page)	/*!< in: index page in ROW_FORMAT!=REDUNDANT */
1795 {
1796 	const page_dir_slot_t*	slot;
1797 	ulint			slot_no;
1798 	ulint			n_slots;
1799 	const rec_t*		rec;
1800 	const byte*		rec_heap_top;
1801 	ulint			count;
1802 	ulint			own_count;
1803 	ibool			ret	= FALSE;
1804 
1805 	ut_a(page_is_comp(page));
1806 
1807 	/* Check first that the record heap and the directory do not
1808 	overlap. */
1809 
1810 	n_slots = page_dir_get_n_slots(page);
1811 
1812 	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
1813 		ib::error() << "Nonsensical number of page dir slots: "
1814 			    << n_slots;
1815 		goto func_exit;
1816 	}
1817 
1818 	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
1819 
1820 	if (UNIV_UNLIKELY(rec_heap_top
1821 			  > page_dir_get_nth_slot(page, n_slots - 1))) {
1822 
1823 		ib::error() << "Record heap and dir overlap on a page,"
1824 			" heap top "
1825 			<< page_header_get_field(page, PAGE_HEAP_TOP)
1826 			<< ", dir " << page_offset(
1827 				page_dir_get_nth_slot(page, n_slots - 1));
1828 
1829 		goto func_exit;
1830 	}
1831 
1832 	/* Validate the record list in a loop checking also that it is
1833 	consistent with the page record directory. */
1834 
1835 	count = 0;
1836 	own_count = 1;
1837 	slot_no = 0;
1838 	slot = page_dir_get_nth_slot(page, slot_no);
1839 
1840 	rec = page_get_infimum_rec(page);
1841 
1842 	for (;;) {
1843 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
1844 
1845 			ib::error() << "Record " << page_offset(rec)
1846 				<< " is above rec heap top "
1847 				<< page_offset(rec_heap_top);
1848 
1849 			goto func_exit;
1850 		}
1851 
1852 		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) != 0)) {
1853 			/* This is a record pointed to by a dir slot */
1854 			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
1855 					  != own_count)) {
1856 
1857 				ib::error() << "Wrong owned count "
1858 					<< rec_get_n_owned_new(rec) << ", "
1859 					<< own_count << ", rec "
1860 					<< page_offset(rec);
1861 
1862 				goto func_exit;
1863 			}
1864 
1865 			if (UNIV_UNLIKELY
1866 			    (page_dir_slot_get_rec(slot) != rec)) {
1867 				ib::error() << "Dir slot does not point"
1868 					" to right rec " << page_offset(rec);
1869 
1870 				goto func_exit;
1871 			}
1872 
1873 			own_count = 0;
1874 
1875 			if (!page_rec_is_supremum(rec)) {
1876 				slot_no++;
1877 				slot = page_dir_get_nth_slot(page, slot_no);
1878 			}
1879 		}
1880 
1881 		if (page_rec_is_supremum(rec)) {
1882 
1883 			break;
1884 		}
1885 
1886 		if (UNIV_UNLIKELY
1887 		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
1888 		     || rec_get_next_offs(rec, TRUE) >= srv_page_size)) {
1889 
1890 			ib::error() << "Next record offset nonsensical "
1891 				<< rec_get_next_offs(rec, TRUE)
1892 				<< " for rec " << page_offset(rec);
1893 
1894 			goto func_exit;
1895 		}
1896 
1897 		count++;
1898 
1899 		if (UNIV_UNLIKELY(count > srv_page_size)) {
1900 			ib::error() << "Page record list appears to be"
1901 				" circular " << count;
1902 			goto func_exit;
1903 		}
1904 
1905 		rec = page_rec_get_next_const(rec);
1906 		own_count++;
1907 	}
1908 
1909 	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
1910 		ib::error() << "n owned is zero in a supremum rec";
1911 
1912 		goto func_exit;
1913 	}
1914 
1915 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
1916 		ib::error() << "n slots wrong " << slot_no << ", "
1917 			<< (n_slots - 1);
1918 		goto func_exit;
1919 	}
1920 
1921 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
1922 			  + PAGE_HEAP_NO_USER_LOW
1923 			  != count + 1)) {
1924 		ib::error() << "n recs wrong "
1925 			<< page_header_get_field(page, PAGE_N_RECS)
1926 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
1927 
1928 		goto func_exit;
1929 	}
1930 
1931 	/* Check then the free list */
1932 	rec = page_header_get_ptr(page, PAGE_FREE);
1933 
1934 	while (rec != NULL) {
1935 		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
1936 				  || rec >= page + srv_page_size)) {
1937 
1938 			ib::error() << "Free list record has"
1939 				" a nonsensical offset " << page_offset(rec);
1940 
1941 			goto func_exit;
1942 		}
1943 
1944 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
1945 			ib::error() << "Free list record " << page_offset(rec)
1946 				<< " is above rec heap top "
1947 				<< page_offset(rec_heap_top);
1948 
1949 			goto func_exit;
1950 		}
1951 
1952 		count++;
1953 
1954 		if (UNIV_UNLIKELY(count > srv_page_size)) {
1955 			ib::error() << "Page free list appears to be"
1956 				" circular " << count;
1957 			goto func_exit;
1958 		}
1959 
1960 		const ulint offs = rec_get_next_offs(rec, TRUE);
1961 		if (!offs) {
1962 			break;
1963 		}
1964 		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
1965 				  || offs >= srv_page_size)) {
1966 			ib::error() << "Page free list is corrupted " << count;
1967 			goto func_exit;
1968 		}
1969 
1970 		rec = page + offs;
1971 	}
1972 
1973 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
1974 
1975 		ib::error() << "N heap is wrong "
1976 			<< page_dir_get_n_heap(page) << ", " << (count + 1);
1977 
1978 		goto func_exit;
1979 	}
1980 
1981 	ret = TRUE;
1982 
1983 func_exit:
1984 	return(ret);
1985 }
1986 
1987 /** Check the consistency of an index page.
1988 @param[in]	page	index page
1989 @param[in]	index	B-tree or R-tree index
1990 @return	whether the page is valid */
page_validate(const page_t * page,const dict_index_t * index)1991 bool page_validate(const page_t* page, const dict_index_t* index)
1992 {
1993 	const page_dir_slot_t*	slot;
1994 	const rec_t*		rec;
1995 	const rec_t*		old_rec		= NULL;
1996 	const rec_t*		first_rec	= NULL;
1997 	ulint			offs = 0;
1998 	ulint			n_slots;
1999 	ibool			ret		= TRUE;
2000 	ulint			i;
2001 	rec_offs		offsets_1[REC_OFFS_NORMAL_SIZE];
2002 	rec_offs		offsets_2[REC_OFFS_NORMAL_SIZE];
2003 	rec_offs*		offsets		= offsets_1;
2004 	rec_offs*		old_offsets	= offsets_2;
2005 
2006 	rec_offs_init(offsets_1);
2007 	rec_offs_init(offsets_2);
2008 
2009 #ifdef UNIV_GIS_DEBUG
2010 	if (dict_index_is_spatial(index)) {
2011 		fprintf(stderr, "Page no: %lu\n", page_get_page_no(page));
2012 	}
2013 #endif /* UNIV_DEBUG */
2014 
2015 	if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
2016 			  != dict_table_is_comp(index->table))) {
2017 		ib::error() << "'compact format' flag mismatch";
2018 func_exit2:
2019 		ib::error() << "Apparent corruption in space "
2020 			    << page_get_space_id(page) << " page "
2021 			    << page_get_page_no(page)
2022 			    << " of index " << index->name
2023 			    << " of table " << index->table->name;
2024 		return FALSE;
2025 	}
2026 
2027 	if (page_is_comp(page)) {
2028 		if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
2029 			goto func_exit2;
2030 		}
2031 	} else {
2032 		if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
2033 			goto func_exit2;
2034 		}
2035 	}
2036 
2037 	/* Multiple transactions cannot simultaneously operate on the
2038 	same temp-table in parallel.
2039 	max_trx_id is ignored for temp tables because it not required
2040 	for MVCC. */
2041 	if (!page_is_leaf(page) || page_is_empty(page)
2042 	    || !dict_index_is_sec_or_ibuf(index)
2043 	    || index->table->is_temporary()) {
2044 	} else if (trx_id_t sys_max_trx_id = trx_sys.get_max_trx_id()) {
2045 		trx_id_t	max_trx_id	= page_get_max_trx_id(page);
2046 
2047 		if (max_trx_id == 0 || max_trx_id > sys_max_trx_id) {
2048 			ib::error() << "PAGE_MAX_TRX_ID out of bounds: "
2049 				<< max_trx_id << ", " << sys_max_trx_id;
2050 			ret = FALSE;
2051 		}
2052 	} else {
2053 		ut_ad(srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
2054 	}
2055 
2056 	/* Check first that the record heap and the directory do not
2057 	overlap. */
2058 
2059 	n_slots = page_dir_get_n_slots(page);
2060 
2061 	if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
2062 			    <= page_dir_get_nth_slot(page, n_slots - 1)))) {
2063 
2064 		ib::warn() << "Record heap and directory overlap";
2065 		goto func_exit2;
2066 	}
2067 
2068 	switch (uint16_t type = fil_page_get_type(page)) {
2069 	case FIL_PAGE_RTREE:
2070 		if (!index->is_spatial()) {
2071 wrong_page_type:
2072 			ib::warn() << "Wrong page type " << type;
2073 			ret = FALSE;
2074 		}
2075 		break;
2076 	case FIL_PAGE_TYPE_INSTANT:
2077 		if (index->is_instant()
2078 		    && page_get_page_no(page) == index->page) {
2079 			break;
2080 		}
2081 		goto wrong_page_type;
2082 	case FIL_PAGE_INDEX:
2083 		if (index->is_spatial()) {
2084 			goto wrong_page_type;
2085 		}
2086 		if (index->is_instant()
2087 		    && page_get_page_no(page) == index->page) {
2088 			goto wrong_page_type;
2089 		}
2090 		break;
2091 	default:
2092 		goto wrong_page_type;
2093 	}
2094 
2095 	/* The following buffer is used to check that the
2096 	records in the page record heap do not overlap */
2097 	mem_heap_t* heap = mem_heap_create(srv_page_size + 200);;
2098 	byte* buf = static_cast<byte*>(mem_heap_zalloc(heap, srv_page_size));
2099 
2100 	/* Validate the record list in a loop checking also that
2101 	it is consistent with the directory. */
2102 	ulint count = 0, data_size = 0, own_count = 1, slot_no = 0;
2103 	ulint info_bits;
2104 	slot_no = 0;
2105 	slot = page_dir_get_nth_slot(page, slot_no);
2106 
2107 	rec = page_get_infimum_rec(page);
2108 
2109 	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
2110 
2111 	for (;;) {
2112 		offsets = rec_get_offsets(rec, index, offsets, n_core,
2113 					  ULINT_UNDEFINED, &heap);
2114 
2115 		if (page_is_comp(page) && page_rec_is_user_rec(rec)
2116 		    && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
2117 				     == page_is_leaf(page))) {
2118 			ib::error() << "'node_ptr' flag mismatch";
2119 			ret = FALSE;
2120 			goto next_rec;
2121 		}
2122 
2123 		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2124 			ret = FALSE;
2125 			goto next_rec;
2126 		}
2127 
2128 		info_bits = rec_get_info_bits(rec, page_is_comp(page));
2129 		if (info_bits
2130 		    & ~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG)) {
2131 			ib::error() << "info_bits has an incorrect value "
2132 				    << info_bits;
2133 			ret = false;
2134 		}
2135 
2136 		if (rec == first_rec) {
2137 			if (info_bits & REC_INFO_MIN_REC_FLAG) {
2138 				if (page_has_prev(page)) {
2139 					ib::error() << "REC_INFO_MIN_REC_FLAG "
2140 						"is set on non-left page";
2141 					ret = false;
2142 				} else if (!page_is_leaf(page)) {
2143 					/* leftmost node pointer page */
2144 				} else if (!index->is_instant()) {
2145 					ib::error() << "REC_INFO_MIN_REC_FLAG "
2146 						"is set in a leaf-page record";
2147 					ret = false;
2148 				} else if (!(info_bits & REC_INFO_DELETED_FLAG)
2149 					   != !index->table->instant) {
2150 					ib::error() << (index->table->instant
2151 							? "Metadata record "
2152 							"is not delete-marked"
2153 							: "Metadata record "
2154 							"is delete-marked");
2155 					ret = false;
2156 				}
2157 			} else if (!page_has_prev(page)
2158 				   && index->is_instant()) {
2159 				ib::error() << "Metadata record is missing";
2160 				ret = false;
2161 			}
2162 		} else if (info_bits & REC_INFO_MIN_REC_FLAG) {
2163 			ib::error() << "REC_INFO_MIN_REC_FLAG record is not "
2164 				       "first in page";
2165 			ret = false;
2166 		}
2167 
2168 		if (page_is_comp(page)) {
2169 			const rec_comp_status_t status = rec_get_status(rec);
2170 			if (status != REC_STATUS_ORDINARY
2171 			    && status != REC_STATUS_NODE_PTR
2172 			    && status != REC_STATUS_INFIMUM
2173 			    && status != REC_STATUS_SUPREMUM
2174 			    && status != REC_STATUS_INSTANT) {
2175 				ib::error() << "impossible record status "
2176 					    << status;
2177 				ret = false;
2178 			} else if (page_rec_is_infimum(rec)) {
2179 				if (status != REC_STATUS_INFIMUM) {
2180 					ib::error()
2181 						<< "infimum record has status "
2182 						<< status;
2183 					ret = false;
2184 				}
2185 			} else if (page_rec_is_supremum(rec)) {
2186 				if (status != REC_STATUS_SUPREMUM) {
2187 					ib::error() << "supremum record has "
2188 						       "status "
2189 						    << status;
2190 					ret = false;
2191 				}
2192 			} else if (!page_is_leaf(page)) {
2193 				if (status != REC_STATUS_NODE_PTR) {
2194 					ib::error() << "node ptr record has "
2195 						       "status "
2196 						    << status;
2197 					ret = false;
2198 				}
2199 			} else if (!index->is_instant()
2200 				   && status == REC_STATUS_INSTANT) {
2201 				ib::error() << "instantly added record in a "
2202 					       "non-instant index";
2203 				ret = false;
2204 			}
2205 		}
2206 
2207 		/* Check that the records are in the ascending order */
2208 		if (count >= PAGE_HEAP_NO_USER_LOW
2209 		    && !page_rec_is_supremum(rec)) {
2210 
2211 			int	ret = cmp_rec_rec(
2212 				rec, old_rec, offsets, old_offsets, index);
2213 
2214 			/* For spatial index, on nonleaf leavel, we
2215 			allow recs to be equal. */
2216 			if (ret <= 0 && !(ret == 0 && index->is_spatial()
2217 					  && !page_is_leaf(page))) {
2218 
2219 				ib::error() << "Records in wrong order";
2220 
2221 				fputs("\nInnoDB: previous record ", stderr);
2222 				/* For spatial index, print the mbr info.*/
2223 				if (index->type & DICT_SPATIAL) {
2224 					putc('\n', stderr);
2225 					rec_print_mbr_rec(stderr,
2226 						old_rec, old_offsets);
2227 					fputs("\nInnoDB: record ", stderr);
2228 					putc('\n', stderr);
2229 					rec_print_mbr_rec(stderr, rec, offsets);
2230 					putc('\n', stderr);
2231 					putc('\n', stderr);
2232 
2233 				} else {
2234 					rec_print_new(stderr, old_rec, old_offsets);
2235 					fputs("\nInnoDB: record ", stderr);
2236 					rec_print_new(stderr, rec, offsets);
2237 					putc('\n', stderr);
2238 				}
2239 
2240 				ret = FALSE;
2241 			}
2242 		}
2243 
2244 		if (page_rec_is_user_rec(rec)) {
2245 
2246 			data_size += rec_offs_size(offsets);
2247 
2248 #if defined(UNIV_GIS_DEBUG)
2249 			/* For spatial index, print the mbr info.*/
2250 			if (index->type & DICT_SPATIAL) {
2251 				rec_print_mbr_rec(stderr, rec, offsets);
2252 				putc('\n', stderr);
2253 			}
2254 #endif /* UNIV_GIS_DEBUG */
2255 		}
2256 
2257 		offs = page_offset(rec_get_start(rec, offsets));
2258 		i = rec_offs_size(offsets);
2259 		if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2260 			ib::error() << "Record offset out of bounds: "
2261 				    << offs << '+' << i;
2262 			ret = FALSE;
2263 			goto next_rec;
2264 		}
2265 		while (i--) {
2266 			if (UNIV_UNLIKELY(buf[offs + i])) {
2267 				ib::error() << "Record overlaps another: "
2268 					    << offs << '+' << i;
2269 				ret = FALSE;
2270 				break;
2271 			}
2272 			buf[offs + i] = 1;
2273 		}
2274 
2275 		if (ulint rec_own_count = page_is_comp(page)
2276 		    ? rec_get_n_owned_new(rec)
2277 		    : rec_get_n_owned_old(rec)) {
2278 			/* This is a record pointed to by a dir slot */
2279 			if (UNIV_UNLIKELY(rec_own_count != own_count)) {
2280 				ib::error() << "Wrong owned count at " << offs
2281 					    << ": " << rec_own_count
2282 					    << ", " << own_count;
2283 				ret = FALSE;
2284 			}
2285 
2286 			if (page_dir_slot_get_rec(slot) != rec) {
2287 				ib::error() << "Dir slot does not"
2288 					" point to right rec at " << offs;
2289 				ret = FALSE;
2290 			}
2291 
2292 			if (ret) {
2293 				page_dir_slot_check(slot);
2294 			}
2295 
2296 			own_count = 0;
2297 			if (!page_rec_is_supremum(rec)) {
2298 				slot_no++;
2299 				slot = page_dir_get_nth_slot(page, slot_no);
2300 			}
2301 		}
2302 
2303 next_rec:
2304 		if (page_rec_is_supremum(rec)) {
2305 			break;
2306 		}
2307 
2308 		count++;
2309 		own_count++;
2310 		old_rec = rec;
2311 		rec = page_rec_get_next_const(rec);
2312 
2313 		if (page_rec_is_infimum(old_rec)
2314 		    && page_rec_is_user_rec(rec)) {
2315 			first_rec = rec;
2316 		}
2317 
2318 		/* set old_offsets to offsets; recycle offsets */
2319 		std::swap(old_offsets, offsets);
2320 	}
2321 
2322 	if (page_is_comp(page)) {
2323 		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2324 
2325 			goto n_owned_zero;
2326 		}
2327 	} else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
2328 n_owned_zero:
2329 		ib::error() <<  "n owned is zero at " << offs;
2330 		ret = FALSE;
2331 	}
2332 
2333 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2334 		ib::error() << "n slots wrong " << slot_no << " "
2335 			<< (n_slots - 1);
2336 		ret = FALSE;
2337 	}
2338 
2339 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2340 			  + PAGE_HEAP_NO_USER_LOW
2341 			  != count + 1)) {
2342 		ib::error() << "n recs wrong "
2343 			<< page_header_get_field(page, PAGE_N_RECS)
2344 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2345 		ret = FALSE;
2346 	}
2347 
2348 	if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
2349 		ib::error() << "Summed data size " << data_size
2350 			<< ", returned by func " << page_get_data_size(page);
2351 		ret = FALSE;
2352 	}
2353 
2354 	/* Check then the free list */
2355 	rec = page_header_get_ptr(page, PAGE_FREE);
2356 
2357 	while (rec != NULL) {
2358 		offsets = rec_get_offsets(rec, index, offsets, n_core,
2359 					  ULINT_UNDEFINED, &heap);
2360 		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2361 			ret = FALSE;
2362 next_free:
2363 			const ulint offs = rec_get_next_offs(
2364 				rec, page_is_comp(page));
2365 			if (!offs) {
2366 				break;
2367 			}
2368 			if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2369 					  || offs >= srv_page_size)) {
2370 				ib::error() << "Page free list is corrupted";
2371 				ret = FALSE;
2372 				break;
2373 			}
2374 
2375 			rec = page + offs;
2376 			continue;
2377 		}
2378 
2379 		count++;
2380 		offs = page_offset(rec_get_start(rec, offsets));
2381 		i = rec_offs_size(offsets);
2382 		if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2383 			ib::error() << "Free record offset out of bounds: "
2384 				    << offs << '+' << i;
2385 			ret = FALSE;
2386 			goto next_free;
2387 		}
2388 		while (i--) {
2389 			if (UNIV_UNLIKELY(buf[offs + i])) {
2390 				ib::error() << "Free record overlaps another: "
2391 					    << offs << '+' << i;
2392 				ret = FALSE;
2393 				break;
2394 			}
2395 			buf[offs + i] = 1;
2396 		}
2397 
2398 		goto next_free;
2399 	}
2400 
2401 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2402 		ib::error() << "N heap is wrong "
2403 			<< page_dir_get_n_heap(page) << " " << count + 1;
2404 		ret = FALSE;
2405 	}
2406 
2407 	mem_heap_free(heap);
2408 
2409 	if (UNIV_UNLIKELY(!ret)) {
2410 		goto func_exit2;
2411 	}
2412 
2413 	return(ret);
2414 }
2415 
2416 /***************************************************************//**
2417 Looks in the page record list for a record with the given heap number.
2418 @return record, NULL if not found */
2419 const rec_t*
page_find_rec_with_heap_no(const page_t * page,ulint heap_no)2420 page_find_rec_with_heap_no(
2421 /*=======================*/
2422 	const page_t*	page,	/*!< in: index page */
2423 	ulint		heap_no)/*!< in: heap number */
2424 {
2425 	const rec_t*	rec;
2426 
2427 	if (page_is_comp(page)) {
2428 		rec = page + PAGE_NEW_INFIMUM;
2429 
2430 		for (;;) {
2431 			ulint	rec_heap_no = rec_get_heap_no_new(rec);
2432 
2433 			if (rec_heap_no == heap_no) {
2434 
2435 				return(rec);
2436 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2437 
2438 				return(NULL);
2439 			}
2440 
2441 			rec = page + rec_get_next_offs(rec, TRUE);
2442 		}
2443 	} else {
2444 		rec = page + PAGE_OLD_INFIMUM;
2445 
2446 		for (;;) {
2447 			ulint	rec_heap_no = rec_get_heap_no_old(rec);
2448 
2449 			if (rec_heap_no == heap_no) {
2450 
2451 				return(rec);
2452 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2453 
2454 				return(NULL);
2455 			}
2456 
2457 			rec = page + rec_get_next_offs(rec, FALSE);
2458 		}
2459 	}
2460 }
2461 
2462 /** Get the last non-delete-marked record on a page.
2463 @param[in]	page	index tree leaf page
2464 @return the last record, not delete-marked
2465 @retval infimum record if all records are delete-marked */
2466 const rec_t*
page_find_rec_max_not_deleted(const page_t * page)2467 page_find_rec_max_not_deleted(
2468 	const page_t*	page)
2469 {
2470 	const rec_t*	rec = page_get_infimum_rec(page);
2471 	const rec_t*	prev_rec = NULL; // remove warning
2472 
2473 	/* Because the page infimum is never delete-marked
2474 	and never the metadata pseudo-record (MIN_REC_FLAG)),
2475 	prev_rec will always be assigned to it first. */
2476 	ut_ad(!rec_get_info_bits(rec, page_rec_is_comp(rec)));
2477 	ut_ad(page_is_leaf(page));
2478 
2479 	if (page_is_comp(page)) {
2480 		do {
2481 			if (!(rec[-REC_NEW_INFO_BITS]
2482 			      & (REC_INFO_DELETED_FLAG
2483 				 | REC_INFO_MIN_REC_FLAG))) {
2484 				prev_rec = rec;
2485 			}
2486 			rec = page_rec_get_next_low(rec, true);
2487 		} while (rec != page + PAGE_NEW_SUPREMUM);
2488 	} else {
2489 		do {
2490 			if (!(rec[-REC_OLD_INFO_BITS]
2491 			      & (REC_INFO_DELETED_FLAG
2492 				 | REC_INFO_MIN_REC_FLAG))) {
2493 				prev_rec = rec;
2494 			}
2495 			rec = page_rec_get_next_low(rec, false);
2496 		} while (rec != page + PAGE_OLD_SUPREMUM);
2497 	}
2498 	return(prev_rec);
2499 }
2500