1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2017, 2021, MariaDB Corporation.
6 
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10 
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18 
19 *****************************************************************************/
20 
21 /**************************************************//**
22 @file page/page0page.cc
23 Index page routines
24 
25 Created 2/2/1994 Heikki Tuuri
26 *******************************************************/
27 
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "buf0buf.h"
32 #include "buf0checksum.h"
33 #include "btr0btr.h"
34 #include "srv0srv.h"
35 #include "lock0lock.h"
36 #include "fut0lst.h"
37 #include "btr0sea.h"
38 #include "trx0sys.h"
39 #include <algorithm>
40 
41 /*			THE INDEX PAGE
42 			==============
43 
44 The index page consists of a page header which contains the page's
45 id and other information. On top of it are the index records
46 in a heap linked into a one way linear list according to alphabetic order.
47 
48 Just below page end is an array of pointers which we call page directory,
49 to about every sixth record in the list. The pointers are placed in
50 the directory in the alphabetical order of the records pointed to,
51 enabling us to make binary search using the array. Each slot n:o I
52 in the directory points to a record, where a 4-bit field contains a count
53 of those records which are in the linear list between pointer I and
54 the pointer I - 1 in the directory, including the record
55 pointed to by pointer I and not including the record pointed to by I - 1.
56 We say that the record pointed to by slot I, or that slot I, owns
57 these records. The count is always kept in the range 4 to 8, with
58 the exception that it is 1 for the first slot, and 1--8 for the second slot.
59 
60 An essentially binary search can be performed in the list of index
61 records, like we could do if we had pointer to every record in the
62 page directory. The data structure is, however, more efficient when
63 we are doing inserts, because most inserts are just pushed on a heap.
64 Only every 8th insert requires block move in the directory pointer
65 table, which itself is quite small. A record is deleted from the page
66 by just taking it off the linear list and updating the number of owned
67 records-field of the record which owns it, and updating the page directory,
68 if necessary. A special case is the one when the record owns itself.
69 Because the overhead of inserts is so small, we may also increase the
70 page size from the projected default of 8 kB to 64 kB without too
71 much loss of efficiency in inserts. Bigger page becomes actual
72 when the disk transfer rate compared to seek and latency time rises.
73 On the present system, the page size is set so that the page transfer
74 time (3 ms) is 20 % of the disk random access time (15 ms).
75 
76 When the page is split, merged, or becomes full but contains deleted
77 records, we have to reorganize the page.
78 
79 Assuming a page size of 8 kB, a typical index page of a secondary
80 index contains 300 index entries, and the size of the page directory
81 is 50 x 4 bytes = 200 bytes. */
82 
83 /***************************************************************//**
84 Looks for the directory slot which owns the given record.
85 @return the directory slot number */
86 ulint
page_dir_find_owner_slot(const rec_t * rec)87 page_dir_find_owner_slot(
88 /*=====================*/
89 	const rec_t*	rec)	/*!< in: the physical record */
90 {
91 	ut_ad(page_rec_check(rec));
92 
93 	const page_t* page = page_align(rec);
94 	const page_dir_slot_t* first_slot = page_dir_get_nth_slot(page, 0);
95 	const page_dir_slot_t* slot = page_dir_get_nth_slot(
96 		page, ulint(page_dir_get_n_slots(page)) - 1);
97 	const rec_t*		r = rec;
98 
99 	if (page_is_comp(page)) {
100 		while (rec_get_n_owned_new(r) == 0) {
101 			r = rec_get_next_ptr_const(r, TRUE);
102 			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
103 			ut_ad(r < page + (srv_page_size - PAGE_DIR));
104 		}
105 	} else {
106 		while (rec_get_n_owned_old(r) == 0) {
107 			r = rec_get_next_ptr_const(r, FALSE);
108 			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
109 			ut_ad(r < page + (srv_page_size - PAGE_DIR));
110 		}
111 	}
112 
113 	uint16 rec_offs_bytes = mach_encode_2(ulint(r - page));
114 
115 	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {
116 
117 		if (UNIV_UNLIKELY(slot == first_slot)) {
118 			ib::error() << "Probable data corruption on page "
119 				<< page_get_page_no(page)
120 				<< ". Original record on that page;";
121 
122 			if (page_is_comp(page)) {
123 				fputs("(compact record)", stderr);
124 			} else {
125 				rec_print_old(stderr, rec);
126 			}
127 
128 			ib::error() << "Cannot find the dir slot for this"
129 				" record on that page;";
130 
131 			if (page_is_comp(page)) {
132 				fputs("(compact record)", stderr);
133 			} else {
134 				rec_print_old(stderr, page
135 					      + mach_decode_2(rec_offs_bytes));
136 			}
137 
138 			ut_error;
139 		}
140 
141 		slot += PAGE_DIR_SLOT_SIZE;
142 	}
143 
144 	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
145 }
146 
147 /**************************************************************//**
148 Used to check the consistency of a directory slot.
149 @return TRUE if succeed */
150 static
151 ibool
page_dir_slot_check(const page_dir_slot_t * slot)152 page_dir_slot_check(
153 /*================*/
154 	const page_dir_slot_t*	slot)	/*!< in: slot */
155 {
156 	const page_t*	page;
157 	ulint		n_slots;
158 	ulint		n_owned;
159 
160 	ut_a(slot);
161 
162 	page = page_align(slot);
163 
164 	n_slots = page_dir_get_n_slots(page);
165 
166 	ut_a(slot <= page_dir_get_nth_slot(page, 0));
167 	ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));
168 
169 	ut_a(page_rec_check(page_dir_slot_get_rec(slot)));
170 
171 	if (page_is_comp(page)) {
172 		n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
173 	} else {
174 		n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
175 	}
176 
177 	if (slot == page_dir_get_nth_slot(page, 0)) {
178 		ut_a(n_owned == 1);
179 	} else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
180 		ut_a(n_owned >= 1);
181 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
182 	} else {
183 		ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
184 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
185 	}
186 
187 	return(TRUE);
188 }
189 
190 /*************************************************************//**
191 Sets the max trx id field value. */
192 void
page_set_max_trx_id(buf_block_t * block,page_zip_des_t * page_zip,trx_id_t trx_id,mtr_t * mtr)193 page_set_max_trx_id(
194 /*================*/
195 	buf_block_t*	block,	/*!< in/out: page */
196 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
197 	trx_id_t	trx_id,	/*!< in: transaction id */
198 	mtr_t*		mtr)	/*!< in/out: mini-transaction, or NULL */
199 {
200 	page_t*		page		= buf_block_get_frame(block);
201 	ut_ad(!mtr || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
202 
203 	/* It is not necessary to write this change to the redo log, as
204 	during a database recovery we assume that the max trx id of every
205 	page is the maximum trx id assigned before the crash. */
206 
207 	if (page_zip) {
208 		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
209 		page_zip_write_header(page_zip,
210 				      page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
211 				      8, mtr);
212 	} else if (mtr) {
213 		mlog_write_ull(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
214 			       trx_id, mtr);
215 	} else {
216 		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
217 	}
218 }
219 
220 /** Persist the AUTO_INCREMENT value on a clustered index root page.
221 @param[in,out]	block	clustered index root page
222 @param[in]	index	clustered index
223 @param[in]	autoinc	next available AUTO_INCREMENT value
224 @param[in,out]	mtr	mini-transaction
225 @param[in]	reset	whether to reset the AUTO_INCREMENT
226 			to a possibly smaller value than currently
227 			exists in the page */
228 void
page_set_autoinc(buf_block_t * block,const dict_index_t * index MY_ATTRIBUTE ((unused)),ib_uint64_t autoinc,mtr_t * mtr,bool reset)229 page_set_autoinc(
230 	buf_block_t*		block,
231 	const dict_index_t*	index MY_ATTRIBUTE((unused)),
232 	ib_uint64_t		autoinc,
233 	mtr_t*			mtr,
234 	bool			reset)
235 {
236 	ut_ad(mtr_memo_contains_flagged(
237 		      mtr, block, MTR_MEMO_PAGE_X_FIX | MTR_MEMO_PAGE_SX_FIX));
238 	ut_ad(index->is_primary());
239 	ut_ad(index->page == block->page.id.page_no());
240 	ut_ad(index->table->space_id == block->page.id.space());
241 
242 	byte*	field = PAGE_HEADER + PAGE_ROOT_AUTO_INC
243 		+ buf_block_get_frame(block);
244 	if (!reset && mach_read_from_8(field) >= autoinc) {
245 		/* nothing to update */
246 	} else if (page_zip_des_t* page_zip = buf_block_get_page_zip(block)) {
247 		mach_write_to_8(field, autoinc);
248 		page_zip_write_header(page_zip, field, 8, mtr);
249 	} else {
250 		mlog_write_ull(field, autoinc, mtr);
251 	}
252 }
253 
254 /**********************************************************//**
255 Writes a log record of page creation. */
256 UNIV_INLINE
257 void
page_create_write_log(buf_frame_t * frame,mtr_t * mtr,ibool comp,bool is_rtree)258 page_create_write_log(
259 /*==================*/
260 	buf_frame_t*	frame,	/*!< in: a buffer frame where the page is
261 				created */
262 	mtr_t*		mtr,	/*!< in: mini-transaction handle */
263 	ibool		comp,	/*!< in: TRUE=compact page format */
264 	bool		is_rtree) /*!< in: whether it is R-tree */
265 {
266 	mlog_id_t	type;
267 
268 	if (is_rtree) {
269 		type = comp ? MLOG_COMP_PAGE_CREATE_RTREE
270 			    : MLOG_PAGE_CREATE_RTREE;
271 	} else {
272 		type = comp ? MLOG_COMP_PAGE_CREATE : MLOG_PAGE_CREATE;
273 	}
274 
275 	mlog_write_initial_log_record(frame, type, mtr);
276 }
277 
278 /** The page infimum and supremum of an empty page in ROW_FORMAT=REDUNDANT */
279 static const byte infimum_supremum_redundant[] = {
280 	/* the infimum record */
281 	0x08/*end offset*/,
282 	0x01/*n_owned*/,
283 	0x00, 0x00/*heap_no=0*/,
284 	0x03/*n_fields=1, 1-byte offsets*/,
285 	0x00, 0x74/* pointer to supremum */,
286 	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
287 	/* the supremum record */
288 	0x09/*end offset*/,
289 	0x01/*n_owned*/,
290 	0x00, 0x08/*heap_no=1*/,
291 	0x03/*n_fields=1, 1-byte offsets*/,
292 	0x00, 0x00/* end of record list */,
293 	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm', 0
294 };
295 
296 /** The page infimum and supremum of an empty page in ROW_FORMAT=COMPACT */
297 static const byte infimum_supremum_compact[] = {
298 	/* the infimum record */
299 	0x01/*n_owned=1*/,
300 	0x00, 0x02/* heap_no=0, REC_STATUS_INFIMUM */,
301 	0x00, 0x0d/* pointer to supremum */,
302 	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
303 	/* the supremum record */
304 	0x01/*n_owned=1*/,
305 	0x00, 0x0b/* heap_no=1, REC_STATUS_SUPREMUM */,
306 	0x00, 0x00/* end of record list */,
307 	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
308 };
309 
310 /**********************************************************//**
311 The index page creation function.
312 @return pointer to the page */
313 static
314 page_t*
page_create_low(buf_block_t * block,ulint comp,bool is_rtree)315 page_create_low(
316 /*============*/
317 	buf_block_t*	block,		/*!< in: a buffer block where the
318 					page is created */
319 	ulint		comp,		/*!< in: nonzero=compact page format */
320 	bool		is_rtree)	/*!< in: if it is an R-Tree page */
321 {
322 	page_t*		page;
323 
324 	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE
325 			    <= PAGE_DATA);
326 	compile_time_assert(PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE
327 			    <= PAGE_DATA);
328 
329 	buf_block_modify_clock_inc(block);
330 
331 	page = buf_block_get_frame(block);
332 
333 	if (is_rtree) {
334 		fil_page_set_type(page, FIL_PAGE_RTREE);
335 	} else {
336 		fil_page_set_type(page, FIL_PAGE_INDEX);
337 	}
338 
339 	memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
340 	page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
341 	page[PAGE_HEADER + PAGE_INSTANT] = 0;
342 	page[PAGE_HEADER + PAGE_DIRECTION_B] = PAGE_NO_DIRECTION;
343 
344 	if (comp) {
345 		page[PAGE_HEADER + PAGE_N_HEAP] = 0x80;/*page_is_comp()*/
346 		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
347 		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_NEW_SUPREMUM_END;
348 		memcpy(page + PAGE_DATA, infimum_supremum_compact,
349 		       sizeof infimum_supremum_compact);
350 		memset(page
351 		       + PAGE_NEW_SUPREMUM_END, 0,
352 		       srv_page_size - PAGE_DIR - PAGE_NEW_SUPREMUM_END);
353 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
354 			= PAGE_NEW_SUPREMUM;
355 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
356 			= PAGE_NEW_INFIMUM;
357 	} else {
358 		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
359 		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_OLD_SUPREMUM_END;
360 		memcpy(page + PAGE_DATA, infimum_supremum_redundant,
361 		       sizeof infimum_supremum_redundant);
362 		memset(page
363 		       + PAGE_OLD_SUPREMUM_END, 0,
364 		       srv_page_size - PAGE_DIR - PAGE_OLD_SUPREMUM_END);
365 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
366 			= PAGE_OLD_SUPREMUM;
367 		page[srv_page_size - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
368 			= PAGE_OLD_INFIMUM;
369 	}
370 
371 	return(page);
372 }
373 
374 /** Parses a redo log record of creating a page.
375 @param[in,out]	block	buffer block, or NULL
376 @param[in]	comp	nonzero=compact page format
377 @param[in]	is_rtree whether it is rtree page */
378 void
page_parse_create(buf_block_t * block,ulint comp,bool is_rtree)379 page_parse_create(
380 	buf_block_t*	block,
381 	ulint		comp,
382 	bool		is_rtree)
383 {
384 	if (block != NULL) {
385 		page_create_low(block, comp, is_rtree);
386 	}
387 }
388 
389 /**********************************************************//**
390 Create an uncompressed B-tree or R-tree index page.
391 @return pointer to the page */
392 page_t*
page_create(buf_block_t * block,mtr_t * mtr,ulint comp,bool is_rtree)393 page_create(
394 /*========*/
395 	buf_block_t*	block,		/*!< in: a buffer block where the
396 					page is created */
397 	mtr_t*		mtr,		/*!< in: mini-transaction handle */
398 	ulint		comp,		/*!< in: nonzero=compact page format */
399 	bool		is_rtree)	/*!< in: whether it is a R-Tree page */
400 {
401 	ut_ad(mtr->is_named_space(block->page.id.space()));
402 	page_create_write_log(buf_block_get_frame(block), mtr, comp, is_rtree);
403 	return(page_create_low(block, comp, is_rtree));
404 }
405 
406 /**********************************************************//**
407 Create a compressed B-tree index page.
408 @return pointer to the page */
409 page_t*
page_create_zip(buf_block_t * block,dict_index_t * index,ulint level,trx_id_t max_trx_id,mtr_t * mtr)410 page_create_zip(
411 /*============*/
412 	buf_block_t*		block,		/*!< in/out: a buffer frame
413 						where the page is created */
414 	dict_index_t*		index,		/*!< in: the index of the
415 						page */
416 	ulint			level,		/*!< in: the B-tree level
417 						of the page */
418 	trx_id_t		max_trx_id,	/*!< in: PAGE_MAX_TRX_ID */
419 	mtr_t*			mtr)		/*!< in/out: mini-transaction
420 						handle */
421 {
422 	page_t*			page;
423 	page_zip_des_t*		page_zip = buf_block_get_page_zip(block);
424 
425 	ut_ad(block);
426 	ut_ad(page_zip);
427 	ut_ad(dict_table_is_comp(index->table));
428 
429 	/* PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC are always 0 for
430 	temporary tables. */
431 	ut_ad(max_trx_id == 0 || !index->table->is_temporary());
432 	/* In secondary indexes and the change buffer, PAGE_MAX_TRX_ID
433 	must be zero on non-leaf pages. max_trx_id can be 0 when the
434 	index consists of an empty root (leaf) page. */
435 	ut_ad(max_trx_id == 0
436 	      || level == 0
437 	      || !dict_index_is_sec_or_ibuf(index)
438 	      || index->table->is_temporary());
439 	/* In the clustered index, PAGE_ROOT_AUTOINC or
440 	PAGE_MAX_TRX_ID must be 0 on other pages than the root. */
441 	ut_ad(level == 0 || max_trx_id == 0
442 	      || !dict_index_is_sec_or_ibuf(index)
443 	      || index->table->is_temporary());
444 
445 	page = page_create_low(block, TRUE, dict_index_is_spatial(index));
446 	mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + page, level);
447 	mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + page, max_trx_id);
448 
449 	if (!page_zip_compress(page_zip, page, index, page_zip_level, mtr)) {
450 		/* The compression of a newly created
451 		page should always succeed. */
452 		ut_error;
453 	}
454 
455 	return(page);
456 }
457 
458 /**********************************************************//**
459 Empty a previously created B-tree index page. */
460 void
page_create_empty(buf_block_t * block,dict_index_t * index,mtr_t * mtr)461 page_create_empty(
462 /*==============*/
463 	buf_block_t*	block,	/*!< in/out: B-tree block */
464 	dict_index_t*	index,	/*!< in: the index of the page */
465 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
466 {
467 	trx_id_t	max_trx_id;
468 	page_t*		page	= buf_block_get_frame(block);
469 	page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
470 
471 	ut_ad(fil_page_index_page_check(page));
472 	ut_ad(!index->is_dummy);
473 	ut_ad(block->page.id.space() == index->table->space->id);
474 
475 	/* Multiple transactions cannot simultaneously operate on the
476 	same temp-table in parallel.
477 	max_trx_id is ignored for temp tables because it not required
478 	for MVCC. */
479 	if (dict_index_is_sec_or_ibuf(index)
480 	    && !index->table->is_temporary()
481 	    && page_is_leaf(page)) {
482 		max_trx_id = page_get_max_trx_id(page);
483 		ut_ad(max_trx_id);
484 	} else if (block->page.id.page_no() == index->page) {
485 		/* Preserve PAGE_ROOT_AUTO_INC. */
486 		max_trx_id = page_get_max_trx_id(page);
487 	} else {
488 		max_trx_id = 0;
489 	}
490 
491 	if (page_zip) {
492 		ut_ad(!index->table->is_temporary());
493 		page_create_zip(block, index,
494 				page_header_get_field(page, PAGE_LEVEL),
495 				max_trx_id, mtr);
496 	} else {
497 		page_create(block, mtr, page_is_comp(page),
498 			    dict_index_is_spatial(index));
499 
500 		if (max_trx_id) {
501 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID + page,
502 				       max_trx_id, mtr);
503 		}
504 	}
505 }
506 
507 /*************************************************************//**
508 Differs from page_copy_rec_list_end, because this function does not
509 touch the lock table and max trx id on page or compress the page.
510 
511 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
512 if new_block is a compressed leaf page in a secondary index.
513 This has to be done either within the same mini-transaction,
514 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
515 void
page_copy_rec_list_end_no_locks(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mtr_t * mtr)516 page_copy_rec_list_end_no_locks(
517 /*============================*/
518 	buf_block_t*	new_block,	/*!< in: index page to copy to */
519 	buf_block_t*	block,		/*!< in: index page of rec */
520 	rec_t*		rec,		/*!< in: record on page */
521 	dict_index_t*	index,		/*!< in: record descriptor */
522 	mtr_t*		mtr)		/*!< in: mtr */
523 {
524 	page_t*		new_page	= buf_block_get_frame(new_block);
525 	page_cur_t	cur1;
526 	rec_t*		cur2;
527 	mem_heap_t*	heap		= NULL;
528 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
529 	rec_offs*	offsets		= offsets_;
530 	rec_offs_init(offsets_);
531 
532 	page_cur_position(rec, block, &cur1);
533 
534 	if (page_cur_is_before_first(&cur1)) {
535 
536 		page_cur_move_to_next(&cur1);
537 	}
538 
539 	btr_assert_not_corrupted(new_block, index);
540 	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
541 	ut_a(mach_read_from_2(new_page + srv_page_size - 10) == (ulint)
542 	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
543 	const ulint n_core = page_is_leaf(block->frame)
544 		? index->n_core_fields : 0;
545 
546 	cur2 = page_get_infimum_rec(buf_block_get_frame(new_block));
547 
548 	/* Copy records from the original page to the new page */
549 
550 	while (!page_cur_is_after_last(&cur1)) {
551 		rec_t*	ins_rec;
552 		offsets = rec_get_offsets(cur1.rec, index, offsets, n_core,
553 					  ULINT_UNDEFINED, &heap);
554 		ins_rec = page_cur_insert_rec_low(cur2, index,
555 						  cur1.rec, offsets, mtr);
556 		if (UNIV_UNLIKELY(!ins_rec)) {
557 			ib::fatal() << "Rec offset " << page_offset(rec)
558 				<< ", cur1 offset " << page_offset(cur1.rec)
559 				<< ", cur2 offset " << page_offset(cur2);
560 		}
561 
562 		page_cur_move_to_next(&cur1);
563 		ut_ad(!(rec_get_info_bits(cur1.rec, page_is_comp(new_page))
564 			& REC_INFO_MIN_REC_FLAG));
565 		cur2 = ins_rec;
566 	}
567 
568 	if (UNIV_LIKELY_NULL(heap)) {
569 		mem_heap_free(heap);
570 	}
571 }
572 
573 /*************************************************************//**
574 Copies records from page to new_page, from a given record onward,
575 including that record. Infimum and supremum records are not copied.
576 The records are copied to the start of the record list on new_page.
577 
578 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
579 if new_block is a compressed leaf page in a secondary index.
580 This has to be done either within the same mini-transaction,
581 or by invoking ibuf_reset_free_bits() before mtr_commit().
582 
583 @return pointer to the original successor of the infimum record on
584 new_page, or NULL on zip overflow (new_block will be decompressed) */
585 rec_t*
page_copy_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mtr_t * mtr)586 page_copy_rec_list_end(
587 /*===================*/
588 	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
589 	buf_block_t*	block,		/*!< in: index page containing rec */
590 	rec_t*		rec,		/*!< in: record on page */
591 	dict_index_t*	index,		/*!< in: record descriptor */
592 	mtr_t*		mtr)		/*!< in: mtr */
593 {
594 	page_t*		new_page	= buf_block_get_frame(new_block);
595 	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
596 	page_t*		page		= page_align(rec);
597 	rec_t*		ret		= page_rec_get_next(
598 		page_get_infimum_rec(new_page));
599 	ulint		num_moved	= 0;
600 	rtr_rec_move_t*	rec_move	= NULL;
601 	mem_heap_t*	heap		= NULL;
602 
603 #ifdef UNIV_ZIP_DEBUG
604 	if (new_page_zip) {
605 		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
606 		ut_a(page_zip);
607 
608 		/* Strict page_zip_validate() may fail here.
609 		Furthermore, btr_compress() may set FIL_PAGE_PREV to
610 		FIL_NULL on new_page while leaving it intact on
611 		new_page_zip.  So, we cannot validate new_page_zip. */
612 		ut_a(page_zip_validate_low(page_zip, page, index, TRUE));
613 	}
614 #endif /* UNIV_ZIP_DEBUG */
615 	ut_ad(buf_block_get_frame(block) == page);
616 	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
617 	ut_ad(page_is_comp(page) == page_is_comp(new_page));
618 	/* Here, "ret" may be pointing to a user record or the
619 	predefined supremum record. */
620 
621 	mtr_log_t	log_mode = MTR_LOG_NONE;
622 
623 	if (new_page_zip) {
624 		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
625 	}
626 
627 	if (page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW) {
628 		page_copy_rec_list_end_to_created_page(new_page, rec,
629 						       index, mtr);
630 	} else {
631 		if (dict_index_is_spatial(index)) {
632 			ulint	max_to_move = page_get_n_recs(
633 						buf_block_get_frame(block));
634 			heap = mem_heap_create(256);
635 
636 			rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
637 					heap,
638 					sizeof (*rec_move) * max_to_move));
639 
640 			/* For spatial index, we need to insert recs one by one
641 			to keep recs ordered. */
642 			rtr_page_copy_rec_list_end_no_locks(new_block,
643 							    block, rec, index,
644 							    heap, rec_move,
645 							    max_to_move,
646 							    &num_moved,
647 							    mtr);
648 		} else {
649 			page_copy_rec_list_end_no_locks(new_block, block, rec,
650 							index, mtr);
651 		}
652 	}
653 
654 	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
655 	Modifications will be redo logged and copied to the compressed
656 	page in page_zip_compress() or page_zip_reorganize() below.
657 	Multiple transactions cannot simultaneously operate on the
658 	same temp-table in parallel.
659 	max_trx_id is ignored for temp tables because it not required
660 	for MVCC. */
661 	if (dict_index_is_sec_or_ibuf(index)
662 	    && page_is_leaf(page)
663 	    && !index->table->is_temporary()) {
664 		page_update_max_trx_id(new_block, NULL,
665 				       page_get_max_trx_id(page), mtr);
666 	}
667 
668 	if (new_page_zip) {
669 		mtr_set_log_mode(mtr, log_mode);
670 
671 		if (!page_zip_compress(new_page_zip, new_page, index,
672 				       page_zip_level, mtr)) {
673 			/* Before trying to reorganize the page,
674 			store the number of preceding records on the page. */
675 			ulint	ret_pos
676 				= page_rec_get_n_recs_before(ret);
677 			/* Before copying, "ret" was the successor of
678 			the predefined infimum record.  It must still
679 			have at least one predecessor (the predefined
680 			infimum record, or a freshly copied record
681 			that is smaller than "ret"). */
682 			ut_a(ret_pos > 0);
683 
684 			if (!page_zip_reorganize(new_block, index, mtr)) {
685 
686 				if (!page_zip_decompress(new_page_zip,
687 							 new_page, FALSE)) {
688 					ut_error;
689 				}
690 				ut_ad(page_validate(new_page, index));
691 
692 				if (heap) {
693 					mem_heap_free(heap);
694 				}
695 
696 				return(NULL);
697 			} else {
698 				/* The page was reorganized:
699 				Seek to ret_pos. */
700 				ret = new_page + PAGE_NEW_INFIMUM;
701 
702 				do {
703 					ret = rec_get_next_ptr(ret, TRUE);
704 				} while (--ret_pos);
705 			}
706 		}
707 	}
708 
709 	/* Update the lock table and possible hash index */
710 
711 	if (dict_table_is_locking_disabled(index->table)) {
712 	} else if (rec_move && dict_index_is_spatial(index)) {
713 		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
714 	} else {
715 		lock_move_rec_list_end(new_block, block, rec);
716 	}
717 
718 	if (heap) {
719 		mem_heap_free(heap);
720 	}
721 
722 	btr_search_move_or_delete_hash_entries(new_block, block);
723 
724 	return(ret);
725 }
726 
727 /*************************************************************//**
728 Copies records from page to new_page, up to the given record,
729 NOT including that record. Infimum and supremum records are not copied.
730 The records are copied to the end of the record list on new_page.
731 
732 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
733 if new_block is a compressed leaf page in a secondary index.
734 This has to be done either within the same mini-transaction,
735 or by invoking ibuf_reset_free_bits() before mtr_commit().
736 
737 @return pointer to the original predecessor of the supremum record on
738 new_page, or NULL on zip overflow (new_block will be decompressed) */
739 rec_t*
page_copy_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mtr_t * mtr)740 page_copy_rec_list_start(
741 /*=====================*/
742 	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
743 	buf_block_t*	block,		/*!< in: index page containing rec */
744 	rec_t*		rec,		/*!< in: record on page */
745 	dict_index_t*	index,		/*!< in: record descriptor */
746 	mtr_t*		mtr)		/*!< in: mtr */
747 {
748 	ut_ad(page_align(rec) == block->frame);
749 
750 	page_t*		new_page	= buf_block_get_frame(new_block);
751 	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
752 	page_cur_t	cur1;
753 	rec_t*		cur2;
754 	mem_heap_t*	heap		= NULL;
755 	ulint		num_moved	= 0;
756 	rtr_rec_move_t*	rec_move	= NULL;
757 	rec_t*		ret
758 		= page_rec_get_prev(page_get_supremum_rec(new_page));
759 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
760 	rec_offs*	offsets		= offsets_;
761 	rec_offs_init(offsets_);
762 
763 	/* Here, "ret" may be pointing to a user record or the
764 	predefined infimum record. */
765 
766 	if (page_rec_is_infimum(rec)) {
767 		return(ret);
768 	}
769 
770 	mtr_log_t	log_mode = MTR_LOG_NONE;
771 
772 	if (new_page_zip) {
773 		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
774 	}
775 
776 	page_cur_set_before_first(block, &cur1);
777 	page_cur_move_to_next(&cur1);
778 
779 	cur2 = ret;
780 
781 	const ulint n_core = page_rec_is_leaf(rec) ? index->n_core_fields : 0;
782 
783 	/* Copy records from the original page to the new page */
784 	if (index->is_spatial()) {
785 		ut_ad(!index->is_instant());
786 		ulint		max_to_move = page_get_n_recs(
787 						buf_block_get_frame(block));
788 		heap = mem_heap_create(256);
789 
790 		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
791 					heap,
792 					sizeof (*rec_move) * max_to_move));
793 
794 		/* For spatial index, we need to insert recs one by one
795 		to keep recs ordered. */
796 		rtr_page_copy_rec_list_start_no_locks(new_block,
797 						      block, rec, index, heap,
798 						      rec_move, max_to_move,
799 						      &num_moved, mtr);
800 	} else {
801 		while (page_cur_get_rec(&cur1) != rec) {
802 			offsets = rec_get_offsets(cur1.rec, index, offsets,
803 						  n_core,
804 						  ULINT_UNDEFINED, &heap);
805 			cur2 = page_cur_insert_rec_low(cur2, index,
806 						       cur1.rec, offsets, mtr);
807 			ut_a(cur2);
808 
809 			page_cur_move_to_next(&cur1);
810 			ut_ad(!(rec_get_info_bits(cur1.rec,
811 						  page_is_comp(new_page))
812 				& REC_INFO_MIN_REC_FLAG));
813 		}
814 	}
815 
816 	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
817 	Modifications will be redo logged and copied to the compressed
818 	page in page_zip_compress() or page_zip_reorganize() below.
819 	Multiple transactions cannot simultaneously operate on the
820 	same temp-table in parallel.
821 	max_trx_id is ignored for temp tables because it not required
822 	for MVCC. */
823 	if (n_core && dict_index_is_sec_or_ibuf(index)
824 	    && !index->table->is_temporary()) {
825 		page_update_max_trx_id(new_block, NULL,
826 				       page_get_max_trx_id(page_align(rec)),
827 				       mtr);
828 	}
829 
830 	if (new_page_zip) {
831 		mtr_set_log_mode(mtr, log_mode);
832 
833 		DBUG_EXECUTE_IF("page_copy_rec_list_start_compress_fail",
834 				goto zip_reorganize;);
835 
836 		if (!page_zip_compress(new_page_zip, new_page, index,
837 				       page_zip_level, mtr)) {
838 			ulint	ret_pos;
839 #ifndef DBUG_OFF
840 zip_reorganize:
841 #endif /* DBUG_OFF */
842 			/* Before trying to reorganize the page,
843 			store the number of preceding records on the page. */
844 			ret_pos = page_rec_get_n_recs_before(ret);
845 			/* Before copying, "ret" was the predecessor
846 			of the predefined supremum record.  If it was
847 			the predefined infimum record, then it would
848 			still be the infimum, and we would have
849 			ret_pos == 0. */
850 
851 			if (UNIV_UNLIKELY
852 			    (!page_zip_reorganize(new_block, index, mtr))) {
853 
854 				if (UNIV_UNLIKELY
855 				    (!page_zip_decompress(new_page_zip,
856 							  new_page, FALSE))) {
857 					ut_error;
858 				}
859 				ut_ad(page_validate(new_page, index));
860 
861 				if (UNIV_LIKELY_NULL(heap)) {
862 					mem_heap_free(heap);
863 				}
864 
865 				return(NULL);
866 			}
867 
868 			/* The page was reorganized: Seek to ret_pos. */
869 			ret = page_rec_get_nth(new_page, ret_pos);
870 		}
871 	}
872 
873 	/* Update the lock table and possible hash index */
874 
875 	if (dict_table_is_locking_disabled(index->table)) {
876 	} else if (dict_index_is_spatial(index)) {
877 		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
878 	} else {
879 		lock_move_rec_list_start(new_block, block, rec, ret);
880 	}
881 
882 	if (heap) {
883 		mem_heap_free(heap);
884 	}
885 
886 	btr_search_move_or_delete_hash_entries(new_block, block);
887 
888 	return(ret);
889 }
890 
891 /**********************************************************//**
892 Writes a log record of a record list end or start deletion. */
893 UNIV_INLINE
894 void
page_delete_rec_list_write_log(rec_t * rec,dict_index_t * index,mlog_id_t type,mtr_t * mtr)895 page_delete_rec_list_write_log(
896 /*===========================*/
897 	rec_t*		rec,	/*!< in: record on page */
898 	dict_index_t*	index,	/*!< in: record descriptor */
899 	mlog_id_t	type,	/*!< in: operation type:
900 				MLOG_LIST_END_DELETE, ... */
901 	mtr_t*		mtr)	/*!< in: mtr */
902 {
903 	byte*	log_ptr;
904 	ut_ad(type == MLOG_LIST_END_DELETE
905 	      || type == MLOG_LIST_START_DELETE
906 	      || type == MLOG_COMP_LIST_END_DELETE
907 	      || type == MLOG_COMP_LIST_START_DELETE);
908 
909 	log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
910 	if (log_ptr) {
911 		/* Write the parameter as a 2-byte ulint */
912 		mach_write_to_2(log_ptr, page_offset(rec));
913 		mlog_close(mtr, log_ptr + 2);
914 	}
915 }
916 
917 /**********************************************************//**
918 Parses a log record of a record list end or start deletion.
919 @return end of log record or NULL */
920 byte*
page_parse_delete_rec_list(mlog_id_t type,byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)921 page_parse_delete_rec_list(
922 /*=======================*/
923 	mlog_id_t	type,	/*!< in: MLOG_LIST_END_DELETE,
924 				MLOG_LIST_START_DELETE,
925 				MLOG_COMP_LIST_END_DELETE or
926 				MLOG_COMP_LIST_START_DELETE */
927 	byte*		ptr,	/*!< in: buffer */
928 	byte*		end_ptr,/*!< in: buffer end */
929 	buf_block_t*	block,	/*!< in/out: buffer block or NULL */
930 	dict_index_t*	index,	/*!< in: record descriptor */
931 	mtr_t*		mtr)	/*!< in: mtr or NULL */
932 {
933 	page_t*	page;
934 	ulint	offset;
935 
936 	ut_ad(type == MLOG_LIST_END_DELETE
937 	      || type == MLOG_LIST_START_DELETE
938 	      || type == MLOG_COMP_LIST_END_DELETE
939 	      || type == MLOG_COMP_LIST_START_DELETE);
940 
941 	/* Read the record offset as a 2-byte ulint */
942 
943 	if (end_ptr < ptr + 2) {
944 
945 		return(NULL);
946 	}
947 
948 	offset = mach_read_from_2(ptr);
949 	ptr += 2;
950 
951 	if (!block) {
952 
953 		return(ptr);
954 	}
955 
956 	page = buf_block_get_frame(block);
957 
958 	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
959 
960 	if (type == MLOG_LIST_END_DELETE
961 	    || type == MLOG_COMP_LIST_END_DELETE) {
962 		page_delete_rec_list_end(page + offset, block, index,
963 					 ULINT_UNDEFINED, ULINT_UNDEFINED,
964 					 mtr);
965 	} else {
966 		page_delete_rec_list_start(page + offset, block, index, mtr);
967 	}
968 
969 	return(ptr);
970 }
971 
972 /*************************************************************//**
973 Deletes records from a page from a given record onward, including that record.
974 The infimum and supremum records are not deleted. */
975 void
page_delete_rec_list_end(rec_t * rec,buf_block_t * block,dict_index_t * index,ulint n_recs,ulint size,mtr_t * mtr)976 page_delete_rec_list_end(
977 /*=====================*/
978 	rec_t*		rec,	/*!< in: pointer to record on page */
979 	buf_block_t*	block,	/*!< in: buffer block of the page */
980 	dict_index_t*	index,	/*!< in: record descriptor */
981 	ulint		n_recs,	/*!< in: number of records to delete,
982 				or ULINT_UNDEFINED if not known */
983 	ulint		size,	/*!< in: the sum of the sizes of the
984 				records in the end of the chain to
985 				delete, or ULINT_UNDEFINED if not known */
986 	mtr_t*		mtr)	/*!< in: mtr */
987 {
988 	page_dir_slot_t*slot;
989 	ulint		slot_index;
990 	rec_t*		last_rec;
991 	rec_t*		prev_rec;
992 	ulint		n_owned;
993 	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
994 	page_t*		page		= page_align(rec);
995 	mem_heap_t*	heap		= NULL;
996 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
997 	rec_offs*	offsets		= offsets_;
998 	rec_offs_init(offsets_);
999 
1000 	ut_ad(size == ULINT_UNDEFINED || size < srv_page_size);
1001 	ut_ad(!page_zip || page_rec_is_comp(rec));
1002 #ifdef UNIV_ZIP_DEBUG
1003 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1004 #endif /* UNIV_ZIP_DEBUG */
1005 
1006 	if (page_rec_is_supremum(rec)) {
1007 		ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
1008 		/* Nothing to do, there are no records bigger than the
1009 		page supremum. */
1010 		return;
1011 	}
1012 
1013 	if (recv_recovery_is_on()) {
1014 		/* If we are replaying a redo log record, we must
1015 		replay it exactly. Since MySQL 5.6.11, we should be
1016 		generating a redo log record for page creation if
1017 		the page would become empty. Thus, this branch should
1018 		only be executed when applying redo log that was
1019 		generated by an older version of MySQL. */
1020 	} else if (page_rec_is_infimum(rec)
1021 		   || n_recs == page_get_n_recs(page)) {
1022 delete_all:
1023 		/* We are deleting all records. */
1024 		page_create_empty(block, index, mtr);
1025 		return;
1026 	} else if (page_is_comp(page)) {
1027 		if (page_rec_get_next_low(page + PAGE_NEW_INFIMUM, 1) == rec) {
1028 			/* We are deleting everything from the first
1029 			user record onwards. */
1030 			goto delete_all;
1031 		}
1032 	} else {
1033 		if (page_rec_get_next_low(page + PAGE_OLD_INFIMUM, 0) == rec) {
1034 			/* We are deleting everything from the first
1035 			user record onwards. */
1036 			goto delete_all;
1037 		}
1038 	}
1039 
1040 	/* Reset the last insert info in the page header and increment
1041 	the modify clock for the frame */
1042 
1043 	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
1044 
1045 	/* The page gets invalid for optimistic searches: increment the
1046 	frame modify clock */
1047 
1048 	buf_block_modify_clock_inc(block);
1049 
1050 	page_delete_rec_list_write_log(rec, index, page_is_comp(page)
1051 				       ? MLOG_COMP_LIST_END_DELETE
1052 				       : MLOG_LIST_END_DELETE, mtr);
1053 
1054 	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
1055 
1056 	if (page_zip) {
1057 		mtr_log_t	log_mode;
1058 
1059 		ut_a(page_is_comp(page));
1060 		/* Individual deletes are not logged */
1061 
1062 		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
1063 
1064 		do {
1065 			page_cur_t	cur;
1066 			page_cur_position(rec, block, &cur);
1067 
1068 			offsets = rec_get_offsets(rec, index, offsets, n_core,
1069 						  ULINT_UNDEFINED, &heap);
1070 			rec = rec_get_next_ptr(rec, TRUE);
1071 #ifdef UNIV_ZIP_DEBUG
1072 			ut_a(page_zip_validate(page_zip, page, index));
1073 #endif /* UNIV_ZIP_DEBUG */
1074 			page_cur_delete_rec(&cur, index, offsets, mtr);
1075 		} while (page_offset(rec) != PAGE_NEW_SUPREMUM);
1076 
1077 		if (UNIV_LIKELY_NULL(heap)) {
1078 			mem_heap_free(heap);
1079 		}
1080 
1081 		/* Restore log mode */
1082 
1083 		mtr_set_log_mode(mtr, log_mode);
1084 		return;
1085 	}
1086 
1087 	prev_rec = page_rec_get_prev(rec);
1088 
1089 	last_rec = page_rec_get_prev(page_get_supremum_rec(page));
1090 
1091 	bool scrub = srv_immediate_scrub_data_uncompressed;
1092 	if ((size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED) ||
1093 	    scrub) {
1094 		rec_t*		rec2		= rec;
1095 		/* Calculate the sum of sizes and the number of records */
1096 		size = 0;
1097 		n_recs = 0;
1098 
1099 		do {
1100 			ulint	s;
1101 			offsets = rec_get_offsets(rec2, index, offsets, n_core,
1102 						  ULINT_UNDEFINED, &heap);
1103 			s = rec_offs_size(offsets);
1104 			ut_ad(ulint(rec2 - page) + s
1105 			      - rec_offs_extra_size(offsets)
1106 			      < srv_page_size);
1107 			ut_ad(size + s < srv_page_size);
1108 			size += s;
1109 			n_recs++;
1110 
1111 			if (scrub) {
1112 				/* scrub record */
1113 				memset(rec2, 0, rec_offs_data_size(offsets));
1114 			}
1115 
1116 			rec2 = page_rec_get_next(rec2);
1117 		} while (!page_rec_is_supremum(rec2));
1118 
1119 		if (UNIV_LIKELY_NULL(heap)) {
1120 			mem_heap_free(heap);
1121 		}
1122 	}
1123 
1124 	ut_ad(size < srv_page_size);
1125 
1126 	/* Update the page directory; there is no need to balance the number
1127 	of the records owned by the supremum record, as it is allowed to be
1128 	less than PAGE_DIR_SLOT_MIN_N_OWNED */
1129 
1130 	if (page_is_comp(page)) {
1131 		rec_t*	rec2	= rec;
1132 		ulint	count	= 0;
1133 
1134 		while (rec_get_n_owned_new(rec2) == 0) {
1135 			count++;
1136 
1137 			rec2 = rec_get_next_ptr(rec2, TRUE);
1138 		}
1139 
1140 		ut_ad(rec_get_n_owned_new(rec2) > count);
1141 
1142 		n_owned = rec_get_n_owned_new(rec2) - count;
1143 		slot_index = page_dir_find_owner_slot(rec2);
1144 		ut_ad(slot_index > 0);
1145 		slot = page_dir_get_nth_slot(page, slot_index);
1146 	} else {
1147 		rec_t*	rec2	= rec;
1148 		ulint	count	= 0;
1149 
1150 		while (rec_get_n_owned_old(rec2) == 0) {
1151 			count++;
1152 
1153 			rec2 = rec_get_next_ptr(rec2, FALSE);
1154 		}
1155 
1156 		ut_ad(rec_get_n_owned_old(rec2) > count);
1157 
1158 		n_owned = rec_get_n_owned_old(rec2) - count;
1159 		slot_index = page_dir_find_owner_slot(rec2);
1160 		ut_ad(slot_index > 0);
1161 		slot = page_dir_get_nth_slot(page, slot_index);
1162 	}
1163 
1164 	page_dir_slot_set_rec(slot, page_get_supremum_rec(page));
1165 	page_dir_slot_set_n_owned(slot, NULL, n_owned);
1166 
1167 	page_dir_set_n_slots(page, NULL, slot_index + 1);
1168 
1169 	/* Remove the record chain segment from the record chain */
1170 	page_rec_set_next(prev_rec, page_get_supremum_rec(page));
1171 
1172 	/* Catenate the deleted chain segment to the page free list */
1173 
1174 	page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE));
1175 	page_header_set_ptr(page, NULL, PAGE_FREE, rec);
1176 
1177 	page_header_set_field(page, NULL, PAGE_GARBAGE, size
1178 			      + page_header_get_field(page, PAGE_GARBAGE));
1179 
1180 	ut_ad(page_get_n_recs(page) > n_recs);
1181 	page_header_set_field(page, NULL, PAGE_N_RECS,
1182 			      (ulint)(page_get_n_recs(page) - n_recs));
1183 }
1184 
1185 /*************************************************************//**
1186 Deletes records from page, up to the given record, NOT including
1187 that record. Infimum and supremum records are not deleted. */
1188 void
page_delete_rec_list_start(rec_t * rec,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1189 page_delete_rec_list_start(
1190 /*=======================*/
1191 	rec_t*		rec,	/*!< in: record on page */
1192 	buf_block_t*	block,	/*!< in: buffer block of the page */
1193 	dict_index_t*	index,	/*!< in: record descriptor */
1194 	mtr_t*		mtr)	/*!< in: mtr */
1195 {
1196 	page_cur_t	cur1;
1197 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1198 	rec_offs*	offsets		= offsets_;
1199 	mem_heap_t*	heap		= NULL;
1200 
1201 	rec_offs_init(offsets_);
1202 
1203 	ut_ad(page_align(rec) == block->frame);
1204 	ut_ad((ibool) !!page_rec_is_comp(rec)
1205 	      == dict_table_is_comp(index->table));
1206 #ifdef UNIV_ZIP_DEBUG
1207 	{
1208 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
1209 		page_t*		page	= buf_block_get_frame(block);
1210 
1211 		/* page_zip_validate() would detect a min_rec_mark mismatch
1212 		in btr_page_split_and_insert()
1213 		between btr_attach_half_pages() and insert_page = ...
1214 		when btr_page_get_split_rec_to_left() holds
1215 		(direction == FSP_DOWN). */
1216 		ut_a(!page_zip
1217 		     || page_zip_validate_low(page_zip, page, index, TRUE));
1218 	}
1219 #endif /* UNIV_ZIP_DEBUG */
1220 
1221 	if (page_rec_is_infimum(rec)) {
1222 		return;
1223 	}
1224 
1225 	if (page_rec_is_supremum(rec)) {
1226 		/* We are deleting all records. */
1227 		page_create_empty(block, index, mtr);
1228 		return;
1229 	}
1230 
1231 	mlog_id_t	type;
1232 
1233 	if (page_rec_is_comp(rec)) {
1234 		type = MLOG_COMP_LIST_START_DELETE;
1235 	} else {
1236 		type = MLOG_LIST_START_DELETE;
1237 	}
1238 
1239 	page_delete_rec_list_write_log(rec, index, type, mtr);
1240 
1241 	page_cur_set_before_first(block, &cur1);
1242 	page_cur_move_to_next(&cur1);
1243 
1244 	/* Individual deletes are not logged */
1245 
1246 	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
1247 	const ulint	n_core = page_rec_is_leaf(rec)
1248 		? index->n_core_fields : 0;
1249 
1250 	while (page_cur_get_rec(&cur1) != rec) {
1251 		offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
1252 					  offsets, n_core,
1253 					  ULINT_UNDEFINED, &heap);
1254 		page_cur_delete_rec(&cur1, index, offsets, mtr);
1255 	}
1256 
1257 	if (UNIV_LIKELY_NULL(heap)) {
1258 		mem_heap_free(heap);
1259 	}
1260 
1261 	/* Restore log mode */
1262 
1263 	mtr_set_log_mode(mtr, log_mode);
1264 }
1265 
1266 /*************************************************************//**
1267 Moves record list end to another page. Moved records include
1268 split_rec.
1269 
1270 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1271 if new_block is a compressed leaf page in a secondary index.
1272 This has to be done either within the same mini-transaction,
1273 or by invoking ibuf_reset_free_bits() before mtr_commit().
1274 
1275 @return TRUE on success; FALSE on compression failure (new_block will
1276 be decompressed) */
1277 ibool
page_move_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1278 page_move_rec_list_end(
1279 /*===================*/
1280 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1281 	buf_block_t*	block,		/*!< in: index page from where to move */
1282 	rec_t*		split_rec,	/*!< in: first record to move */
1283 	dict_index_t*	index,		/*!< in: record descriptor */
1284 	mtr_t*		mtr)		/*!< in: mtr */
1285 {
1286 	page_t*		new_page	= buf_block_get_frame(new_block);
1287 	ulint		old_data_size;
1288 	ulint		new_data_size;
1289 	ulint		old_n_recs;
1290 	ulint		new_n_recs;
1291 
1292 	ut_ad(!dict_index_is_spatial(index));
1293 
1294 	old_data_size = page_get_data_size(new_page);
1295 	old_n_recs = page_get_n_recs(new_page);
1296 #ifdef UNIV_ZIP_DEBUG
1297 	{
1298 		page_zip_des_t*	new_page_zip
1299 			= buf_block_get_page_zip(new_block);
1300 		page_zip_des_t*	page_zip
1301 			= buf_block_get_page_zip(block);
1302 		ut_a(!new_page_zip == !page_zip);
1303 		ut_a(!new_page_zip
1304 		     || page_zip_validate(new_page_zip, new_page, index));
1305 		ut_a(!page_zip
1306 		     || page_zip_validate(page_zip, page_align(split_rec),
1307 					  index));
1308 	}
1309 #endif /* UNIV_ZIP_DEBUG */
1310 
1311 	if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_block, block,
1312 						  split_rec, index, mtr))) {
1313 		return(FALSE);
1314 	}
1315 
1316 	new_data_size = page_get_data_size(new_page);
1317 	new_n_recs = page_get_n_recs(new_page);
1318 
1319 	ut_ad(new_data_size >= old_data_size);
1320 
1321 	page_delete_rec_list_end(split_rec, block, index,
1322 				 new_n_recs - old_n_recs,
1323 				 new_data_size - old_data_size, mtr);
1324 
1325 	return(TRUE);
1326 }
1327 
1328 /*************************************************************//**
1329 Moves record list start to another page. Moved records do not include
1330 split_rec.
1331 
1332 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1333 if new_block is a compressed leaf page in a secondary index.
1334 This has to be done either within the same mini-transaction,
1335 or by invoking ibuf_reset_free_bits() before mtr_commit().
1336 
1337 @return TRUE on success; FALSE on compression failure */
1338 ibool
page_move_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1339 page_move_rec_list_start(
1340 /*=====================*/
1341 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1342 	buf_block_t*	block,		/*!< in/out: page containing split_rec */
1343 	rec_t*		split_rec,	/*!< in: first record not to move */
1344 	dict_index_t*	index,		/*!< in: record descriptor */
1345 	mtr_t*		mtr)		/*!< in: mtr */
1346 {
1347 	if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_block, block,
1348 						    split_rec, index, mtr))) {
1349 		return(FALSE);
1350 	}
1351 
1352 	page_delete_rec_list_start(split_rec, block, index, mtr);
1353 
1354 	return(TRUE);
1355 }
1356 
1357 /**************************************************************//**
1358 Used to delete n slots from the directory. This function updates
1359 also n_owned fields in the records, so that the first slot after
1360 the deleted ones inherits the records of the deleted slots. */
1361 UNIV_INLINE
1362 void
page_dir_delete_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1363 page_dir_delete_slot(
1364 /*=================*/
1365 	page_t*		page,	/*!< in/out: the index page */
1366 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1367 	ulint		slot_no)/*!< in: slot to be deleted */
1368 {
1369 	page_dir_slot_t*	slot;
1370 	ulint			n_owned;
1371 	ulint			i;
1372 	ulint			n_slots;
1373 
1374 	ut_ad(!page_zip || page_is_comp(page));
1375 	ut_ad(slot_no > 0);
1376 	ut_ad(slot_no + 1 < page_dir_get_n_slots(page));
1377 
1378 	n_slots = page_dir_get_n_slots(page);
1379 
1380 	/* 1. Reset the n_owned fields of the slots to be
1381 	deleted */
1382 	slot = page_dir_get_nth_slot(page, slot_no);
1383 	n_owned = page_dir_slot_get_n_owned(slot);
1384 	page_dir_slot_set_n_owned(slot, page_zip, 0);
1385 
1386 	/* 2. Update the n_owned value of the first non-deleted slot */
1387 
1388 	slot = page_dir_get_nth_slot(page, slot_no + 1);
1389 	page_dir_slot_set_n_owned(slot, page_zip,
1390 				  n_owned + page_dir_slot_get_n_owned(slot));
1391 
1392 	/* 3. Destroy the slot by copying slots */
1393 	for (i = slot_no + 1; i < n_slots; i++) {
1394 		rec_t*	rec = (rec_t*)
1395 			page_dir_slot_get_rec(page_dir_get_nth_slot(page, i));
1396 		page_dir_slot_set_rec(page_dir_get_nth_slot(page, i - 1), rec);
1397 	}
1398 
1399 	/* 4. Zero out the last slot, which will be removed */
1400 	mach_write_to_2(page_dir_get_nth_slot(page, n_slots - 1), 0);
1401 
1402 	/* 5. Update the page header */
1403 	page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots - 1);
1404 }
1405 
1406 /**************************************************************//**
1407 Used to add n slots to the directory. Does not set the record pointers
1408 in the added slots or update n_owned values: this is the responsibility
1409 of the caller. */
1410 UNIV_INLINE
1411 void
page_dir_add_slot(page_t * page,page_zip_des_t * page_zip,ulint start)1412 page_dir_add_slot(
1413 /*==============*/
1414 	page_t*		page,	/*!< in/out: the index page */
1415 	page_zip_des_t*	page_zip,/*!< in/out: comprssed page, or NULL */
1416 	ulint		start)	/*!< in: the slot above which the new slots
1417 				are added */
1418 {
1419 	page_dir_slot_t*	slot;
1420 	ulint			n_slots;
1421 
1422 	n_slots = page_dir_get_n_slots(page);
1423 
1424 	ut_ad(start < n_slots - 1);
1425 
1426 	/* Update the page header */
1427 	page_dir_set_n_slots(page, page_zip, n_slots + 1);
1428 
1429 	/* Move slots up */
1430 	slot = page_dir_get_nth_slot(page, n_slots);
1431 	memmove(slot, slot + PAGE_DIR_SLOT_SIZE,
1432 		(n_slots - 1 - start) * PAGE_DIR_SLOT_SIZE);
1433 }
1434 
1435 /****************************************************************//**
1436 Splits a directory slot which owns too many records. */
1437 void
page_dir_split_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1438 page_dir_split_slot(
1439 /*================*/
1440 	page_t*		page,	/*!< in/out: index page */
1441 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose
1442 				uncompressed part will be written, or NULL */
1443 	ulint		slot_no)/*!< in: the directory slot */
1444 {
1445 	rec_t*			rec;
1446 	page_dir_slot_t*	new_slot;
1447 	page_dir_slot_t*	prev_slot;
1448 	page_dir_slot_t*	slot;
1449 	ulint			i;
1450 	ulint			n_owned;
1451 
1452 	ut_ad(!page_zip || page_is_comp(page));
1453 	ut_ad(slot_no > 0);
1454 
1455 	slot = page_dir_get_nth_slot(page, slot_no);
1456 
1457 	n_owned = page_dir_slot_get_n_owned(slot);
1458 	ut_ad(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED + 1);
1459 
1460 	/* 1. We loop to find a record approximately in the middle of the
1461 	records owned by the slot. */
1462 
1463 	prev_slot = page_dir_get_nth_slot(page, slot_no - 1);
1464 	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
1465 
1466 	for (i = 0; i < n_owned / 2; i++) {
1467 		rec = page_rec_get_next(rec);
1468 	}
1469 
1470 	ut_ad(n_owned / 2 >= PAGE_DIR_SLOT_MIN_N_OWNED);
1471 
1472 	/* 2. We add one directory slot immediately below the slot to be
1473 	split. */
1474 
1475 	page_dir_add_slot(page, page_zip, slot_no - 1);
1476 
1477 	/* The added slot is now number slot_no, and the old slot is
1478 	now number slot_no + 1 */
1479 
1480 	new_slot = page_dir_get_nth_slot(page, slot_no);
1481 	slot = page_dir_get_nth_slot(page, slot_no + 1);
1482 
1483 	/* 3. We store the appropriate values to the new slot. */
1484 
1485 	page_dir_slot_set_rec(new_slot, rec);
1486 	page_dir_slot_set_n_owned(new_slot, page_zip, n_owned / 2);
1487 
1488 	/* 4. Finally, we update the number of records field of the
1489 	original slot */
1490 
1491 	page_dir_slot_set_n_owned(slot, page_zip, n_owned - (n_owned / 2));
1492 }
1493 
1494 /*************************************************************//**
1495 Tries to balance the given directory slot with too few records with the upper
1496 neighbor, so that there are at least the minimum number of records owned by
1497 the slot; this may result in the merging of two slots. */
1498 void
page_dir_balance_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1499 page_dir_balance_slot(
1500 /*==================*/
1501 	page_t*		page,	/*!< in/out: index page */
1502 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1503 	ulint		slot_no)/*!< in: the directory slot */
1504 {
1505 	page_dir_slot_t*	slot;
1506 	page_dir_slot_t*	up_slot;
1507 	ulint			n_owned;
1508 	ulint			up_n_owned;
1509 	rec_t*			old_rec;
1510 	rec_t*			new_rec;
1511 
1512 	ut_ad(!page_zip || page_is_comp(page));
1513 	ut_ad(slot_no > 0);
1514 
1515 	slot = page_dir_get_nth_slot(page, slot_no);
1516 
1517 	/* The last directory slot cannot be balanced with the upper
1518 	neighbor, as there is none. */
1519 
1520 	if (UNIV_UNLIKELY(slot_no + 1 == page_dir_get_n_slots(page))) {
1521 
1522 		return;
1523 	}
1524 
1525 	up_slot = page_dir_get_nth_slot(page, slot_no + 1);
1526 
1527 	n_owned = page_dir_slot_get_n_owned(slot);
1528 	up_n_owned = page_dir_slot_get_n_owned(up_slot);
1529 
1530 	ut_ad(n_owned == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
1531 
1532 	/* If the upper slot has the minimum value of n_owned, we will merge
1533 	the two slots, therefore we assert: */
1534 	ut_ad(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 <= PAGE_DIR_SLOT_MAX_N_OWNED);
1535 
1536 	if (up_n_owned > PAGE_DIR_SLOT_MIN_N_OWNED) {
1537 
1538 		/* In this case we can just transfer one record owned
1539 		by the upper slot to the property of the lower slot */
1540 		old_rec = (rec_t*) page_dir_slot_get_rec(slot);
1541 
1542 		if (page_is_comp(page)) {
1543 			new_rec = rec_get_next_ptr(old_rec, TRUE);
1544 
1545 			rec_set_n_owned_new(old_rec, page_zip, 0);
1546 			rec_set_n_owned_new(new_rec, page_zip, n_owned + 1);
1547 		} else {
1548 			new_rec = rec_get_next_ptr(old_rec, FALSE);
1549 
1550 			rec_set_n_owned_old(old_rec, 0);
1551 			rec_set_n_owned_old(new_rec, n_owned + 1);
1552 		}
1553 
1554 		page_dir_slot_set_rec(slot, new_rec);
1555 
1556 		page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned -1);
1557 	} else {
1558 		/* In this case we may merge the two slots */
1559 		page_dir_delete_slot(page, page_zip, slot_no);
1560 	}
1561 }
1562 
1563 /************************************************************//**
1564 Returns the nth record of the record list.
1565 This is the inverse function of page_rec_get_n_recs_before().
1566 @return nth record */
1567 const rec_t*
page_rec_get_nth_const(const page_t * page,ulint nth)1568 page_rec_get_nth_const(
1569 /*===================*/
1570 	const page_t*	page,	/*!< in: page */
1571 	ulint		nth)	/*!< in: nth record */
1572 {
1573 	const page_dir_slot_t*	slot;
1574 	ulint			i;
1575 	ulint			n_owned;
1576 	const rec_t*		rec;
1577 
1578 	if (nth == 0) {
1579 		return(page_get_infimum_rec(page));
1580 	}
1581 
1582 	ut_ad(nth < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1583 
1584 	for (i = 0;; i++) {
1585 
1586 		slot = page_dir_get_nth_slot(page, i);
1587 		n_owned = page_dir_slot_get_n_owned(slot);
1588 
1589 		if (n_owned > nth) {
1590 			break;
1591 		} else {
1592 			nth -= n_owned;
1593 		}
1594 	}
1595 
1596 	ut_ad(i > 0);
1597 	slot = page_dir_get_nth_slot(page, i - 1);
1598 	rec = page_dir_slot_get_rec(slot);
1599 
1600 	if (page_is_comp(page)) {
1601 		do {
1602 			rec = page_rec_get_next_low(rec, TRUE);
1603 			ut_ad(rec);
1604 		} while (nth--);
1605 	} else {
1606 		do {
1607 			rec = page_rec_get_next_low(rec, FALSE);
1608 			ut_ad(rec);
1609 		} while (nth--);
1610 	}
1611 
1612 	return(rec);
1613 }
1614 
1615 /***************************************************************//**
1616 Returns the number of records before the given record in chain.
1617 The number includes infimum and supremum records.
1618 @return number of records */
1619 ulint
page_rec_get_n_recs_before(const rec_t * rec)1620 page_rec_get_n_recs_before(
1621 /*=======================*/
1622 	const rec_t*	rec)	/*!< in: the physical record */
1623 {
1624 	const page_dir_slot_t*	slot;
1625 	const rec_t*		slot_rec;
1626 	const page_t*		page;
1627 	ulint			i;
1628 	lint			n	= 0;
1629 
1630 	ut_ad(page_rec_check(rec));
1631 
1632 	page = page_align(rec);
1633 	if (page_is_comp(page)) {
1634 		while (rec_get_n_owned_new(rec) == 0) {
1635 
1636 			rec = rec_get_next_ptr_const(rec, TRUE);
1637 			n--;
1638 		}
1639 
1640 		for (i = 0; ; i++) {
1641 			slot = page_dir_get_nth_slot(page, i);
1642 			slot_rec = page_dir_slot_get_rec(slot);
1643 
1644 			n += lint(rec_get_n_owned_new(slot_rec));
1645 
1646 			if (rec == slot_rec) {
1647 
1648 				break;
1649 			}
1650 		}
1651 	} else {
1652 		while (rec_get_n_owned_old(rec) == 0) {
1653 
1654 			rec = rec_get_next_ptr_const(rec, FALSE);
1655 			n--;
1656 		}
1657 
1658 		for (i = 0; ; i++) {
1659 			slot = page_dir_get_nth_slot(page, i);
1660 			slot_rec = page_dir_slot_get_rec(slot);
1661 
1662 			n += lint(rec_get_n_owned_old(slot_rec));
1663 
1664 			if (rec == slot_rec) {
1665 
1666 				break;
1667 			}
1668 		}
1669 	}
1670 
1671 	n--;
1672 
1673 	ut_ad(n >= 0);
1674 	ut_ad((ulong) n < srv_page_size / (REC_N_NEW_EXTRA_BYTES + 1));
1675 
1676 	return((ulint) n);
1677 }
1678 
1679 /************************************************************//**
1680 Prints record contents including the data relevant only in
1681 the index page context. */
1682 void
page_rec_print(const rec_t * rec,const rec_offs * offsets)1683 page_rec_print(
1684 /*===========*/
1685 	const rec_t*	rec,	/*!< in: physical record */
1686 	const rec_offs*	offsets)/*!< in: record descriptor */
1687 {
1688 	ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
1689 	rec_print_new(stderr, rec, offsets);
1690 	if (page_rec_is_comp(rec)) {
1691 		ib::info() << "n_owned: " << rec_get_n_owned_new(rec)
1692 			<< "; heap_no: " << rec_get_heap_no_new(rec)
1693 			<< "; next rec: " << rec_get_next_offs(rec, TRUE);
1694 	} else {
1695 		ib::info() << "n_owned: " << rec_get_n_owned_old(rec)
1696 			<< "; heap_no: " << rec_get_heap_no_old(rec)
1697 			<< "; next rec: " << rec_get_next_offs(rec, FALSE);
1698 	}
1699 
1700 	page_rec_check(rec);
1701 	rec_validate(rec, offsets);
1702 }
1703 
1704 #ifdef UNIV_BTR_PRINT
1705 /***************************************************************//**
1706 This is used to print the contents of the directory for
1707 debugging purposes. */
1708 void
page_dir_print(page_t * page,ulint pr_n)1709 page_dir_print(
1710 /*===========*/
1711 	page_t*	page,	/*!< in: index page */
1712 	ulint	pr_n)	/*!< in: print n first and n last entries */
1713 {
1714 	ulint			n;
1715 	ulint			i;
1716 	page_dir_slot_t*	slot;
1717 
1718 	n = page_dir_get_n_slots(page);
1719 
1720 	fprintf(stderr, "--------------------------------\n"
1721 		"PAGE DIRECTORY\n"
1722 		"Page address %p\n"
1723 		"Directory stack top at offs: %lu; number of slots: %lu\n",
1724 		page, (ulong) page_offset(page_dir_get_nth_slot(page, n - 1)),
1725 		(ulong) n);
1726 	for (i = 0; i < n; i++) {
1727 		slot = page_dir_get_nth_slot(page, i);
1728 		if ((i == pr_n) && (i < n - pr_n)) {
1729 			fputs("    ...   \n", stderr);
1730 		}
1731 		if ((i < pr_n) || (i >= n - pr_n)) {
1732 			fprintf(stderr,
1733 				"Contents of slot: %lu: n_owned: %lu,"
1734 				" rec offs: %lu\n",
1735 				(ulong) i,
1736 				(ulong) page_dir_slot_get_n_owned(slot),
1737 				(ulong)
1738 				page_offset(page_dir_slot_get_rec(slot)));
1739 		}
1740 	}
1741 	fprintf(stderr, "Total of %lu records\n"
1742 		"--------------------------------\n",
1743 		(ulong) (PAGE_HEAP_NO_USER_LOW + page_get_n_recs(page)));
1744 }
1745 
1746 /***************************************************************//**
1747 This is used to print the contents of the page record list for
1748 debugging purposes. */
1749 void
page_print_list(buf_block_t * block,dict_index_t * index,ulint pr_n)1750 page_print_list(
1751 /*============*/
1752 	buf_block_t*	block,	/*!< in: index page */
1753 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1754 	ulint		pr_n)	/*!< in: print n first and n last entries */
1755 {
1756 	page_t*		page		= block->frame;
1757 	page_cur_t	cur;
1758 	ulint		count;
1759 	ulint		n_recs;
1760 	mem_heap_t*	heap		= NULL;
1761 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1762 	rec_offs*	offsets		= offsets_;
1763 	rec_offs_init(offsets_);
1764 
1765 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1766 
1767 	fprint(stderr,
1768 		"--------------------------------\n"
1769 		"PAGE RECORD LIST\n"
1770 		"Page address %p\n", page);
1771 
1772 	n_recs = page_get_n_recs(page);
1773 
1774 	page_cur_set_before_first(block, &cur);
1775 	count = 0;
1776 	for (;;) {
1777 		offsets = rec_get_offsets(cur.rec, index, offsets,
1778 					  page_rec_is_leaf(cur.rec),
1779 					  ULINT_UNDEFINED, &heap);
1780 		page_rec_print(cur.rec, offsets);
1781 
1782 		if (count == pr_n) {
1783 			break;
1784 		}
1785 		if (page_cur_is_after_last(&cur)) {
1786 			break;
1787 		}
1788 		page_cur_move_to_next(&cur);
1789 		count++;
1790 	}
1791 
1792 	if (n_recs > 2 * pr_n) {
1793 		fputs(" ... \n", stderr);
1794 	}
1795 
1796 	while (!page_cur_is_after_last(&cur)) {
1797 		page_cur_move_to_next(&cur);
1798 
1799 		if (count + pr_n >= n_recs) {
1800 			offsets = rec_get_offsets(cur.rec, index, offsets,
1801 						  page_rec_is_leaf(cur.rec),
1802 						  ULINT_UNDEFINED, &heap);
1803 			page_rec_print(cur.rec, offsets);
1804 		}
1805 		count++;
1806 	}
1807 
1808 	fprintf(stderr,
1809 		"Total of %lu records \n"
1810 		"--------------------------------\n",
1811 		(ulong) (count + 1));
1812 
1813 	if (UNIV_LIKELY_NULL(heap)) {
1814 		mem_heap_free(heap);
1815 	}
1816 }
1817 
1818 /***************************************************************//**
1819 Prints the info in a page header. */
1820 void
page_header_print(const page_t * page)1821 page_header_print(
1822 /*==============*/
1823 	const page_t*	page)
1824 {
1825 	fprintf(stderr,
1826 		"--------------------------------\n"
1827 		"PAGE HEADER INFO\n"
1828 		"Page address %p, n records %u (%s)\n"
1829 		"n dir slots %u, heap top %u\n"
1830 		"Page n heap %u, free %u, garbage %u\n"
1831 		"Page last insert %u, direction %u, n direction %u\n",
1832 		page, page_header_get_field(page, PAGE_N_RECS),
1833 		page_is_comp(page) ? "compact format" : "original format",
1834 		page_header_get_field(page, PAGE_N_DIR_SLOTS),
1835 		page_header_get_field(page, PAGE_HEAP_TOP),
1836 		page_dir_get_n_heap(page),
1837 		page_header_get_field(page, PAGE_FREE),
1838 		page_header_get_field(page, PAGE_GARBAGE),
1839 		page_header_get_field(page, PAGE_LAST_INSERT),
1840 		page_get_direction(page),
1841 		page_header_get_field(page, PAGE_N_DIRECTION));
1842 }
1843 
1844 /***************************************************************//**
1845 This is used to print the contents of the page for
1846 debugging purposes. */
1847 void
page_print(buf_block_t * block,dict_index_t * index,ulint dn,ulint rn)1848 page_print(
1849 /*=======*/
1850 	buf_block_t*	block,	/*!< in: index page */
1851 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1852 	ulint		dn,	/*!< in: print dn first and last entries
1853 				in directory */
1854 	ulint		rn)	/*!< in: print rn first and last records
1855 				in directory */
1856 {
1857 	page_t*	page = block->frame;
1858 
1859 	page_header_print(page);
1860 	page_dir_print(page, dn);
1861 	page_print_list(block, index, rn);
1862 }
1863 #endif /* UNIV_BTR_PRINT */
1864 
1865 /***************************************************************//**
1866 The following is used to validate a record on a page. This function
1867 differs from rec_validate as it can also check the n_owned field and
1868 the heap_no field.
1869 @return TRUE if ok */
1870 ibool
page_rec_validate(const rec_t * rec,const rec_offs * offsets)1871 page_rec_validate(
1872 /*==============*/
1873 	const rec_t*	rec,	/*!< in: physical record */
1874 	const rec_offs*	offsets)/*!< in: array returned by rec_get_offsets() */
1875 {
1876 	ulint		n_owned;
1877 	ulint		heap_no;
1878 	const page_t*	page;
1879 
1880 	page = page_align(rec);
1881 	ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
1882 
1883 	page_rec_check(rec);
1884 	rec_validate(rec, offsets);
1885 
1886 	if (page_rec_is_comp(rec)) {
1887 		n_owned = rec_get_n_owned_new(rec);
1888 		heap_no = rec_get_heap_no_new(rec);
1889 	} else {
1890 		n_owned = rec_get_n_owned_old(rec);
1891 		heap_no = rec_get_heap_no_old(rec);
1892 	}
1893 
1894 	if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
1895 		ib::warn() << "Dir slot of rec " << page_offset(rec)
1896 			<< ", n owned too big " << n_owned;
1897 		return(FALSE);
1898 	}
1899 
1900 	if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
1901 		ib::warn() << "Heap no of rec " << page_offset(rec)
1902 			<< " too big " << heap_no << " "
1903 			<< page_dir_get_n_heap(page);
1904 		return(FALSE);
1905 	}
1906 
1907 	return(TRUE);
1908 }
1909 
1910 #ifdef UNIV_DEBUG
1911 /***************************************************************//**
1912 Checks that the first directory slot points to the infimum record and
1913 the last to the supremum. This function is intended to track if the
1914 bug fixed in 4.0.14 has caused corruption to users' databases. */
1915 void
page_check_dir(const page_t * page)1916 page_check_dir(
1917 /*===========*/
1918 	const page_t*	page)	/*!< in: index page */
1919 {
1920 	ulint	n_slots;
1921 	ulint	infimum_offs;
1922 	ulint	supremum_offs;
1923 
1924 	n_slots = page_dir_get_n_slots(page);
1925 	infimum_offs = mach_read_from_2(page_dir_get_nth_slot(page, 0));
1926 	supremum_offs = mach_read_from_2(page_dir_get_nth_slot(page,
1927 							       n_slots - 1));
1928 
1929 	if (UNIV_UNLIKELY(!page_rec_is_infimum_low(infimum_offs))) {
1930 
1931 		ib::fatal() << "Page directory corruption: infimum not"
1932 			" pointed to";
1933 	}
1934 
1935 	if (UNIV_UNLIKELY(!page_rec_is_supremum_low(supremum_offs))) {
1936 
1937 		ib::fatal() << "Page directory corruption: supremum not"
1938 			" pointed to";
1939 	}
1940 }
1941 #endif /* UNIV_DEBUG */
1942 
1943 /***************************************************************//**
1944 This function checks the consistency of an index page when we do not
1945 know the index. This is also resilient so that this should never crash
1946 even if the page is total garbage.
1947 @return TRUE if ok */
1948 ibool
page_simple_validate_old(const page_t * page)1949 page_simple_validate_old(
1950 /*=====================*/
1951 	const page_t*	page)	/*!< in: index page in ROW_FORMAT=REDUNDANT */
1952 {
1953 	const page_dir_slot_t*	slot;
1954 	ulint			slot_no;
1955 	ulint			n_slots;
1956 	const rec_t*		rec;
1957 	const byte*		rec_heap_top;
1958 	ulint			count;
1959 	ulint			own_count;
1960 	ibool			ret	= FALSE;
1961 
1962 	ut_a(!page_is_comp(page));
1963 
1964 	/* Check first that the record heap and the directory do not
1965 	overlap. */
1966 
1967 	n_slots = page_dir_get_n_slots(page);
1968 
1969 	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
1970 		ib::error() << "Nonsensical number of page dir slots: "
1971 			    << n_slots;
1972 		goto func_exit;
1973 	}
1974 
1975 	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
1976 
1977 	if (UNIV_UNLIKELY(rec_heap_top
1978 			  > page_dir_get_nth_slot(page, n_slots - 1))) {
1979 		ib::error()
1980 			<< "Record heap and dir overlap on a page, heap top "
1981 			<< page_header_get_field(page, PAGE_HEAP_TOP)
1982 			<< ", dir "
1983 			<< page_offset(page_dir_get_nth_slot(page,
1984 							     n_slots - 1));
1985 
1986 		goto func_exit;
1987 	}
1988 
1989 	/* Validate the record list in a loop checking also that it is
1990 	consistent with the page record directory. */
1991 
1992 	count = 0;
1993 	own_count = 1;
1994 	slot_no = 0;
1995 	slot = page_dir_get_nth_slot(page, slot_no);
1996 
1997 	rec = page_get_infimum_rec(page);
1998 
1999 	for (;;) {
2000 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
2001 			ib::error() << "Record " << (rec - page)
2002 				<< " is above rec heap top "
2003 				<< (rec_heap_top - page);
2004 
2005 			goto func_exit;
2006 		}
2007 
2008 		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) != 0)) {
2009 			/* This is a record pointed to by a dir slot */
2010 			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
2011 					  != own_count)) {
2012 
2013 				ib::error() << "Wrong owned count "
2014 					<< rec_get_n_owned_old(rec)
2015 					<< ", " << own_count << ", rec "
2016 					<< (rec - page);
2017 
2018 				goto func_exit;
2019 			}
2020 
2021 			if (UNIV_UNLIKELY
2022 			    (page_dir_slot_get_rec(slot) != rec)) {
2023 				ib::error() << "Dir slot does not point"
2024 					" to right rec " << (rec - page);
2025 
2026 				goto func_exit;
2027 			}
2028 
2029 			own_count = 0;
2030 
2031 			if (!page_rec_is_supremum(rec)) {
2032 				slot_no++;
2033 				slot = page_dir_get_nth_slot(page, slot_no);
2034 			}
2035 		}
2036 
2037 		if (page_rec_is_supremum(rec)) {
2038 
2039 			break;
2040 		}
2041 
2042 		if (UNIV_UNLIKELY
2043 		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
2044 		     || rec_get_next_offs(rec, FALSE) >= srv_page_size)) {
2045 
2046 			ib::error() << "Next record offset nonsensical "
2047 				<< rec_get_next_offs(rec, FALSE) << " for rec "
2048 				<< (rec - page);
2049 
2050 			goto func_exit;
2051 		}
2052 
2053 		count++;
2054 
2055 		if (UNIV_UNLIKELY(count > srv_page_size)) {
2056 			ib::error() << "Page record list appears"
2057 				" to be circular " << count;
2058 			goto func_exit;
2059 		}
2060 
2061 		rec = page_rec_get_next_const(rec);
2062 		own_count++;
2063 	}
2064 
2065 	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
2066 		ib::error() << "n owned is zero in a supremum rec";
2067 
2068 		goto func_exit;
2069 	}
2070 
2071 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2072 		ib::error() <<  "n slots wrong "
2073 			<< slot_no << ", " << (n_slots - 1);
2074 		goto func_exit;
2075 	}
2076 
2077 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2078 			  + PAGE_HEAP_NO_USER_LOW
2079 			  != count + 1)) {
2080 		ib::error() <<  "n recs wrong "
2081 			<< page_header_get_field(page, PAGE_N_RECS)
2082 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2083 
2084 		goto func_exit;
2085 	}
2086 
2087 	/* Check then the free list */
2088 	rec = page_header_get_ptr(page, PAGE_FREE);
2089 
2090 	while (rec != NULL) {
2091 		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
2092 				  || rec >= page + srv_page_size)) {
2093 			ib::error() << "Free list record has"
2094 				" a nonsensical offset " << (rec - page);
2095 
2096 			goto func_exit;
2097 		}
2098 
2099 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
2100 			ib::error() << "Free list record " << (rec - page)
2101 				<< " is above rec heap top "
2102 				<< (rec_heap_top - page);
2103 
2104 			goto func_exit;
2105 		}
2106 
2107 		count++;
2108 
2109 		if (UNIV_UNLIKELY(count > srv_page_size)) {
2110 			ib::error() << "Page free list appears"
2111 				" to be circular " << count;
2112 			goto func_exit;
2113 		}
2114 
2115 		ulint offs = rec_get_next_offs(rec, FALSE);
2116 		if (!offs) {
2117 			break;
2118 		}
2119 		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2120 				  || offs >= srv_page_size)) {
2121 			ib::error() << "Page free list is corrupted " << count;
2122 			goto func_exit;
2123 		}
2124 
2125 		rec = page + offs;
2126 	}
2127 
2128 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2129 
2130 		ib::error() <<  "N heap is wrong "
2131 			<< page_dir_get_n_heap(page) << ", " << (count + 1);
2132 
2133 		goto func_exit;
2134 	}
2135 
2136 	ret = TRUE;
2137 
2138 func_exit:
2139 	return(ret);
2140 }
2141 
2142 /***************************************************************//**
2143 This function checks the consistency of an index page when we do not
2144 know the index. This is also resilient so that this should never crash
2145 even if the page is total garbage.
2146 @return TRUE if ok */
2147 ibool
page_simple_validate_new(const page_t * page)2148 page_simple_validate_new(
2149 /*=====================*/
2150 	const page_t*	page)	/*!< in: index page in ROW_FORMAT!=REDUNDANT */
2151 {
2152 	const page_dir_slot_t*	slot;
2153 	ulint			slot_no;
2154 	ulint			n_slots;
2155 	const rec_t*		rec;
2156 	const byte*		rec_heap_top;
2157 	ulint			count;
2158 	ulint			own_count;
2159 	ibool			ret	= FALSE;
2160 
2161 	ut_a(page_is_comp(page));
2162 
2163 	/* Check first that the record heap and the directory do not
2164 	overlap. */
2165 
2166 	n_slots = page_dir_get_n_slots(page);
2167 
2168 	if (UNIV_UNLIKELY(n_slots < 2 || n_slots > srv_page_size / 4)) {
2169 		ib::error() << "Nonsensical number of page dir slots: "
2170 			    << n_slots;
2171 		goto func_exit;
2172 	}
2173 
2174 	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);
2175 
2176 	if (UNIV_UNLIKELY(rec_heap_top
2177 			  > page_dir_get_nth_slot(page, n_slots - 1))) {
2178 
2179 		ib::error() << "Record heap and dir overlap on a page,"
2180 			" heap top "
2181 			<< page_header_get_field(page, PAGE_HEAP_TOP)
2182 			<< ", dir " << page_offset(
2183 				page_dir_get_nth_slot(page, n_slots - 1));
2184 
2185 		goto func_exit;
2186 	}
2187 
2188 	/* Validate the record list in a loop checking also that it is
2189 	consistent with the page record directory. */
2190 
2191 	count = 0;
2192 	own_count = 1;
2193 	slot_no = 0;
2194 	slot = page_dir_get_nth_slot(page, slot_no);
2195 
2196 	rec = page_get_infimum_rec(page);
2197 
2198 	for (;;) {
2199 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
2200 
2201 			ib::error() << "Record " << page_offset(rec)
2202 				<< " is above rec heap top "
2203 				<< page_offset(rec_heap_top);
2204 
2205 			goto func_exit;
2206 		}
2207 
2208 		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) != 0)) {
2209 			/* This is a record pointed to by a dir slot */
2210 			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
2211 					  != own_count)) {
2212 
2213 				ib::error() << "Wrong owned count "
2214 					<< rec_get_n_owned_new(rec) << ", "
2215 					<< own_count << ", rec "
2216 					<< page_offset(rec);
2217 
2218 				goto func_exit;
2219 			}
2220 
2221 			if (UNIV_UNLIKELY
2222 			    (page_dir_slot_get_rec(slot) != rec)) {
2223 				ib::error() << "Dir slot does not point"
2224 					" to right rec " << page_offset(rec);
2225 
2226 				goto func_exit;
2227 			}
2228 
2229 			own_count = 0;
2230 
2231 			if (!page_rec_is_supremum(rec)) {
2232 				slot_no++;
2233 				slot = page_dir_get_nth_slot(page, slot_no);
2234 			}
2235 		}
2236 
2237 		if (page_rec_is_supremum(rec)) {
2238 
2239 			break;
2240 		}
2241 
2242 		if (UNIV_UNLIKELY
2243 		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
2244 		     || rec_get_next_offs(rec, TRUE) >= srv_page_size)) {
2245 
2246 			ib::error() << "Next record offset nonsensical "
2247 				<< rec_get_next_offs(rec, TRUE)
2248 				<< " for rec " << page_offset(rec);
2249 
2250 			goto func_exit;
2251 		}
2252 
2253 		count++;
2254 
2255 		if (UNIV_UNLIKELY(count > srv_page_size)) {
2256 			ib::error() << "Page record list appears to be"
2257 				" circular " << count;
2258 			goto func_exit;
2259 		}
2260 
2261 		rec = page_rec_get_next_const(rec);
2262 		own_count++;
2263 	}
2264 
2265 	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2266 		ib::error() << "n owned is zero in a supremum rec";
2267 
2268 		goto func_exit;
2269 	}
2270 
2271 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2272 		ib::error() << "n slots wrong " << slot_no << ", "
2273 			<< (n_slots - 1);
2274 		goto func_exit;
2275 	}
2276 
2277 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2278 			  + PAGE_HEAP_NO_USER_LOW
2279 			  != count + 1)) {
2280 		ib::error() << "n recs wrong "
2281 			<< page_header_get_field(page, PAGE_N_RECS)
2282 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2283 
2284 		goto func_exit;
2285 	}
2286 
2287 	/* Check then the free list */
2288 	rec = page_header_get_ptr(page, PAGE_FREE);
2289 
2290 	while (rec != NULL) {
2291 		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
2292 				  || rec >= page + srv_page_size)) {
2293 
2294 			ib::error() << "Free list record has"
2295 				" a nonsensical offset " << page_offset(rec);
2296 
2297 			goto func_exit;
2298 		}
2299 
2300 		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
2301 			ib::error() << "Free list record " << page_offset(rec)
2302 				<< " is above rec heap top "
2303 				<< page_offset(rec_heap_top);
2304 
2305 			goto func_exit;
2306 		}
2307 
2308 		count++;
2309 
2310 		if (UNIV_UNLIKELY(count > srv_page_size)) {
2311 			ib::error() << "Page free list appears to be"
2312 				" circular " << count;
2313 			goto func_exit;
2314 		}
2315 
2316 		const ulint offs = rec_get_next_offs(rec, TRUE);
2317 		if (!offs) {
2318 			break;
2319 		}
2320 		if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2321 				  || offs >= srv_page_size)) {
2322 			ib::error() << "Page free list is corrupted " << count;
2323 			goto func_exit;
2324 		}
2325 
2326 		rec = page + offs;
2327 	}
2328 
2329 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2330 
2331 		ib::error() << "N heap is wrong "
2332 			<< page_dir_get_n_heap(page) << ", " << (count + 1);
2333 
2334 		goto func_exit;
2335 	}
2336 
2337 	ret = TRUE;
2338 
2339 func_exit:
2340 	return(ret);
2341 }
2342 
2343 /** Check the consistency of an index page.
2344 @param[in]	page	index page
2345 @param[in]	index	B-tree or R-tree index
2346 @return	whether the page is valid */
page_validate(const page_t * page,const dict_index_t * index)2347 bool page_validate(const page_t* page, const dict_index_t* index)
2348 {
2349 	const page_dir_slot_t*	slot;
2350 	const rec_t*		rec;
2351 	const rec_t*		old_rec		= NULL;
2352 	const rec_t*		first_rec	= NULL;
2353 	ulint			offs = 0;
2354 	ulint			n_slots;
2355 	ibool			ret		= TRUE;
2356 	ulint			i;
2357 	rec_offs		offsets_1[REC_OFFS_NORMAL_SIZE];
2358 	rec_offs		offsets_2[REC_OFFS_NORMAL_SIZE];
2359 	rec_offs*		offsets		= offsets_1;
2360 	rec_offs*		old_offsets	= offsets_2;
2361 
2362 	rec_offs_init(offsets_1);
2363 	rec_offs_init(offsets_2);
2364 
2365 #ifdef UNIV_GIS_DEBUG
2366 	if (dict_index_is_spatial(index)) {
2367 		fprintf(stderr, "Page no: %lu\n", page_get_page_no(page));
2368 	}
2369 #endif /* UNIV_DEBUG */
2370 
2371 	if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
2372 			  != dict_table_is_comp(index->table))) {
2373 		ib::error() << "'compact format' flag mismatch";
2374 func_exit2:
2375 		ib::error() << "Apparent corruption in space "
2376 			    << page_get_space_id(page) << " page "
2377 			    << page_get_page_no(page)
2378 			    << " of index " << index->name
2379 			    << " of table " << index->table->name;
2380 		return FALSE;
2381 	}
2382 
2383 	if (page_is_comp(page)) {
2384 		if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
2385 			goto func_exit2;
2386 		}
2387 	} else {
2388 		if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
2389 			goto func_exit2;
2390 		}
2391 	}
2392 
2393 	/* Multiple transactions cannot simultaneously operate on the
2394 	same temp-table in parallel.
2395 	max_trx_id is ignored for temp tables because it not required
2396 	for MVCC. */
2397 	if (!page_is_leaf(page) || page_is_empty(page)
2398 	    || !dict_index_is_sec_or_ibuf(index)
2399 	    || index->table->is_temporary()) {
2400 	} else if (trx_id_t sys_max_trx_id = trx_sys.get_max_trx_id()) {
2401 		trx_id_t	max_trx_id	= page_get_max_trx_id(page);
2402 
2403 		if (max_trx_id == 0 || max_trx_id > sys_max_trx_id) {
2404 			ib::error() << "PAGE_MAX_TRX_ID out of bounds: "
2405 				<< max_trx_id << ", " << sys_max_trx_id;
2406 			ret = FALSE;
2407 		}
2408 	} else {
2409 		ut_ad(srv_force_recovery >= SRV_FORCE_NO_UNDO_LOG_SCAN);
2410 	}
2411 
2412 	/* Check first that the record heap and the directory do not
2413 	overlap. */
2414 
2415 	n_slots = page_dir_get_n_slots(page);
2416 
2417 	if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
2418 			    <= page_dir_get_nth_slot(page, n_slots - 1)))) {
2419 
2420 		ib::warn() << "Record heap and directory overlap";
2421 		goto func_exit2;
2422 	}
2423 
2424 	switch (uint16_t type = fil_page_get_type(page)) {
2425 	case FIL_PAGE_RTREE:
2426 		if (!index->is_spatial()) {
2427 wrong_page_type:
2428 			ib::warn() << "Wrong page type " << type;
2429 			ret = FALSE;
2430 		}
2431 		break;
2432 	case FIL_PAGE_TYPE_INSTANT:
2433 		if (index->is_instant()
2434 		    && page_get_page_no(page) == index->page) {
2435 			break;
2436 		}
2437 		goto wrong_page_type;
2438 	case FIL_PAGE_INDEX:
2439 		if (index->is_spatial()) {
2440 			goto wrong_page_type;
2441 		}
2442 		if (index->is_instant()
2443 		    && page_get_page_no(page) == index->page) {
2444 			goto wrong_page_type;
2445 		}
2446 		break;
2447 	default:
2448 		goto wrong_page_type;
2449 	}
2450 
2451 	/* The following buffer is used to check that the
2452 	records in the page record heap do not overlap */
2453 	mem_heap_t* heap = mem_heap_create(srv_page_size + 200);;
2454 	byte* buf = static_cast<byte*>(mem_heap_zalloc(heap, srv_page_size));
2455 
2456 	/* Validate the record list in a loop checking also that
2457 	it is consistent with the directory. */
2458 	ulint count = 0, data_size = 0, own_count = 1, slot_no = 0;
2459 	ulint info_bits;
2460 	slot_no = 0;
2461 	slot = page_dir_get_nth_slot(page, slot_no);
2462 
2463 	rec = page_get_infimum_rec(page);
2464 
2465 	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
2466 
2467 	for (;;) {
2468 		offsets = rec_get_offsets(rec, index, offsets, n_core,
2469 					  ULINT_UNDEFINED, &heap);
2470 
2471 		if (page_is_comp(page) && page_rec_is_user_rec(rec)
2472 		    && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
2473 				     == page_is_leaf(page))) {
2474 			ib::error() << "'node_ptr' flag mismatch";
2475 			ret = FALSE;
2476 			goto next_rec;
2477 		}
2478 
2479 		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2480 			ret = FALSE;
2481 			goto next_rec;
2482 		}
2483 
2484 		info_bits = rec_get_info_bits(rec, page_is_comp(page));
2485 		if (info_bits
2486 		    & ~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG)) {
2487 			ib::error() << "info_bits has an incorrect value "
2488 				    << info_bits;
2489 			ret = false;
2490 		}
2491 
2492 		if (rec == first_rec) {
2493 			if (info_bits & REC_INFO_MIN_REC_FLAG) {
2494 				if (page_has_prev(page)) {
2495 					ib::error() << "REC_INFO_MIN_REC_FLAG "
2496 						"is set on non-left page";
2497 					ret = false;
2498 				} else if (!page_is_leaf(page)) {
2499 					/* leftmost node pointer page */
2500 				} else if (!index->is_instant()) {
2501 					ib::error() << "REC_INFO_MIN_REC_FLAG "
2502 						"is set in a leaf-page record";
2503 					ret = false;
2504 				} else if (!(info_bits & REC_INFO_DELETED_FLAG)
2505 					   != !index->table->instant) {
2506 					ib::error() << (index->table->instant
2507 							? "Metadata record "
2508 							"is not delete-marked"
2509 							: "Metadata record "
2510 							"is delete-marked");
2511 					ret = false;
2512 				}
2513 			} else if (!page_has_prev(page)
2514 				   && index->is_instant()) {
2515 				ib::error() << "Metadata record is missing";
2516 				ret = false;
2517 			}
2518 		} else if (info_bits & REC_INFO_MIN_REC_FLAG) {
2519 			ib::error() << "REC_INFO_MIN_REC_FLAG record is not "
2520 				       "first in page";
2521 			ret = false;
2522 		}
2523 
2524 		if (page_is_comp(page)) {
2525 			const rec_comp_status_t status = rec_get_status(rec);
2526 			if (status != REC_STATUS_ORDINARY
2527 			    && status != REC_STATUS_NODE_PTR
2528 			    && status != REC_STATUS_INFIMUM
2529 			    && status != REC_STATUS_SUPREMUM
2530 			    && status != REC_STATUS_INSTANT) {
2531 				ib::error() << "impossible record status "
2532 					    << status;
2533 				ret = false;
2534 			} else if (page_rec_is_infimum(rec)) {
2535 				if (status != REC_STATUS_INFIMUM) {
2536 					ib::error()
2537 						<< "infimum record has status "
2538 						<< status;
2539 					ret = false;
2540 				}
2541 			} else if (page_rec_is_supremum(rec)) {
2542 				if (status != REC_STATUS_SUPREMUM) {
2543 					ib::error() << "supremum record has "
2544 						       "status "
2545 						    << status;
2546 					ret = false;
2547 				}
2548 			} else if (!page_is_leaf(page)) {
2549 				if (status != REC_STATUS_NODE_PTR) {
2550 					ib::error() << "node ptr record has "
2551 						       "status "
2552 						    << status;
2553 					ret = false;
2554 				}
2555 			} else if (!index->is_instant()
2556 				   && status == REC_STATUS_INSTANT) {
2557 				ib::error() << "instantly added record in a "
2558 					       "non-instant index";
2559 				ret = false;
2560 			}
2561 		}
2562 
2563 		/* Check that the records are in the ascending order */
2564 		if (count >= PAGE_HEAP_NO_USER_LOW
2565 		    && !page_rec_is_supremum(rec)) {
2566 
2567 			int	ret = cmp_rec_rec(
2568 				rec, old_rec, offsets, old_offsets, index);
2569 
2570 			/* For spatial index, on nonleaf leavel, we
2571 			allow recs to be equal. */
2572 			if (ret <= 0 && !(ret == 0 && index->is_spatial()
2573 					  && !page_is_leaf(page))) {
2574 
2575 				ib::error() << "Records in wrong order";
2576 
2577 				fputs("\nInnoDB: previous record ", stderr);
2578 				/* For spatial index, print the mbr info.*/
2579 				if (index->type & DICT_SPATIAL) {
2580 					putc('\n', stderr);
2581 					rec_print_mbr_rec(stderr,
2582 						old_rec, old_offsets);
2583 					fputs("\nInnoDB: record ", stderr);
2584 					putc('\n', stderr);
2585 					rec_print_mbr_rec(stderr, rec, offsets);
2586 					putc('\n', stderr);
2587 					putc('\n', stderr);
2588 
2589 				} else {
2590 					rec_print_new(stderr, old_rec, old_offsets);
2591 					fputs("\nInnoDB: record ", stderr);
2592 					rec_print_new(stderr, rec, offsets);
2593 					putc('\n', stderr);
2594 				}
2595 
2596 				ret = FALSE;
2597 			}
2598 		}
2599 
2600 		if (page_rec_is_user_rec(rec)) {
2601 
2602 			data_size += rec_offs_size(offsets);
2603 
2604 #if defined(UNIV_GIS_DEBUG)
2605 			/* For spatial index, print the mbr info.*/
2606 			if (index->type & DICT_SPATIAL) {
2607 				rec_print_mbr_rec(stderr, rec, offsets);
2608 				putc('\n', stderr);
2609 			}
2610 #endif /* UNIV_GIS_DEBUG */
2611 		}
2612 
2613 		offs = page_offset(rec_get_start(rec, offsets));
2614 		i = rec_offs_size(offsets);
2615 		if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2616 			ib::error() << "Record offset out of bounds: "
2617 				    << offs << '+' << i;
2618 			ret = FALSE;
2619 			goto next_rec;
2620 		}
2621 		while (i--) {
2622 			if (UNIV_UNLIKELY(buf[offs + i])) {
2623 				ib::error() << "Record overlaps another: "
2624 					    << offs << '+' << i;
2625 				ret = FALSE;
2626 				break;
2627 			}
2628 			buf[offs + i] = 1;
2629 		}
2630 
2631 		if (ulint rec_own_count = page_is_comp(page)
2632 		    ? rec_get_n_owned_new(rec)
2633 		    : rec_get_n_owned_old(rec)) {
2634 			/* This is a record pointed to by a dir slot */
2635 			if (UNIV_UNLIKELY(rec_own_count != own_count)) {
2636 				ib::error() << "Wrong owned count at " << offs
2637 					    << ": " << rec_own_count
2638 					    << ", " << own_count;
2639 				ret = FALSE;
2640 			}
2641 
2642 			if (page_dir_slot_get_rec(slot) != rec) {
2643 				ib::error() << "Dir slot does not"
2644 					" point to right rec at " << offs;
2645 				ret = FALSE;
2646 			}
2647 
2648 			if (ret) {
2649 				page_dir_slot_check(slot);
2650 			}
2651 
2652 			own_count = 0;
2653 			if (!page_rec_is_supremum(rec)) {
2654 				slot_no++;
2655 				slot = page_dir_get_nth_slot(page, slot_no);
2656 			}
2657 		}
2658 
2659 next_rec:
2660 		if (page_rec_is_supremum(rec)) {
2661 			break;
2662 		}
2663 
2664 		count++;
2665 		own_count++;
2666 		old_rec = rec;
2667 		rec = page_rec_get_next_const(rec);
2668 
2669 		if (page_rec_is_infimum(old_rec)
2670 		    && page_rec_is_user_rec(rec)) {
2671 			first_rec = rec;
2672 		}
2673 
2674 		/* set old_offsets to offsets; recycle offsets */
2675 		std::swap(old_offsets, offsets);
2676 	}
2677 
2678 	if (page_is_comp(page)) {
2679 		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
2680 
2681 			goto n_owned_zero;
2682 		}
2683 	} else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
2684 n_owned_zero:
2685 		ib::error() <<  "n owned is zero at " << offs;
2686 		ret = FALSE;
2687 	}
2688 
2689 	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
2690 		ib::error() << "n slots wrong " << slot_no << " "
2691 			<< (n_slots - 1);
2692 		ret = FALSE;
2693 	}
2694 
2695 	if (UNIV_UNLIKELY(ulint(page_header_get_field(page, PAGE_N_RECS))
2696 			  + PAGE_HEAP_NO_USER_LOW
2697 			  != count + 1)) {
2698 		ib::error() << "n recs wrong "
2699 			<< page_header_get_field(page, PAGE_N_RECS)
2700 			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
2701 		ret = FALSE;
2702 	}
2703 
2704 	if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
2705 		ib::error() << "Summed data size " << data_size
2706 			<< ", returned by func " << page_get_data_size(page);
2707 		ret = FALSE;
2708 	}
2709 
2710 	/* Check then the free list */
2711 	rec = page_header_get_ptr(page, PAGE_FREE);
2712 
2713 	while (rec != NULL) {
2714 		offsets = rec_get_offsets(rec, index, offsets, n_core,
2715 					  ULINT_UNDEFINED, &heap);
2716 		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
2717 			ret = FALSE;
2718 next_free:
2719 			const ulint offs = rec_get_next_offs(
2720 				rec, page_is_comp(page));
2721 			if (!offs) {
2722 				break;
2723 			}
2724 			if (UNIV_UNLIKELY(offs < PAGE_OLD_INFIMUM
2725 					  || offs >= srv_page_size)) {
2726 				ib::error() << "Page free list is corrupted";
2727 				ret = FALSE;
2728 				break;
2729 			}
2730 
2731 			rec = page + offs;
2732 			continue;
2733 		}
2734 
2735 		count++;
2736 		offs = page_offset(rec_get_start(rec, offsets));
2737 		i = rec_offs_size(offsets);
2738 		if (UNIV_UNLIKELY(offs + i >= srv_page_size)) {
2739 			ib::error() << "Free record offset out of bounds: "
2740 				    << offs << '+' << i;
2741 			ret = FALSE;
2742 			goto next_free;
2743 		}
2744 		while (i--) {
2745 			if (UNIV_UNLIKELY(buf[offs + i])) {
2746 				ib::error() << "Free record overlaps another: "
2747 					    << offs << '+' << i;
2748 				ret = FALSE;
2749 				break;
2750 			}
2751 			buf[offs + i] = 1;
2752 		}
2753 
2754 		goto next_free;
2755 	}
2756 
2757 	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
2758 		ib::error() << "N heap is wrong "
2759 			<< page_dir_get_n_heap(page) << " " << count + 1;
2760 		ret = FALSE;
2761 	}
2762 
2763 	mem_heap_free(heap);
2764 
2765 	if (UNIV_UNLIKELY(!ret)) {
2766 		goto func_exit2;
2767 	}
2768 
2769 	return(ret);
2770 }
2771 
2772 /***************************************************************//**
2773 Looks in the page record list for a record with the given heap number.
2774 @return record, NULL if not found */
2775 const rec_t*
page_find_rec_with_heap_no(const page_t * page,ulint heap_no)2776 page_find_rec_with_heap_no(
2777 /*=======================*/
2778 	const page_t*	page,	/*!< in: index page */
2779 	ulint		heap_no)/*!< in: heap number */
2780 {
2781 	const rec_t*	rec;
2782 
2783 	if (page_is_comp(page)) {
2784 		rec = page + PAGE_NEW_INFIMUM;
2785 
2786 		for (;;) {
2787 			ulint	rec_heap_no = rec_get_heap_no_new(rec);
2788 
2789 			if (rec_heap_no == heap_no) {
2790 
2791 				return(rec);
2792 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2793 
2794 				return(NULL);
2795 			}
2796 
2797 			rec = page + rec_get_next_offs(rec, TRUE);
2798 		}
2799 	} else {
2800 		rec = page + PAGE_OLD_INFIMUM;
2801 
2802 		for (;;) {
2803 			ulint	rec_heap_no = rec_get_heap_no_old(rec);
2804 
2805 			if (rec_heap_no == heap_no) {
2806 
2807 				return(rec);
2808 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2809 
2810 				return(NULL);
2811 			}
2812 
2813 			rec = page + rec_get_next_offs(rec, FALSE);
2814 		}
2815 	}
2816 }
2817 
2818 /*******************************************************//**
2819 Removes the record from a leaf page. This function does not log
2820 any changes. It is used by the IMPORT tablespace functions.
2821 The cursor is moved to the next record after the deleted one.
2822 @return true if success, i.e., the page did not become too empty */
2823 bool
page_delete_rec(const dict_index_t * index,page_cur_t * pcur,page_zip_des_t * page_zip,const rec_offs * offsets)2824 page_delete_rec(
2825 /*============*/
2826 	const dict_index_t*	index,	/*!< in: The index that the record
2827 					belongs to */
2828 	page_cur_t*		pcur,	/*!< in/out: page cursor on record
2829 					to delete */
2830 	page_zip_des_t*
2831 #ifdef UNIV_ZIP_DEBUG
2832 		page_zip/*!< in: compressed page descriptor */
2833 #endif
2834 	,
2835 	const rec_offs*		offsets)/*!< in: offsets for record */
2836 {
2837 	bool		no_compress_needed;
2838 	buf_block_t*	block = pcur->block;
2839 	page_t*		page = buf_block_get_frame(block);
2840 
2841 	ut_ad(page_is_leaf(page));
2842 
2843 	if (!rec_offs_any_extern(offsets)
2844 	    && ((page_get_data_size(page) - rec_offs_size(offsets)
2845 		< BTR_CUR_PAGE_COMPRESS_LIMIT(index))
2846 		|| !page_has_siblings(page)
2847 		|| (page_get_n_recs(page) < 2))) {
2848 
2849 		ulint	root_page_no = dict_index_get_page(index);
2850 
2851 		/* The page fillfactor will drop below a predefined
2852 		minimum value, OR the level in the B-tree contains just
2853 		one page, OR the page will become empty: we recommend
2854 		compression if this is not the root page. */
2855 
2856 		no_compress_needed = page_get_page_no(page) == root_page_no;
2857 	} else {
2858 		no_compress_needed = true;
2859 	}
2860 
2861 	if (no_compress_needed) {
2862 #ifdef UNIV_ZIP_DEBUG
2863 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2864 #endif /* UNIV_ZIP_DEBUG */
2865 
2866 		page_cur_delete_rec(pcur, index, offsets, 0);
2867 
2868 #ifdef UNIV_ZIP_DEBUG
2869 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2870 #endif /* UNIV_ZIP_DEBUG */
2871 	}
2872 
2873 	return(no_compress_needed);
2874 }
2875 
2876 /** Get the last non-delete-marked record on a page.
2877 @param[in]	page	index tree leaf page
2878 @return the last record, not delete-marked
2879 @retval infimum record if all records are delete-marked */
2880 const rec_t*
page_find_rec_max_not_deleted(const page_t * page)2881 page_find_rec_max_not_deleted(
2882 	const page_t*	page)
2883 {
2884 	const rec_t*	rec = page_get_infimum_rec(page);
2885 	const rec_t*	prev_rec = NULL; // remove warning
2886 
2887 	/* Because the page infimum is never delete-marked
2888 	and never the metadata pseudo-record (MIN_REC_FLAG)),
2889 	prev_rec will always be assigned to it first. */
2890 	ut_ad(!rec_get_info_bits(rec, page_rec_is_comp(rec)));
2891 	ut_ad(page_is_leaf(page));
2892 
2893 	if (page_is_comp(page)) {
2894 		do {
2895 			if (!(rec[-REC_NEW_INFO_BITS]
2896 			      & (REC_INFO_DELETED_FLAG
2897 				 | REC_INFO_MIN_REC_FLAG))) {
2898 				prev_rec = rec;
2899 			}
2900 			rec = page_rec_get_next_low(rec, true);
2901 		} while (rec != page + PAGE_NEW_SUPREMUM);
2902 	} else {
2903 		do {
2904 			if (!(rec[-REC_OLD_INFO_BITS]
2905 			      & (REC_INFO_DELETED_FLAG
2906 				 | REC_INFO_MIN_REC_FLAG))) {
2907 				prev_rec = rec;
2908 			}
2909 			rec = page_rec_get_next_low(rec, false);
2910 		} while (rec != page + PAGE_OLD_SUPREMUM);
2911 	}
2912 	return(prev_rec);
2913 }
2914