1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2012, Facebook Inc.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9 
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation.  The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16 
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 GNU General Public License, version 2.0, for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25 
26 *****************************************************************************/
27 
28 /**************************************************//**
29 @file page/page0page.cc
30 Index page routines
31 
32 Created 2/2/1994 Heikki Tuuri
33 *******************************************************/
34 
35 #include "page0page.h"
36 #ifdef UNIV_NONINL
37 #include "page0page.ic"
38 #endif
39 
40 #include "page0cur.h"
41 #include "page0zip.h"
42 #include "buf0buf.h"
43 #include "btr0btr.h"
44 #include "row0trunc.h"
45 #ifndef UNIV_HOTBACKUP
46 # include "srv0srv.h"
47 # include "lock0lock.h"
48 # include "fut0lst.h"
49 # include "btr0sea.h"
50 #endif /* !UNIV_HOTBACKUP */
51 
52 /*			THE INDEX PAGE
53 			==============
54 
55 The index page consists of a page header which contains the page's
56 id and other information. On top of it are the index records
57 in a heap linked into a one way linear list according to alphabetic order.
58 
59 Just below page end is an array of pointers which we call page directory,
60 to about every sixth record in the list. The pointers are placed in
61 the directory in the alphabetical order of the records pointed to,
62 enabling us to make a binary search using the array. Each slot number I
63 in the directory points to a record, where a 4-bit field contains a count
64 of those records which are in the linear list between pointer I and
65 the pointer I - 1 in the directory, including the record
66 pointed to by pointer I and not including the record pointed to by I - 1.
67 We say that the record pointed to by slot I, or that slot I, owns
68 these records. The count is always kept in the range 4 to 8, with
69 the exception that it is 1 for the first slot, and 1--8 for the second slot.
70 
71 An essentially binary search can be performed in the list of index
72 records, like we could do if we had pointer to every record in the
73 page directory. The data structure is, however, more efficient when
74 we are doing inserts, because most inserts are just pushed on a heap.
75 Only every 8th insert requires block move in the directory pointer
76 table, which itself is quite small. A record is deleted from the page
77 by just taking it off the linear list and updating the number of owned
78 records-field of the record which owns it, and updating the page directory,
79 if necessary. A special case is the one when the record owns itself.
80 Because the overhead of inserts is so small, we may also increase the
81 page size from the projected default of 8 kB to 64 kB without too
82 much loss of efficiency in inserts. A bigger page becomes practical
83 as the disk transfer rate rises relative to the seek and latency time.
84 On the present system, the page size is set so that the page transfer
85 time (3 ms) is 20 % of the disk random access time (15 ms).
86 
87 When the page is split, merged, or becomes full but contains deleted
88 records, we have to reorganize the page.
89 
90 Assuming a page size of 8 kB, a typical index page of a secondary
91 index contains 300 index entries, and the size of the page directory
92 is 50 x 4 bytes = 200 bytes. */
93 
94 /***************************************************************//**
95 Looks for the directory slot which owns the given record.
96 @return the directory slot number */
97 ulint
page_dir_find_owner_slot(const rec_t * rec)98 page_dir_find_owner_slot(
99 /*=====================*/
100 	const rec_t*	rec)	/*!< in: the physical record */
101 {
102 	const page_t*			page;
103 	register uint16			rec_offs_bytes;
104 	register const page_dir_slot_t*	slot;
105 	register const page_dir_slot_t*	first_slot;
106 	register const rec_t*		r = rec;
107 
108 	ut_ad(page_rec_check(rec));
109 
110 	page = page_align(rec);
111 	first_slot = page_dir_get_nth_slot(page, 0);
112 	slot = page_dir_get_nth_slot(page, page_dir_get_n_slots(page) - 1);
113 
114 	if (page_is_comp(page)) {
115 		while (rec_get_n_owned_new(r) == 0) {
116 			r = rec_get_next_ptr_const(r, TRUE);
117 			ut_ad(r >= page + PAGE_NEW_SUPREMUM);
118 			ut_ad(r < page + (UNIV_PAGE_SIZE - PAGE_DIR));
119 		}
120 	} else {
121 		while (rec_get_n_owned_old(r) == 0) {
122 			r = rec_get_next_ptr_const(r, FALSE);
123 			ut_ad(r >= page + PAGE_OLD_SUPREMUM);
124 			ut_ad(r < page + (UNIV_PAGE_SIZE - PAGE_DIR));
125 		}
126 	}
127 
128 	rec_offs_bytes = mach_encode_2(r - page);
129 
130 	while (UNIV_LIKELY(*(uint16*) slot != rec_offs_bytes)) {
131 
132 		if (UNIV_UNLIKELY(slot == first_slot)) {
133 			ib::error() << "Probable data corruption on page "
134 				<< page_get_page_no(page)
135 				<< ". Original record on that page;";
136 
137 			if (page_is_comp(page)) {
138 				fputs("(compact record)", stderr);
139 			} else {
140 				rec_print_old(stderr, rec);
141 			}
142 
143 			ib::error() << "Cannot find the dir slot for this"
144 				" record on that page;";
145 
146 			if (page_is_comp(page)) {
147 				fputs("(compact record)", stderr);
148 			} else {
149 				rec_print_old(stderr, page
150 					      + mach_decode_2(rec_offs_bytes));
151 			}
152 
153 			ut_error;
154 		}
155 
156 		slot += PAGE_DIR_SLOT_SIZE;
157 	}
158 
159 	return(((ulint) (first_slot - slot)) / PAGE_DIR_SLOT_SIZE);
160 }
161 
162 /**************************************************************//**
163 Used to check the consistency of a directory slot.
164 @return TRUE if succeed */
165 static
166 ibool
page_dir_slot_check(const page_dir_slot_t * slot)167 page_dir_slot_check(
168 /*================*/
169 	const page_dir_slot_t*	slot)	/*!< in: slot */
170 {
171 	const page_t*	page;
172 	ulint		n_slots;
173 	ulint		n_owned;
174 
175 	ut_a(slot);
176 
177 	page = page_align(slot);
178 
179 	n_slots = page_dir_get_n_slots(page);
180 
181 	ut_a(slot <= page_dir_get_nth_slot(page, 0));
182 	ut_a(slot >= page_dir_get_nth_slot(page, n_slots - 1));
183 
184 	ut_a(page_rec_check(page_dir_slot_get_rec(slot)));
185 
186 	if (page_is_comp(page)) {
187 		n_owned = rec_get_n_owned_new(page_dir_slot_get_rec(slot));
188 	} else {
189 		n_owned = rec_get_n_owned_old(page_dir_slot_get_rec(slot));
190 	}
191 
192 	if (slot == page_dir_get_nth_slot(page, 0)) {
193 		ut_a(n_owned == 1);
194 	} else if (slot == page_dir_get_nth_slot(page, n_slots - 1)) {
195 		ut_a(n_owned >= 1);
196 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
197 	} else {
198 		ut_a(n_owned >= PAGE_DIR_SLOT_MIN_N_OWNED);
199 		ut_a(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED);
200 	}
201 
202 	return(TRUE);
203 }
204 
/*************************************************************//**
Sets the max trx id field value (PAGE_MAX_TRX_ID in the page header).
Writes to the uncompressed frame, and mirrors the change to the
compressed page or the redo log depending on the arguments given. */
void
page_set_max_trx_id(
/*================*/
	buf_block_t*	block,	/*!< in/out: page */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	trx_id_t	trx_id,	/*!< in: transaction id */
	mtr_t*		mtr)	/*!< in/out: mini-transaction, or NULL */
{
	page_t*		page		= buf_block_get_frame(block);
#ifndef UNIV_HOTBACKUP
	/* If a mini-transaction is supplied, the caller must hold an
	X-latch on the block. */
	ut_ad(!mtr || mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
#endif /* !UNIV_HOTBACKUP */

	/* It is not necessary to write this change to the redo log, as
	during a database recovery we assume that the max trx id of every
	page is the maximum trx id assigned before the crash. */

	if (page_zip) {
		/* Update the uncompressed frame, then copy the header
		field to the compressed page image. */
		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
		page_zip_write_header(page_zip,
				      page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
				      8, mtr);
#ifndef UNIV_HOTBACKUP
	} else if (mtr) {
		/* Write through the mini-transaction interface. */
		mlog_write_ull(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
			       trx_id, mtr);
#endif /* !UNIV_HOTBACKUP */
	} else {
		/* No compressed page and no mtr: plain in-memory write. */
		mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
	}
}
238 
239 /************************************************************//**
240 Allocates a block of memory from the heap of an index page.
241 @return pointer to start of allocated buffer, or NULL if allocation fails */
242 byte*
page_mem_alloc_heap(page_t * page,page_zip_des_t * page_zip,ulint need,ulint * heap_no)243 page_mem_alloc_heap(
244 /*================*/
245 	page_t*		page,	/*!< in/out: index page */
246 	page_zip_des_t*	page_zip,/*!< in/out: compressed page with enough
247 				space available for inserting the record,
248 				or NULL */
249 	ulint		need,	/*!< in: total number of bytes needed */
250 	ulint*		heap_no)/*!< out: this contains the heap number
251 				of the allocated record
252 				if allocation succeeds */
253 {
254 	byte*	block;
255 	ulint	avl_space;
256 
257 	ut_ad(page && heap_no);
258 
259 	avl_space = page_get_max_insert_size(page, 1);
260 
261 	if (avl_space >= need) {
262 		block = page_header_get_ptr(page, PAGE_HEAP_TOP);
263 
264 		page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP,
265 				    block + need);
266 		*heap_no = page_dir_get_n_heap(page);
267 
268 		page_dir_set_n_heap(page, page_zip, 1 + *heap_no);
269 
270 		return(block);
271 	}
272 
273 	return(NULL);
274 }
275 
276 #ifndef UNIV_HOTBACKUP
277 /**********************************************************//**
278 Writes a log record of page creation. */
279 UNIV_INLINE
280 void
page_create_write_log(buf_frame_t * frame,mtr_t * mtr,ibool comp,bool is_rtree)281 page_create_write_log(
282 /*==================*/
283 	buf_frame_t*	frame,	/*!< in: a buffer frame where the page is
284 				created */
285 	mtr_t*		mtr,	/*!< in: mini-transaction handle */
286 	ibool		comp,	/*!< in: TRUE=compact page format */
287 	bool		is_rtree) /*!< in: whether it is R-tree */
288 {
289 	mlog_id_t	type;
290 
291 	if (is_rtree) {
292 		type = comp ? MLOG_COMP_PAGE_CREATE_RTREE
293 			    : MLOG_PAGE_CREATE_RTREE;
294 	} else {
295 		type = comp ? MLOG_COMP_PAGE_CREATE : MLOG_PAGE_CREATE;
296 	}
297 
298 	mlog_write_initial_log_record(frame, type, mtr);
299 }
300 #else /* !UNIV_HOTBACKUP */
301 # define page_create_write_log(frame,mtr,comp,is_rtree) ((void) 0)
302 #endif /* !UNIV_HOTBACKUP */
303 
/** The page infimum and supremum of an empty page in ROW_FORMAT=REDUNDANT.
This constant byte image is copied to offset PAGE_DATA when an empty
redundant-format page is created (see page_create_low()).  Each record
consists of the old-style record header followed by the record body. */
static const byte infimum_supremum_redundant[] = {
	/* the infimum record */
	0x08/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x00/*heap_no=0*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x74/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x09/*end offset*/,
	0x01/*n_owned*/,
	0x00, 0x08/*heap_no=1*/,
	0x03/*n_fields=1, 1-byte offsets*/,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm', 0
};
321 
/** The page infimum and supremum of an empty page in ROW_FORMAT=COMPACT.
This constant byte image is copied to offset PAGE_DATA when an empty
compact-format page is created (see page_create_low()).  Each record
consists of the new-style (compact) record header followed by the
record body. */
static const byte infimum_supremum_compact[] = {
	/* the infimum record */
	0x01/*n_owned=1*/,
	0x00, 0x02/* heap_no=0, REC_STATUS_INFIMUM */,
	0x00, 0x0d/* pointer to supremum */,
	'i', 'n', 'f', 'i', 'm', 'u', 'm', 0,
	/* the supremum record */
	0x01/*n_owned=1*/,
	0x00, 0x0b/* heap_no=1, REC_STATUS_SUPREMUM */,
	0x00, 0x00/* end of record list */,
	's', 'u', 'p', 'r', 'e', 'm', 'u', 'm'
};
335 
/**********************************************************//**
The index page creation function.  Initializes the page header, the
infimum/supremum records, and the two initial page directory slots
pointing to them.  Does not write any redo log.
@return pointer to the page */
static
page_t*
page_create_low(
/*============*/
	buf_block_t*	block,		/*!< in: a buffer block where the
					page is created */
	ulint		comp,		/*!< in: nonzero=compact page format */
	bool		is_rtree)	/*!< in: if it is an R-Tree page */
{
	page_t*		page;

	/* The insert-buffer free list header and node must fit inside
	the header area that is cleared below. */
#if PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE > PAGE_DATA
# error "PAGE_BTR_IBUF_FREE_LIST + FLST_BASE_NODE_SIZE > PAGE_DATA"
#endif
#if PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE > PAGE_DATA
# error "PAGE_BTR_IBUF_FREE_LIST_NODE + FLST_NODE_SIZE > PAGE_DATA"
#endif

	/* Invalidate any adaptive hash index or other optimistic
	references to the old contents of the block. */
	buf_block_modify_clock_inc(block);

	page = buf_block_get_frame(block);

	if (is_rtree) {
		fil_page_set_type(page, FIL_PAGE_RTREE);
	} else {
		fil_page_set_type(page, FIL_PAGE_INDEX);
	}

	/* Zero the private part of the page header, then poke the
	nonzero fields byte by byte (the high bytes are already 0). */
	memset(page + PAGE_HEADER, 0, PAGE_HEADER_PRIV_END);
	/* PAGE_N_DIR_SLOTS = 2: one slot each for infimum and supremum. */
	page[PAGE_HEADER + PAGE_N_DIR_SLOTS + 1] = 2;
	page[PAGE_HEADER + PAGE_DIRECTION + 1] = PAGE_NO_DIRECTION;

	if (comp) {
		page[PAGE_HEADER + PAGE_N_HEAP] = 0x80;/*page_is_comp()*/
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_NEW_SUPREMUM_END;
		/* Install the predefined infimum/supremum records. */
		memcpy(page + PAGE_DATA, infimum_supremum_compact,
		       sizeof infimum_supremum_compact);
		/* Clear everything between the supremum and the page
		directory at the end of the page. */
		memset(page
		       + PAGE_NEW_SUPREMUM_END, 0,
		       UNIV_PAGE_SIZE - PAGE_DIR - PAGE_NEW_SUPREMUM_END);
		/* Directory slot 1 (second-to-last) -> supremum,
		slot 0 (last before trailer) -> infimum. */
		page[UNIV_PAGE_SIZE - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_NEW_SUPREMUM;
		page[UNIV_PAGE_SIZE - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_NEW_INFIMUM;
	} else {
		page[PAGE_HEADER + PAGE_N_HEAP + 1] = PAGE_HEAP_NO_USER_LOW;
		page[PAGE_HEADER + PAGE_HEAP_TOP + 1] = PAGE_OLD_SUPREMUM_END;
		memcpy(page + PAGE_DATA, infimum_supremum_redundant,
		       sizeof infimum_supremum_redundant);
		memset(page
		       + PAGE_OLD_SUPREMUM_END, 0,
		       UNIV_PAGE_SIZE - PAGE_DIR - PAGE_OLD_SUPREMUM_END);
		page[UNIV_PAGE_SIZE - PAGE_DIR - PAGE_DIR_SLOT_SIZE * 2 + 1]
			= PAGE_OLD_SUPREMUM;
		page[UNIV_PAGE_SIZE - PAGE_DIR - PAGE_DIR_SLOT_SIZE + 1]
			= PAGE_OLD_INFIMUM;
	}

	return(page);
}
400 
401 /** Parses a redo log record of creating a page.
402 @param[in,out]	block	buffer block, or NULL
403 @param[in]	comp	nonzero=compact page format
404 @param[in]	is_rtree whether it is rtree page */
405 void
page_parse_create(buf_block_t * block,ulint comp,bool is_rtree)406 page_parse_create(
407 	buf_block_t*	block,
408 	ulint		comp,
409 	bool		is_rtree)
410 {
411 	if (block != NULL) {
412 		page_create_low(block, comp, is_rtree);
413 	}
414 }
415 
416 /**********************************************************//**
417 Create an uncompressed B-tree or R-tree index page.
418 @return pointer to the page */
419 page_t*
page_create(buf_block_t * block,mtr_t * mtr,ulint comp,bool is_rtree)420 page_create(
421 /*========*/
422 	buf_block_t*	block,		/*!< in: a buffer block where the
423 					page is created */
424 	mtr_t*		mtr,		/*!< in: mini-transaction handle */
425 	ulint		comp,		/*!< in: nonzero=compact page format */
426 	bool		is_rtree)	/*!< in: whether it is a R-Tree page */
427 {
428 	ut_ad(mtr->is_named_space(block->page.id.space()));
429 	page_create_write_log(buf_block_get_frame(block), mtr, comp, is_rtree);
430 	return(page_create_low(block, comp, is_rtree));
431 }
432 
/**********************************************************//**
Create a compressed B-tree index page.  First builds the uncompressed
page image, then compresses it into the block's compressed page frame.
@return pointer to the (uncompressed) page */
page_t*
page_create_zip(
/*============*/
	buf_block_t*		block,		/*!< in/out: a buffer frame
						where the page is created */
	dict_index_t*		index,		/*!< in: the index of the
						page, or NULL when applying
						TRUNCATE log
						record during recovery */
	ulint			level,		/*!< in: the B-tree level
						of the page */
	trx_id_t		max_trx_id,	/*!< in: PAGE_MAX_TRX_ID */
	const redo_page_compress_t* page_comp_info,
						/*!< in: used for applying
						TRUNCATE log
						record during recovery */
	mtr_t*			mtr)		/*!< in/out: mini-transaction
						handle */
{
	page_t*			page;
	page_zip_des_t*		page_zip = buf_block_get_page_zip(block);
	bool			is_spatial;

	ut_ad(block);
	ut_ad(page_zip);
	/* Compressed pages always use the compact record format. */
	ut_ad(index == NULL || dict_table_is_comp(index->table));
	/* When there is no index (TRUNCATE log apply during recovery),
	the spatial flag comes from the logged compression info instead. */
	is_spatial = index ? dict_index_is_spatial(index)
			   : page_comp_info->type & DICT_SPATIAL;

	/* Build the uncompressed image, then set the B-tree level and
	PAGE_MAX_TRX_ID header fields before compressing. */
	page = page_create_low(block, TRUE, is_spatial);
	mach_write_to_2(PAGE_HEADER + PAGE_LEVEL + page, level);
	mach_write_to_8(PAGE_HEADER + PAGE_MAX_TRX_ID + page, max_trx_id);

	if (truncate_t::s_fix_up_active) {
		/* Compress the index page created when applying
		TRUNCATE log during recovery; note that no mtr is
		passed in this case. */
		if (!page_zip_compress(page_zip, page, index, page_zip_level,
				       page_comp_info, NULL)) {
			/* The compression of a newly created
			page should always succeed. */
			ut_error;
		}

	} else if (!page_zip_compress(page_zip, page, index,
				      page_zip_level, NULL, mtr)) {
		/* The compression of a newly created
		page should always succeed. */
		ut_error;
	}

	return(page);
}
488 
/**********************************************************//**
Empty a previously created B-tree index page by re-creating it in the
same format (compressed or uncompressed), preserving PAGE_MAX_TRX_ID
on secondary-index leaf pages. */
void
page_create_empty(
/*==============*/
	buf_block_t*	block,	/*!< in/out: B-tree block */
	dict_index_t*	index,	/*!< in: the index of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	trx_id_t	max_trx_id = 0;
	const page_t*	page	= buf_block_get_frame(block);
	page_zip_des_t*	page_zip= buf_block_get_page_zip(block);

	ut_ad(fil_page_index_page_check(page));

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && !dict_table_is_temporary(index->table)
	    && page_is_leaf(page)) {
		/* Save the old value so it can be restored after the
		page is re-created below. */
		max_trx_id = page_get_max_trx_id(page);
		ut_ad(max_trx_id);
	}

	if (page_zip) {
		/* page_create_zip() sets PAGE_LEVEL and PAGE_MAX_TRX_ID
		itself; pass the old level read from the page header. */
		page_create_zip(block, index,
				page_header_get_field(page, PAGE_LEVEL),
				max_trx_id, NULL, mtr);
	} else {
		page_create(block, mtr, page_is_comp(page),
			    dict_index_is_spatial(index));

		if (max_trx_id) {
			/* Restore the saved PAGE_MAX_TRX_ID. */
			page_update_max_trx_id(
				block, page_zip, max_trx_id, mtr);
		}
	}
}
529 
/*************************************************************//**
Copies records from the page of rec to new_page, from rec onward
(skipping the infimum if rec is the infimum).  Differs from
page_copy_rec_list_end, because this function does not
touch the lock table and max trx id on page or compress the page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_no_locks(
/*============================*/
	buf_block_t*	new_block,	/*!< in: index page to copy to */
	buf_block_t*	block,		/*!< in: index page of rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_cur_t	cur1;
	rec_t*		cur2;
	mem_heap_t*	heap		= NULL;
	/* Stack-allocated offsets array; rec_get_offsets() falls back
	to 'heap' only if this is too small. */
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	page_cur_position(rec, block, &cur1);

	if (page_cur_is_before_first(&cur1)) {
		/* Never copy the predefined infimum record itself. */
		page_cur_move_to_next(&cur1);
	}

	btr_assert_not_corrupted(new_block, index);
	/* Source and destination pages must use the same record format. */
	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
	/* NOTE(review): this reads the last 2 bytes before the page
	directory trailer, i.e. directory slot 0, which must point to
	the infimum (assumes PAGE_DIR + PAGE_DIR_SLOT_SIZE == 10). */
	ut_a(mach_read_from_2(new_page + UNIV_PAGE_SIZE - 10) == (ulint)
	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));

	cur2 = page_get_infimum_rec(buf_block_get_frame(new_block));

	/* Copy records from the original page to the new page */

	while (!page_cur_is_after_last(&cur1)) {
		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
		rec_t*	ins_rec;
		offsets = rec_get_offsets(cur1_rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		/* Insert each record immediately after the previous
		insertion point on the new page. */
		ins_rec = page_cur_insert_rec_low(cur2, index,
						  cur1_rec, offsets, mtr);
		if (UNIV_UNLIKELY(!ins_rec)) {
			/* Insertion on a fresh page should never fail;
			report the positions involved and abort. */
			ib::fatal() << "Rec offset " << page_offset(rec)
				<< ", cur1 offset "
				<< page_offset(page_cur_get_rec(&cur1))
				<< ", cur2 offset " << page_offset(cur2);
		}

		page_cur_move_to_next(&cur1);
		cur2 = ins_rec;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
593 
594 #ifndef UNIV_HOTBACKUP
/*************************************************************//**
Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original successor of the infimum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_end(
/*===================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_t*		page		= page_align(rec);
	rec_t*		ret		= page_rec_get_next(
		page_get_infimum_rec(new_page));
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	mem_heap_t*	heap		= NULL;

#ifdef UNIV_ZIP_DEBUG
	if (new_page_zip) {
		page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
		ut_a(page_zip);

		/* Strict page_zip_validate() may fail here.
		Furthermore, btr_compress() may set FIL_PAGE_PREV to
		FIL_NULL on new_page while leaving it intact on
		new_page_zip.  So, we cannot validate new_page_zip. */
		ut_a(page_zip_validate_low(page_zip, page, index, TRUE));
	}
#endif /* UNIV_ZIP_DEBUG */
	ut_ad(buf_block_get_frame(block) == page);
	ut_ad(page_is_leaf(page) == page_is_leaf(new_page));
	ut_ad(page_is_comp(page) == page_is_comp(new_page));
	/* Here, "ret" may be pointing to a user record or the
	predefined supremum record. */

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress redo logging while modifying the
		uncompressed page; the compression below (or the
		reorganize fallback) will log the result instead. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	if (page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW) {
		/* The destination page is empty: use the faster bulk
		copy that rebuilds the page from scratch. */
		page_copy_rec_list_end_to_created_page(new_page, rec,
						       index, mtr);
	} else {
		if (dict_index_is_spatial(index)) {
			ulint	max_to_move = page_get_n_recs(
						buf_block_get_frame(block));
			heap = mem_heap_create(256);

			/* Track the (old, new) record pairs so the lock
			table can be updated after the move. */
			rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
					heap,
					sizeof (*rec_move) * max_to_move));

			/* For spatial index, we need to insert recs one by one
			to keep recs ordered. */
			rtr_page_copy_rec_list_end_no_locks(new_block,
							    block, rec, index,
							    heap, rec_move,
							    max_to_move,
							    &num_moved,
							    mtr);
		} else {
			page_copy_rec_list_end_no_locks(new_block, block, rec,
							index, mtr);
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !dict_table_is_temporary(index->table)) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page), mtr);
	}

	if (new_page_zip) {
		/* Restore normal logging before compressing. */
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_page_zip,
				       new_page,
				       index,
				       page_zip_level,
				       NULL, mtr)) {
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ulint	ret_pos
				= page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the successor of
			the predefined infimum record.  It must still
			have at least one predecessor (the predefined
			infimum record, or a freshly copied record
			that is smaller than "ret"). */
			ut_a(ret_pos > 0);

			if (!page_zip_reorganize(new_block, index, mtr)) {

				/* Reorganization failed too: roll the
				uncompressed page back to the last good
				compressed state and give up. */
				if (!page_zip_decompress(new_page_zip,
							 new_page, FALSE)) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (heap) {
					mem_heap_free(heap);
				}

				return(NULL);
			} else {
				/* The page was reorganized:
				Seek to ret_pos. */
				ret = new_page + PAGE_NEW_INFIMUM;

				do {
					ret = rec_get_next_ptr(ret, TRUE);
				} while (--ret_pos);
			}
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_index_is_spatial(index) && rec_move) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else if (!dict_table_is_locking_disabled(index->table)) {
		lock_move_rec_list_end(new_block, block, rec);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block, index);

	return(ret);
}
750 
/*************************************************************//**
Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return pointer to the original predecessor of the supremum record on
new_page, or NULL on zip overflow (new_block will be decompressed) */
rec_t*
page_copy_rec_list_start(
/*=====================*/
	buf_block_t*	new_block,	/*!< in/out: index page to copy to */
	buf_block_t*	block,		/*!< in: index page containing rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_zip_des_t*	new_page_zip	= buf_block_get_page_zip(new_block);
	page_cur_t	cur1;
	rec_t*		cur2;
	mem_heap_t*	heap		= NULL;
	ulint		num_moved	= 0;
	rtr_rec_move_t*	rec_move	= NULL;
	rec_t*		ret
		= page_rec_get_prev(page_get_supremum_rec(new_page));
	/* Stack-allocated offsets array; rec_get_offsets() falls back
	to 'heap' only if this is too small. */
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	/* Here, "ret" may be pointing to a user record or the
	predefined infimum record. */

	if (page_rec_is_infimum(rec)) {
		/* Nothing precedes the infimum: nothing to copy. */
		return(ret);
	}

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress redo logging while modifying the
		uncompressed page; the compression below (or the
		reorganize fallback) will log the result instead. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	cur2 = ret;

	/* Copy records from the original page to the new page */
	if (dict_index_is_spatial(index)) {
		ulint		max_to_move = page_get_n_recs(
						buf_block_get_frame(block));
		heap = mem_heap_create(256);

		/* Track the (old, new) record pairs so the lock table
		can be updated after the move. */
		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
					heap,
					sizeof (*rec_move) * max_to_move));

		/* For spatial index, we need to insert recs one by one
		to keep recs ordered. */
		rtr_page_copy_rec_list_start_no_locks(new_block,
						      block, rec, index, heap,
						      rec_move, max_to_move,
						      &num_moved, mtr);
	} else {

		while (page_cur_get_rec(&cur1) != rec) {
			rec_t*	cur1_rec = page_cur_get_rec(&cur1);
			offsets = rec_get_offsets(cur1_rec, index, offsets,
						  ULINT_UNDEFINED, &heap);
			cur2 = page_cur_insert_rec_low(cur2, index,
						       cur1_rec, offsets, mtr);
			ut_a(cur2);

			page_cur_move_to_next(&cur1);
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page_align(rec))
	    && !dict_table_is_temporary(index->table)) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page_align(rec)),
				       mtr);
	}

	if (new_page_zip) {
		/* Restore normal logging before compressing. */
		mtr_set_log_mode(mtr, log_mode);

		DBUG_EXECUTE_IF("page_copy_rec_list_start_compress_fail",
				goto zip_reorganize;);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, NULL, mtr)) {
			ulint	ret_pos;
			/* NOTE(review): this label is compiled only when
			NDEBUG is unset; the DBUG_EXECUTE_IF above jumps
			here -- assumes debug builds leave NDEBUG unset. */
#ifndef NDEBUG
zip_reorganize:
#endif /* NDEBUG */
			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index, mtr))) {

				/* Reorganization failed too: roll the
				uncompressed page back to the last good
				compressed state and give up. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
				ut_ad(page_validate(new_page, index));

				if (UNIV_LIKELY_NULL(heap)) {
					mem_heap_free(heap);
				}

				return(NULL);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table and possible hash index */

	if (dict_index_is_spatial(index)) {
		lock_rtr_move_rec_list(new_block, block, rec_move, num_moved);
	} else if (!dict_table_is_locking_disabled(index->table)) {
		lock_move_rec_list_start(new_block, block, rec, ret);
	}

	if (heap) {
		mem_heap_free(heap);
	}

	btr_search_move_or_delete_hash_entries(new_block, block, index);

	return(ret);
}
908 
909 /**********************************************************//**
910 Writes a log record of a record list end or start deletion. */
911 UNIV_INLINE
912 void
page_delete_rec_list_write_log(rec_t * rec,dict_index_t * index,mlog_id_t type,mtr_t * mtr)913 page_delete_rec_list_write_log(
914 /*===========================*/
915 	rec_t*		rec,	/*!< in: record on page */
916 	dict_index_t*	index,	/*!< in: record descriptor */
917 	mlog_id_t	type,	/*!< in: operation type:
918 				MLOG_LIST_END_DELETE, ... */
919 	mtr_t*		mtr)	/*!< in: mtr */
920 {
921 	byte*	log_ptr;
922 	ut_ad(type == MLOG_LIST_END_DELETE
923 	      || type == MLOG_LIST_START_DELETE
924 	      || type == MLOG_COMP_LIST_END_DELETE
925 	      || type == MLOG_COMP_LIST_START_DELETE);
926 
927 	log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
928 	if (log_ptr) {
929 		/* Write the parameter as a 2-byte ulint */
930 		mach_write_to_2(log_ptr, page_offset(rec));
931 		mlog_close(mtr, log_ptr + 2);
932 	}
933 }
934 #else /* !UNIV_HOTBACKUP */
935 # define page_delete_rec_list_write_log(rec,index,type,mtr) ((void) 0)
936 #endif /* !UNIV_HOTBACKUP */
937 
938 /**********************************************************//**
939 Parses a log record of a record list end or start deletion.
940 @return end of log record or NULL */
941 byte*
page_parse_delete_rec_list(mlog_id_t type,byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)942 page_parse_delete_rec_list(
943 /*=======================*/
944 	mlog_id_t	type,	/*!< in: MLOG_LIST_END_DELETE,
945 				MLOG_LIST_START_DELETE,
946 				MLOG_COMP_LIST_END_DELETE or
947 				MLOG_COMP_LIST_START_DELETE */
948 	byte*		ptr,	/*!< in: buffer */
949 	byte*		end_ptr,/*!< in: buffer end */
950 	buf_block_t*	block,	/*!< in/out: buffer block or NULL */
951 	dict_index_t*	index,	/*!< in: record descriptor */
952 	mtr_t*		mtr)	/*!< in: mtr or NULL */
953 {
954 	page_t*	page;
955 	ulint	offset;
956 
957 	ut_ad(type == MLOG_LIST_END_DELETE
958 	      || type == MLOG_LIST_START_DELETE
959 	      || type == MLOG_COMP_LIST_END_DELETE
960 	      || type == MLOG_COMP_LIST_START_DELETE);
961 
962 	/* Read the record offset as a 2-byte ulint */
963 
964 	if (end_ptr < ptr + 2) {
965 
966 		return(NULL);
967 	}
968 
969 	offset = mach_read_from_2(ptr);
970 	ptr += 2;
971 
972 	if (!block) {
973 
974 		return(ptr);
975 	}
976 
977 	page = buf_block_get_frame(block);
978 
979 	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
980 
981 	if (type == MLOG_LIST_END_DELETE
982 	    || type == MLOG_COMP_LIST_END_DELETE) {
983 		page_delete_rec_list_end(page + offset, block, index,
984 					 ULINT_UNDEFINED, ULINT_UNDEFINED,
985 					 mtr);
986 	} else {
987 		page_delete_rec_list_start(page + offset, block, index, mtr);
988 	}
989 
990 	return(ptr);
991 }
992 
993 /*************************************************************//**
994 Deletes records from a page from a given record onward, including that record.
995 The infimum and supremum records are not deleted. */
void
page_delete_rec_list_end(
/*=====================*/
	rec_t*		rec,	/*!< in: pointer to record on page */
	buf_block_t*	block,	/*!< in: buffer block of the page */
	dict_index_t*	index,	/*!< in: record descriptor */
	ulint		n_recs,	/*!< in: number of records to delete,
				or ULINT_UNDEFINED if not known */
	ulint		size,	/*!< in: the sum of the sizes of the
				records in the end of the chain to
				delete, or ULINT_UNDEFINED if not known */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_dir_slot_t*slot;
	ulint		slot_index;
	rec_t*		last_rec;
	rec_t*		prev_rec;
	ulint		n_owned;
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	page_t*		page		= page_align(rec);
	mem_heap_t*	heap		= NULL;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(size == ULINT_UNDEFINED || size < UNIV_PAGE_SIZE);
	ut_ad(!page_zip || page_rec_is_comp(rec));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	if (page_rec_is_supremum(rec)) {
		ut_ad(n_recs == 0 || n_recs == ULINT_UNDEFINED);
		/* Nothing to do, there are no records bigger than the
		page supremum. */
		return;
	}

	if (recv_recovery_is_on()) {
		/* If we are replaying a redo log record, we must
		replay it exactly. Since MySQL 5.6.11, we should be
		generating a redo log record for page creation if
		the page would become empty. Thus, this branch should
		only be executed when applying redo log that was
		generated by an older version of MySQL. */
	} else if (page_rec_is_infimum(rec)
		   || n_recs == page_get_n_recs(page)) {
delete_all:
		/* We are deleting all records. */
		page_create_empty(block, index, mtr);
		return;
	} else if (page_is_comp(page)) {
		if (page_rec_get_next_low(page + PAGE_NEW_INFIMUM, 1) == rec) {
			/* We are deleting everything from the first
			user record onwards. */
			goto delete_all;
		}
	} else {
		if (page_rec_get_next_low(page + PAGE_OLD_INFIMUM, 0) == rec) {
			/* We are deleting everything from the first
			user record onwards. */
			goto delete_all;
		}
	}

	/* Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock */

	buf_block_modify_clock_inc(block);

	/* Write a single redo record covering the whole list deletion;
	the individual record deletes below are not logged. */
	page_delete_rec_list_write_log(rec, index, page_is_comp(page)
				       ? MLOG_COMP_LIST_END_DELETE
				       : MLOG_LIST_END_DELETE, mtr);

	if (page_zip) {
		/* On a compressed page the records are removed one at
		a time via page_cur_delete_rec(), which keeps the
		compressed representation in sync. */
		mtr_log_t	log_mode;

		ut_a(page_is_comp(page));
		/* Individual deletes are not logged */

		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

		do {
			page_cur_t	cur;
			page_cur_position(rec, block, &cur);

			offsets = rec_get_offsets(rec, index, offsets,
						  ULINT_UNDEFINED, &heap);
			/* Advance before deleting, as the delete
			invalidates the current record. */
			rec = rec_get_next_ptr(rec, TRUE);
#ifdef UNIV_ZIP_DEBUG
			ut_a(page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
			page_cur_delete_rec(&cur, index, offsets, mtr);
		} while (page_offset(rec) != PAGE_NEW_SUPREMUM);

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}

		/* Restore log mode */

		mtr_set_log_mode(mtr, log_mode);
		return;
	}

	/* Uncompressed page: unlink the whole chain segment at once.
	Remember its boundaries: the record preceding rec, and the
	last user record on the page. */
	prev_rec = page_rec_get_prev(rec);

	last_rec = page_rec_get_prev(page_get_supremum_rec(page));

	/* NOTE(review): srv_immediate_scrub_data_uncompressed appears
	to be a data-scrubbing option; when set, the deleted record
	payloads are zeroed out below — confirm against srv0srv. */
	const bool scrub = srv_immediate_scrub_data_uncompressed;

	if (scrub || (size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED)) {
		rec_t*		rec2		= rec;
		/* Calculate the sum of sizes and the number of records */
		size = 0;
		n_recs = 0;

		do {
			ulint	s;
			offsets = rec_get_offsets(rec2, index, offsets,
						  ULINT_UNDEFINED, &heap);
			s = rec_offs_size(offsets);
			ut_ad(rec2 - page + s - rec_offs_extra_size(offsets)
			      < UNIV_PAGE_SIZE);
			ut_ad(size + s < UNIV_PAGE_SIZE);
			size += s;
			n_recs++;

                        if (scrub) {
                                /* scrub record */
                                memset(rec2, 0, rec_offs_data_size(offsets));
                        }

			rec2 = page_rec_get_next(rec2);
		} while (!page_rec_is_supremum(rec2));

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	ut_ad(size < UNIV_PAGE_SIZE);

	/* Update the page directory; there is no need to balance the number
	of the records owned by the supremum record, as it is allowed to be
	less than PAGE_DIR_SLOT_MIN_N_OWNED */

	/* Walk forward from rec to the record that owns it, to find the
	directory slot of the owner and how many of its records survive. */
	if (page_is_comp(page)) {
		rec_t*	rec2	= rec;
		ulint	count	= 0;

		while (rec_get_n_owned_new(rec2) == 0) {
			count++;

			rec2 = rec_get_next_ptr(rec2, TRUE);
		}

		ut_ad(rec_get_n_owned_new(rec2) > count);

		n_owned = rec_get_n_owned_new(rec2) - count;
		slot_index = page_dir_find_owner_slot(rec2);
		ut_ad(slot_index > 0);
		slot = page_dir_get_nth_slot(page, slot_index);
	} else {
		rec_t*	rec2	= rec;
		ulint	count	= 0;

		while (rec_get_n_owned_old(rec2) == 0) {
			count++;

			rec2 = rec_get_next_ptr(rec2, FALSE);
		}

		ut_ad(rec_get_n_owned_old(rec2) > count);

		n_owned = rec_get_n_owned_old(rec2) - count;
		slot_index = page_dir_find_owner_slot(rec2);
		ut_ad(slot_index > 0);
		slot = page_dir_get_nth_slot(page, slot_index);
	}

	/* The supremum record takes over ownership of the surviving
	records, and all directory slots after the owner's are dropped. */
	page_dir_slot_set_rec(slot, page_get_supremum_rec(page));
	page_dir_slot_set_n_owned(slot, NULL, n_owned);

	page_dir_set_n_slots(page, NULL, slot_index + 1);

	/* Remove the record chain segment from the record chain */
	page_rec_set_next(prev_rec, page_get_supremum_rec(page));

	/* Catenate the deleted chain segment to the page free list */

	page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE));
	page_header_set_ptr(page, NULL, PAGE_FREE, rec);

	/* Account the freed bytes as garbage and decrement the record
	count in the page header. */
	page_header_set_field(page, NULL, PAGE_GARBAGE, size
			      + page_header_get_field(page, PAGE_GARBAGE));

	page_header_set_field(page, NULL, PAGE_N_RECS,
			      (ulint)(page_get_n_recs(page) - n_recs));
}
1201 
1202 /*************************************************************//**
1203 Deletes records from page, up to the given record, NOT including
1204 that record. Infimum and supremum records are not deleted. */
1205 void
page_delete_rec_list_start(rec_t * rec,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1206 page_delete_rec_list_start(
1207 /*=======================*/
1208 	rec_t*		rec,	/*!< in: record on page */
1209 	buf_block_t*	block,	/*!< in: buffer block of the page */
1210 	dict_index_t*	index,	/*!< in: record descriptor */
1211 	mtr_t*		mtr)	/*!< in: mtr */
1212 {
1213 	page_cur_t	cur1;
1214 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1215 	ulint*		offsets		= offsets_;
1216 	mem_heap_t*	heap		= NULL;
1217 
1218 	rec_offs_init(offsets_);
1219 
1220 	ut_ad((ibool) !!page_rec_is_comp(rec)
1221 	      == dict_table_is_comp(index->table));
1222 #ifdef UNIV_ZIP_DEBUG
1223 	{
1224 		page_zip_des_t*	page_zip= buf_block_get_page_zip(block);
1225 		page_t*		page	= buf_block_get_frame(block);
1226 
1227 		/* page_zip_validate() would detect a min_rec_mark mismatch
1228 		in btr_page_split_and_insert()
1229 		between btr_attach_half_pages() and insert_page = ...
1230 		when btr_page_get_split_rec_to_left() holds
1231 		(direction == FSP_DOWN). */
1232 		ut_a(!page_zip
1233 		     || page_zip_validate_low(page_zip, page, index, TRUE));
1234 	}
1235 #endif /* UNIV_ZIP_DEBUG */
1236 
1237 	if (page_rec_is_infimum(rec)) {
1238 		return;
1239 	}
1240 
1241 	if (page_rec_is_supremum(rec)) {
1242 		/* We are deleting all records. */
1243 		page_create_empty(block, index, mtr);
1244 		return;
1245 	}
1246 
1247 	mlog_id_t	type;
1248 
1249 	if (page_rec_is_comp(rec)) {
1250 		type = MLOG_COMP_LIST_START_DELETE;
1251 	} else {
1252 		type = MLOG_LIST_START_DELETE;
1253 	}
1254 
1255 	page_delete_rec_list_write_log(rec, index, type, mtr);
1256 
1257 	page_cur_set_before_first(block, &cur1);
1258 	page_cur_move_to_next(&cur1);
1259 
1260 	/* Individual deletes are not logged */
1261 
1262 	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
1263 
1264 	while (page_cur_get_rec(&cur1) != rec) {
1265 		offsets = rec_get_offsets(page_cur_get_rec(&cur1), index,
1266 					  offsets, ULINT_UNDEFINED, &heap);
1267 		page_cur_delete_rec(&cur1, index, offsets, mtr);
1268 	}
1269 
1270 	if (UNIV_LIKELY_NULL(heap)) {
1271 		mem_heap_free(heap);
1272 	}
1273 
1274 	/* Restore log mode */
1275 
1276 	mtr_set_log_mode(mtr, log_mode);
1277 }
1278 
1279 #ifndef UNIV_HOTBACKUP
1280 /*************************************************************//**
1281 Moves record list end to another page. Moved records include
1282 split_rec.
1283 
1284 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1285 if new_block is a compressed leaf page in a secondary index.
1286 This has to be done either within the same mini-transaction,
1287 or by invoking ibuf_reset_free_bits() before mtr_commit().
1288 
1289 @return TRUE on success; FALSE on compression failure (new_block will
1290 be decompressed) */
1291 ibool
page_move_rec_list_end(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1292 page_move_rec_list_end(
1293 /*===================*/
1294 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1295 	buf_block_t*	block,		/*!< in: index page from where to move */
1296 	rec_t*		split_rec,	/*!< in: first record to move */
1297 	dict_index_t*	index,		/*!< in: record descriptor */
1298 	mtr_t*		mtr)		/*!< in: mtr */
1299 {
1300 	page_t*		new_page	= buf_block_get_frame(new_block);
1301 	ulint		old_data_size;
1302 	ulint		new_data_size;
1303 	ulint		old_n_recs;
1304 	ulint		new_n_recs;
1305 
1306 	ut_ad(!dict_index_is_spatial(index));
1307 
1308 	old_data_size = page_get_data_size(new_page);
1309 	old_n_recs = page_get_n_recs(new_page);
1310 #ifdef UNIV_ZIP_DEBUG
1311 	{
1312 		page_zip_des_t*	new_page_zip
1313 			= buf_block_get_page_zip(new_block);
1314 		page_zip_des_t*	page_zip
1315 			= buf_block_get_page_zip(block);
1316 		ut_a(!new_page_zip == !page_zip);
1317 		ut_a(!new_page_zip
1318 		     || page_zip_validate(new_page_zip, new_page, index));
1319 		ut_a(!page_zip
1320 		     || page_zip_validate(page_zip, page_align(split_rec),
1321 					  index));
1322 	}
1323 #endif /* UNIV_ZIP_DEBUG */
1324 
1325 	if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_block, block,
1326 						  split_rec, index, mtr))) {
1327 		return(FALSE);
1328 	}
1329 
1330 	new_data_size = page_get_data_size(new_page);
1331 	new_n_recs = page_get_n_recs(new_page);
1332 
1333 	ut_ad(new_data_size >= old_data_size);
1334 
1335 	page_delete_rec_list_end(split_rec, block, index,
1336 				 new_n_recs - old_n_recs,
1337 				 new_data_size - old_data_size, mtr);
1338 
1339 	return(TRUE);
1340 }
1341 
1342 /*************************************************************//**
1343 Moves record list start to another page. Moved records do not include
1344 split_rec.
1345 
1346 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1347 if new_block is a compressed leaf page in a secondary index.
1348 This has to be done either within the same mini-transaction,
1349 or by invoking ibuf_reset_free_bits() before mtr_commit().
1350 
1351 @return TRUE on success; FALSE on compression failure */
1352 ibool
page_move_rec_list_start(buf_block_t * new_block,buf_block_t * block,rec_t * split_rec,dict_index_t * index,mtr_t * mtr)1353 page_move_rec_list_start(
1354 /*=====================*/
1355 	buf_block_t*	new_block,	/*!< in/out: index page where to move */
1356 	buf_block_t*	block,		/*!< in/out: page containing split_rec */
1357 	rec_t*		split_rec,	/*!< in: first record not to move */
1358 	dict_index_t*	index,		/*!< in: record descriptor */
1359 	mtr_t*		mtr)		/*!< in: mtr */
1360 {
1361 	if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_block, block,
1362 						    split_rec, index, mtr))) {
1363 		return(FALSE);
1364 	}
1365 
1366 	page_delete_rec_list_start(split_rec, block, index, mtr);
1367 
1368 	return(TRUE);
1369 }
1370 #endif /* !UNIV_HOTBACKUP */
1371 
1372 /**************************************************************//**
1373 Used to delete n slots from the directory. This function updates
1374 also n_owned fields in the records, so that the first slot after
1375 the deleted ones inherits the records of the deleted slots. */
1376 UNIV_INLINE
1377 void
page_dir_delete_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1378 page_dir_delete_slot(
1379 /*=================*/
1380 	page_t*		page,	/*!< in/out: the index page */
1381 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1382 	ulint		slot_no)/*!< in: slot to be deleted */
1383 {
1384 	page_dir_slot_t*	slot;
1385 	ulint			n_owned;
1386 	ulint			i;
1387 	ulint			n_slots;
1388 
1389 	ut_ad(!page_zip || page_is_comp(page));
1390 	ut_ad(slot_no > 0);
1391 	ut_ad(slot_no + 1 < page_dir_get_n_slots(page));
1392 
1393 	n_slots = page_dir_get_n_slots(page);
1394 
1395 	/* 1. Reset the n_owned fields of the slots to be
1396 	deleted */
1397 	slot = page_dir_get_nth_slot(page, slot_no);
1398 	n_owned = page_dir_slot_get_n_owned(slot);
1399 	page_dir_slot_set_n_owned(slot, page_zip, 0);
1400 
1401 	/* 2. Update the n_owned value of the first non-deleted slot */
1402 
1403 	slot = page_dir_get_nth_slot(page, slot_no + 1);
1404 	page_dir_slot_set_n_owned(slot, page_zip,
1405 				  n_owned + page_dir_slot_get_n_owned(slot));
1406 
1407 	/* 3. Destroy the slot by copying slots */
1408 	for (i = slot_no + 1; i < n_slots; i++) {
1409 		rec_t*	rec = (rec_t*)
1410 			page_dir_slot_get_rec(page_dir_get_nth_slot(page, i));
1411 		page_dir_slot_set_rec(page_dir_get_nth_slot(page, i - 1), rec);
1412 	}
1413 
1414 	/* 4. Zero out the last slot, which will be removed */
1415 	mach_write_to_2(page_dir_get_nth_slot(page, n_slots - 1), 0);
1416 
1417 	/* 5. Update the page header */
1418 	page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots - 1);
1419 }
1420 
1421 /**************************************************************//**
1422 Used to add n slots to the directory. Does not set the record pointers
1423 in the added slots or update n_owned values: this is the responsibility
1424 of the caller. */
1425 UNIV_INLINE
1426 void
page_dir_add_slot(page_t * page,page_zip_des_t * page_zip,ulint start)1427 page_dir_add_slot(
1428 /*==============*/
1429 	page_t*		page,	/*!< in/out: the index page */
1430 	page_zip_des_t*	page_zip,/*!< in/out: comprssed page, or NULL */
1431 	ulint		start)	/*!< in: the slot above which the new slots
1432 				are added */
1433 {
1434 	page_dir_slot_t*	slot;
1435 	ulint			n_slots;
1436 
1437 	n_slots = page_dir_get_n_slots(page);
1438 
1439 	ut_ad(start < n_slots - 1);
1440 
1441 	/* Update the page header */
1442 	page_dir_set_n_slots(page, page_zip, n_slots + 1);
1443 
1444 	/* Move slots up */
1445 	slot = page_dir_get_nth_slot(page, n_slots);
1446 	memmove(slot, slot + PAGE_DIR_SLOT_SIZE,
1447 		(n_slots - 1 - start) * PAGE_DIR_SLOT_SIZE);
1448 }
1449 
1450 /****************************************************************//**
1451 Splits a directory slot which owns too many records. */
1452 void
page_dir_split_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1453 page_dir_split_slot(
1454 /*================*/
1455 	page_t*		page,	/*!< in/out: index page */
1456 	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose
1457 				uncompressed part will be written, or NULL */
1458 	ulint		slot_no)/*!< in: the directory slot */
1459 {
1460 	rec_t*			rec;
1461 	page_dir_slot_t*	new_slot;
1462 	page_dir_slot_t*	prev_slot;
1463 	page_dir_slot_t*	slot;
1464 	ulint			i;
1465 	ulint			n_owned;
1466 
1467 	ut_ad(page);
1468 	ut_ad(!page_zip || page_is_comp(page));
1469 	ut_ad(slot_no > 0);
1470 
1471 	slot = page_dir_get_nth_slot(page, slot_no);
1472 
1473 	n_owned = page_dir_slot_get_n_owned(slot);
1474 	ut_ad(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED + 1);
1475 
1476 	/* 1. We loop to find a record approximately in the middle of the
1477 	records owned by the slot. */
1478 
1479 	prev_slot = page_dir_get_nth_slot(page, slot_no - 1);
1480 	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);
1481 
1482 	for (i = 0; i < n_owned / 2; i++) {
1483 		rec = page_rec_get_next(rec);
1484 	}
1485 
1486 	ut_ad(n_owned / 2 >= PAGE_DIR_SLOT_MIN_N_OWNED);
1487 
1488 	/* 2. We add one directory slot immediately below the slot to be
1489 	split. */
1490 
1491 	page_dir_add_slot(page, page_zip, slot_no - 1);
1492 
1493 	/* The added slot is now number slot_no, and the old slot is
1494 	now number slot_no + 1 */
1495 
1496 	new_slot = page_dir_get_nth_slot(page, slot_no);
1497 	slot = page_dir_get_nth_slot(page, slot_no + 1);
1498 
1499 	/* 3. We store the appropriate values to the new slot. */
1500 
1501 	page_dir_slot_set_rec(new_slot, rec);
1502 	page_dir_slot_set_n_owned(new_slot, page_zip, n_owned / 2);
1503 
1504 	/* 4. Finally, we update the number of records field of the
1505 	original slot */
1506 
1507 	page_dir_slot_set_n_owned(slot, page_zip, n_owned - (n_owned / 2));
1508 }
1509 
1510 /*************************************************************//**
1511 Tries to balance the given directory slot with too few records with the upper
1512 neighbor, so that there are at least the minimum number of records owned by
1513 the slot; this may result in the merging of two slots. */
1514 void
page_dir_balance_slot(page_t * page,page_zip_des_t * page_zip,ulint slot_no)1515 page_dir_balance_slot(
1516 /*==================*/
1517 	page_t*		page,	/*!< in/out: index page */
1518 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
1519 	ulint		slot_no)/*!< in: the directory slot */
1520 {
1521 	page_dir_slot_t*	slot;
1522 	page_dir_slot_t*	up_slot;
1523 	ulint			n_owned;
1524 	ulint			up_n_owned;
1525 	rec_t*			old_rec;
1526 	rec_t*			new_rec;
1527 
1528 	ut_ad(page);
1529 	ut_ad(!page_zip || page_is_comp(page));
1530 	ut_ad(slot_no > 0);
1531 
1532 	slot = page_dir_get_nth_slot(page, slot_no);
1533 
1534 	/* The last directory slot cannot be balanced with the upper
1535 	neighbor, as there is none. */
1536 
1537 	if (UNIV_UNLIKELY(slot_no == page_dir_get_n_slots(page) - 1)) {
1538 
1539 		return;
1540 	}
1541 
1542 	up_slot = page_dir_get_nth_slot(page, slot_no + 1);
1543 
1544 	n_owned = page_dir_slot_get_n_owned(slot);
1545 	up_n_owned = page_dir_slot_get_n_owned(up_slot);
1546 
1547 	ut_ad(n_owned == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
1548 
1549 	/* If the upper slot has the minimum value of n_owned, we will merge
1550 	the two slots, therefore we assert: */
1551 	ut_ad(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1 <= PAGE_DIR_SLOT_MAX_N_OWNED);
1552 
1553 	if (up_n_owned > PAGE_DIR_SLOT_MIN_N_OWNED) {
1554 
1555 		/* In this case we can just transfer one record owned
1556 		by the upper slot to the property of the lower slot */
1557 		old_rec = (rec_t*) page_dir_slot_get_rec(slot);
1558 
1559 		if (page_is_comp(page)) {
1560 			new_rec = rec_get_next_ptr(old_rec, TRUE);
1561 
1562 			rec_set_n_owned_new(old_rec, page_zip, 0);
1563 			rec_set_n_owned_new(new_rec, page_zip, n_owned + 1);
1564 		} else {
1565 			new_rec = rec_get_next_ptr(old_rec, FALSE);
1566 
1567 			rec_set_n_owned_old(old_rec, 0);
1568 			rec_set_n_owned_old(new_rec, n_owned + 1);
1569 		}
1570 
1571 		page_dir_slot_set_rec(slot, new_rec);
1572 
1573 		page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned -1);
1574 	} else {
1575 		/* In this case we may merge the two slots */
1576 		page_dir_delete_slot(page, page_zip, slot_no);
1577 	}
1578 }
1579 
1580 /************************************************************//**
1581 Returns the nth record of the record list.
1582 This is the inverse function of page_rec_get_n_recs_before().
1583 @return nth record */
1584 const rec_t*
page_rec_get_nth_const(const page_t * page,ulint nth)1585 page_rec_get_nth_const(
1586 /*===================*/
1587 	const page_t*	page,	/*!< in: page */
1588 	ulint		nth)	/*!< in: nth record */
1589 {
1590 	const page_dir_slot_t*	slot;
1591 	ulint			i;
1592 	ulint			n_owned;
1593 	const rec_t*		rec;
1594 
1595 	if (nth == 0) {
1596 		return(page_get_infimum_rec(page));
1597 	}
1598 
1599 	ut_ad(nth < UNIV_PAGE_SIZE / (REC_N_NEW_EXTRA_BYTES + 1));
1600 
1601 	for (i = 0;; i++) {
1602 
1603 		slot = page_dir_get_nth_slot(page, i);
1604 		n_owned = page_dir_slot_get_n_owned(slot);
1605 
1606 		if (n_owned > nth) {
1607 			break;
1608 		} else {
1609 			nth -= n_owned;
1610 		}
1611 	}
1612 
1613 	ut_ad(i > 0);
1614 	slot = page_dir_get_nth_slot(page, i - 1);
1615 	rec = page_dir_slot_get_rec(slot);
1616 
1617 	if (page_is_comp(page)) {
1618 		do {
1619 			rec = page_rec_get_next_low(rec, TRUE);
1620 			ut_ad(rec);
1621 		} while (nth--);
1622 	} else {
1623 		do {
1624 			rec = page_rec_get_next_low(rec, FALSE);
1625 			ut_ad(rec);
1626 		} while (nth--);
1627 	}
1628 
1629 	return(rec);
1630 }
1631 
1632 /***************************************************************//**
1633 Returns the number of records before the given record in chain.
1634 The number includes infimum and supremum records.
1635 @return number of records */
1636 ulint
page_rec_get_n_recs_before(const rec_t * rec)1637 page_rec_get_n_recs_before(
1638 /*=======================*/
1639 	const rec_t*	rec)	/*!< in: the physical record */
1640 {
1641 	const page_dir_slot_t*	slot;
1642 	const rec_t*		slot_rec;
1643 	const page_t*		page;
1644 	ulint			i;
1645 	lint			n	= 0;
1646 
1647 	ut_ad(page_rec_check(rec));
1648 
1649 	page = page_align(rec);
1650 	if (page_is_comp(page)) {
1651 		while (rec_get_n_owned_new(rec) == 0) {
1652 
1653 			rec = rec_get_next_ptr_const(rec, TRUE);
1654 			n--;
1655 		}
1656 
1657 		for (i = 0; ; i++) {
1658 			slot = page_dir_get_nth_slot(page, i);
1659 			slot_rec = page_dir_slot_get_rec(slot);
1660 
1661 			n += rec_get_n_owned_new(slot_rec);
1662 
1663 			if (rec == slot_rec) {
1664 
1665 				break;
1666 			}
1667 		}
1668 	} else {
1669 		while (rec_get_n_owned_old(rec) == 0) {
1670 
1671 			rec = rec_get_next_ptr_const(rec, FALSE);
1672 			n--;
1673 		}
1674 
1675 		for (i = 0; ; i++) {
1676 			slot = page_dir_get_nth_slot(page, i);
1677 			slot_rec = page_dir_slot_get_rec(slot);
1678 
1679 			n += rec_get_n_owned_old(slot_rec);
1680 
1681 			if (rec == slot_rec) {
1682 
1683 				break;
1684 			}
1685 		}
1686 	}
1687 
1688 	n--;
1689 
1690 	ut_ad(n >= 0);
1691 	ut_ad((ulong) n < UNIV_PAGE_SIZE / (REC_N_NEW_EXTRA_BYTES + 1));
1692 
1693 	return((ulint) n);
1694 }
1695 
1696 #ifndef UNIV_HOTBACKUP
1697 /************************************************************//**
1698 Prints record contents including the data relevant only in
1699 the index page context. */
1700 void
page_rec_print(const rec_t * rec,const ulint * offsets)1701 page_rec_print(
1702 /*===========*/
1703 	const rec_t*	rec,	/*!< in: physical record */
1704 	const ulint*	offsets)/*!< in: record descriptor */
1705 {
1706 	ut_a(!page_rec_is_comp(rec) == !rec_offs_comp(offsets));
1707 	rec_print_new(stderr, rec, offsets);
1708 	if (page_rec_is_comp(rec)) {
1709 		ib::info() << "n_owned: " << rec_get_n_owned_new(rec)
1710 			<< "; heap_no: " << rec_get_heap_no_new(rec)
1711 			<< "; next rec: " << rec_get_next_offs(rec, TRUE);
1712 	} else {
1713 		ib::info() << "n_owned: " << rec_get_n_owned_old(rec)
1714 			<< "; heap_no: " << rec_get_heap_no_old(rec)
1715 			<< "; next rec: " << rec_get_next_offs(rec, FALSE);
1716 	}
1717 
1718 	page_rec_check(rec);
1719 	rec_validate(rec, offsets);
1720 }
1721 
1722 # ifdef UNIV_BTR_PRINT
1723 /***************************************************************//**
1724 This is used to print the contents of the directory for
1725 debugging purposes. */
1726 void
page_dir_print(page_t * page,ulint pr_n)1727 page_dir_print(
1728 /*===========*/
1729 	page_t*	page,	/*!< in: index page */
1730 	ulint	pr_n)	/*!< in: print n first and n last entries */
1731 {
1732 	ulint			n;
1733 	ulint			i;
1734 	page_dir_slot_t*	slot;
1735 
1736 	n = page_dir_get_n_slots(page);
1737 
1738 	fprintf(stderr, "--------------------------------\n"
1739 		"PAGE DIRECTORY\n"
1740 		"Page address %p\n"
1741 		"Directory stack top at offs: %lu; number of slots: %lu\n",
1742 		page, (ulong) page_offset(page_dir_get_nth_slot(page, n - 1)),
1743 		(ulong) n);
1744 	for (i = 0; i < n; i++) {
1745 		slot = page_dir_get_nth_slot(page, i);
1746 		if ((i == pr_n) && (i < n - pr_n)) {
1747 			fputs("    ...   \n", stderr);
1748 		}
1749 		if ((i < pr_n) || (i >= n - pr_n)) {
1750 			fprintf(stderr,
1751 				"Contents of slot: %lu: n_owned: %lu,"
1752 				" rec offs: %lu\n",
1753 				(ulong) i,
1754 				(ulong) page_dir_slot_get_n_owned(slot),
1755 				(ulong)
1756 				page_offset(page_dir_slot_get_rec(slot)));
1757 		}
1758 	}
1759 	fprintf(stderr, "Total of %lu records\n"
1760 		"--------------------------------\n",
1761 		(ulong) (PAGE_HEAP_NO_USER_LOW + page_get_n_recs(page)));
1762 }
1763 
1764 /***************************************************************//**
1765 This is used to print the contents of the page record list for
1766 debugging purposes. */
1767 void
page_print_list(buf_block_t * block,dict_index_t * index,ulint pr_n)1768 page_print_list(
1769 /*============*/
1770 	buf_block_t*	block,	/*!< in: index page */
1771 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1772 	ulint		pr_n)	/*!< in: print n first and n last entries */
1773 {
1774 	page_t*		page		= block->frame;
1775 	page_cur_t	cur;
1776 	ulint		count;
1777 	ulint		n_recs;
1778 	mem_heap_t*	heap		= NULL;
1779 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1780 	ulint*		offsets		= offsets_;
1781 	rec_offs_init(offsets_);
1782 
1783 	ut_a((ibool)!!page_is_comp(page) == dict_table_is_comp(index->table));
1784 
1785 	fprint(stderr,
1786 		"--------------------------------\n"
1787 		"PAGE RECORD LIST\n"
1788 		"Page address %p\n", page);
1789 
1790 	n_recs = page_get_n_recs(page);
1791 
1792 	page_cur_set_before_first(block, &cur);
1793 	count = 0;
1794 	for (;;) {
1795 		offsets = rec_get_offsets(cur.rec, index, offsets,
1796 					  ULINT_UNDEFINED, &heap);
1797 		page_rec_print(cur.rec, offsets);
1798 
1799 		if (count == pr_n) {
1800 			break;
1801 		}
1802 		if (page_cur_is_after_last(&cur)) {
1803 			break;
1804 		}
1805 		page_cur_move_to_next(&cur);
1806 		count++;
1807 	}
1808 
1809 	if (n_recs > 2 * pr_n) {
1810 		fputs(" ... \n", stderr);
1811 	}
1812 
1813 	while (!page_cur_is_after_last(&cur)) {
1814 		page_cur_move_to_next(&cur);
1815 
1816 		if (count + pr_n >= n_recs) {
1817 			offsets = rec_get_offsets(cur.rec, index, offsets,
1818 						  ULINT_UNDEFINED, &heap);
1819 			page_rec_print(cur.rec, offsets);
1820 		}
1821 		count++;
1822 	}
1823 
1824 	fprintf(stderr,
1825 		"Total of %lu records \n"
1826 		"--------------------------------\n",
1827 		(ulong) (count + 1));
1828 
1829 	if (UNIV_LIKELY_NULL(heap)) {
1830 		mem_heap_free(heap);
1831 	}
1832 }
1833 
1834 /***************************************************************//**
1835 Prints the info in a page header. */
void
page_header_print(
/*==============*/
	const page_t*	page)	/*!< in: index page whose header to print */
{
	/* Dump the page header fields to stderr in a fixed layout;
	all values are read straight from the page frame. */
	fprintf(stderr,
		"--------------------------------\n"
		"PAGE HEADER INFO\n"
		"Page address %p, n records %lu (%s)\n"
		"n dir slots %lu, heap top %lu\n"
		"Page n heap %lu, free %lu, garbage %lu\n"
		"Page last insert %lu, direction %lu, n direction %lu\n",
		page, (ulong) page_header_get_field(page, PAGE_N_RECS),
		page_is_comp(page) ? "compact format" : "original format",
		(ulong) page_header_get_field(page, PAGE_N_DIR_SLOTS),
		(ulong) page_header_get_field(page, PAGE_HEAP_TOP),
		(ulong) page_dir_get_n_heap(page),
		(ulong) page_header_get_field(page, PAGE_FREE),
		(ulong) page_header_get_field(page, PAGE_GARBAGE),
		(ulong) page_header_get_field(page, PAGE_LAST_INSERT),
		(ulong) page_header_get_field(page, PAGE_DIRECTION),
		(ulong) page_header_get_field(page, PAGE_N_DIRECTION));
}
1859 
1860 /***************************************************************//**
1861 This is used to print the contents of the page for
1862 debugging purposes. */
1863 void
page_print(buf_block_t * block,dict_index_t * index,ulint dn,ulint rn)1864 page_print(
1865 /*=======*/
1866 	buf_block_t*	block,	/*!< in: index page */
1867 	dict_index_t*	index,	/*!< in: dictionary index of the page */
1868 	ulint		dn,	/*!< in: print dn first and last entries
1869 				in directory */
1870 	ulint		rn)	/*!< in: print rn first and last records
1871 				in directory */
1872 {
1873 	page_t*	page = block->frame;
1874 
1875 	page_header_print(page);
1876 	page_dir_print(page, dn);
1877 	page_print_list(block, index, rn);
1878 }
1879 # endif /* UNIV_BTR_PRINT */
1880 #endif /* !UNIV_HOTBACKUP */
1881 
1882 /***************************************************************//**
1883 The following is used to validate a record on a page. This function
1884 differs from rec_validate as it can also check the n_owned field and
1885 the heap_no field.
1886 @return TRUE if ok */
1887 ibool
page_rec_validate(const rec_t * rec,const ulint * offsets)1888 page_rec_validate(
1889 /*==============*/
1890 	const rec_t*	rec,	/*!< in: physical record */
1891 	const ulint*	offsets)/*!< in: array returned by rec_get_offsets() */
1892 {
1893 	ulint		n_owned;
1894 	ulint		heap_no;
1895 	const page_t*	page;
1896 
1897 	page = page_align(rec);
1898 	ut_a(!page_is_comp(page) == !rec_offs_comp(offsets));
1899 
1900 	page_rec_check(rec);
1901 	rec_validate(rec, offsets);
1902 
1903 	if (page_rec_is_comp(rec)) {
1904 		n_owned = rec_get_n_owned_new(rec);
1905 		heap_no = rec_get_heap_no_new(rec);
1906 	} else {
1907 		n_owned = rec_get_n_owned_old(rec);
1908 		heap_no = rec_get_heap_no_old(rec);
1909 	}
1910 
1911 	if (UNIV_UNLIKELY(!(n_owned <= PAGE_DIR_SLOT_MAX_N_OWNED))) {
1912 		ib::warn() << "Dir slot of rec " << page_offset(rec)
1913 			<< ", n owned too big " << n_owned;
1914 		return(FALSE);
1915 	}
1916 
1917 	if (UNIV_UNLIKELY(!(heap_no < page_dir_get_n_heap(page)))) {
1918 		ib::warn() << "Heap no of rec " << page_offset(rec)
1919 			<< " too big " << heap_no << " "
1920 			<< page_dir_get_n_heap(page);
1921 		return(FALSE);
1922 	}
1923 
1924 	return(TRUE);
1925 }
1926 
1927 #ifndef UNIV_HOTBACKUP
1928 #ifdef UNIV_DEBUG
1929 /***************************************************************//**
1930 Checks that the first directory slot points to the infimum record and
1931 the last to the supremum. This function is intended to track if the
1932 bug fixed in 4.0.14 has caused corruption to users' databases. */
1933 void
page_check_dir(const page_t * page)1934 page_check_dir(
1935 /*===========*/
1936 	const page_t*	page)	/*!< in: index page */
1937 {
1938 	ulint	n_slots;
1939 	ulint	infimum_offs;
1940 	ulint	supremum_offs;
1941 
1942 	n_slots = page_dir_get_n_slots(page);
1943 	infimum_offs = mach_read_from_2(page_dir_get_nth_slot(page, 0));
1944 	supremum_offs = mach_read_from_2(page_dir_get_nth_slot(page,
1945 							       n_slots - 1));
1946 
1947 	if (UNIV_UNLIKELY(!page_rec_is_infimum_low(infimum_offs))) {
1948 
1949 		ib::fatal() << "Page directory corruption: infimum not"
1950 			" pointed to";
1951 	}
1952 
1953 	if (UNIV_UNLIKELY(!page_rec_is_supremum_low(supremum_offs))) {
1954 
1955 		ib::fatal() << "Page directory corruption: supremum not"
1956 			" pointed to";
1957 	}
1958 }
1959 #endif /* UNIV_DEBUG */
1960 #endif /* !UNIV_HOTBACKUP */
1961 
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_old(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;	/* directory slot being checked */
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;	/* end of the record heap */
	ulint			count;		/* records traversed so far */
	ulint			own_count;	/* records owned by current slot */
	ibool			ret	= FALSE;

	/* This variant handles only the old (redundant) record format. */
	ut_a(!page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(n_slots > UNIV_PAGE_SIZE / 4)) {
		ib::error() << "Nonsensical number " << n_slots
			<< " of page dir slots";

		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	/* The directory grows downward from the page end; the heap grows
	upward. They must not cross. */
	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {
		ib::error()
			<< "Record heap and dir overlap on a page, heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir "
			<< page_offset(page_dir_get_nth_slot(page,
							     n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;	/* the infimum itself counts as owned */
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_old(rec))) {
			/* This is a record pointed to by a dir slot */
			if (UNIV_UNLIKELY(rec_get_n_owned_old(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_old(rec)
					<< ", " << own_count << ", rec "
					<< (rec - page);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << (rec - page);

				goto func_exit;
			}

			/* Restart the ownership count for the next slot. */
			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		/* The supremum terminates the record list. */
		if (page_rec_is_supremum(rec)) {

			break;
		}

		/* The next-record offset must stay inside the page. */
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, FALSE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, FALSE) >= UNIV_PAGE_SIZE)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, FALSE) << " for rec "
				<< (rec - page);

			goto func_exit;
		}

		count++;

		/* More records than bytes on the page implies a cycle in
		the singly-linked record list. */
		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			ib::error() << "Page record list appears"
				" to be circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	/* The supremum must always be owned by the last slot. */
	if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	/* All directory slots must have been consumed exactly. */
	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() <<  "n slots wrong "
			<< slot_no << ", " << (n_slots - 1);
		goto func_exit;
	}

	/* PAGE_N_RECS counts user records; the traversal counted the
	infimum too, hence the PAGE_HEAP_NO_USER_LOW / count + 1 offsets. */
	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS)
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() <<  "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + UNIV_PAGE_SIZE)) {
			ib::error() << "Free list record has"
				" a nonsensical offset " << (rec - page);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << (rec - page)
				<< " is above rec heap top "
				<< (rec_heap_top - page);

			goto func_exit;
		}

		count++;

		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			ib::error() << "Page free list appears"
				" to be circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
	}

	/* Every heap record must be either in the record list or in the
	free list: count + 1 (for the infimum) must equal n_heap. */
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() <<  "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
2151 
/***************************************************************//**
This function checks the consistency of an index page when we do not
know the index. This is also resilient so that this should never crash
even if the page is total garbage.
@return TRUE if ok */
ibool
page_simple_validate_new(
/*=====================*/
	const page_t*	page)	/*!< in: index page in ROW_FORMAT!=REDUNDANT */
{
	const page_dir_slot_t*	slot;
	ulint			slot_no;	/* directory slot being checked */
	ulint			n_slots;
	const rec_t*		rec;
	const byte*		rec_heap_top;	/* end of the record heap */
	ulint			count;		/* records traversed so far */
	ulint			own_count;	/* records owned by current slot */
	ibool			ret	= FALSE;

	/* This variant handles only the new (compact) record format. */
	ut_a(page_is_comp(page));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(n_slots > UNIV_PAGE_SIZE / 4)) {
		ib::error() << "Nonsensical number " << n_slots
			<< " of page dir slots";

		goto func_exit;
	}

	rec_heap_top = page_header_get_ptr(page, PAGE_HEAP_TOP);

	/* The directory grows downward from the page end; the heap grows
	upward. They must not cross. */
	if (UNIV_UNLIKELY(rec_heap_top
			  > page_dir_get_nth_slot(page, n_slots - 1))) {

		ib::error() << "Record heap and dir overlap on a page,"
			" heap top "
			<< page_header_get_field(page, PAGE_HEAP_TOP)
			<< ", dir " << page_offset(
				page_dir_get_nth_slot(page, n_slots - 1));

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that it is
	consistent with the page record directory. */

	count = 0;
	own_count = 1;	/* the infimum itself counts as owned */
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		if (UNIV_UNLIKELY(rec > rec_heap_top)) {

			ib::error() << "Record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec))) {
			/* This is a record pointed to by a dir slot */
			if (UNIV_UNLIKELY(rec_get_n_owned_new(rec)
					  != own_count)) {

				ib::error() << "Wrong owned count "
					<< rec_get_n_owned_new(rec) << ", "
					<< own_count << ", rec "
					<< page_offset(rec);

				goto func_exit;
			}

			if (UNIV_UNLIKELY
			    (page_dir_slot_get_rec(slot) != rec)) {
				ib::error() << "Dir slot does not point"
					" to right rec " << page_offset(rec);

				goto func_exit;
			}

			/* Restart the ownership count for the next slot. */
			own_count = 0;

			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		/* The supremum terminates the record list. */
		if (page_rec_is_supremum(rec)) {

			break;
		}

		/* The next-record offset must stay inside the page. */
		if (UNIV_UNLIKELY
		    (rec_get_next_offs(rec, TRUE) < FIL_PAGE_DATA
		     || rec_get_next_offs(rec, TRUE) >= UNIV_PAGE_SIZE)) {

			ib::error() << "Next record offset nonsensical "
				<< rec_get_next_offs(rec, TRUE)
				<< " for rec " << page_offset(rec);

			goto func_exit;
		}

		count++;

		/* More records than bytes on the page implies a cycle in
		the singly-linked record list. */
		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			ib::error() << "Page record list appears to be"
				" circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
		own_count++;
	}

	/* The supremum must always be owned by the last slot. */
	if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {
		ib::error() << "n owned is zero in a supremum rec";

		goto func_exit;
	}

	/* All directory slots must have been consumed exactly. */
	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong " << slot_no << ", "
			<< (n_slots - 1);
		goto func_exit;
	}

	/* PAGE_N_RECS counts user records; the traversal counted the
	infimum too, hence the PAGE_HEAP_NO_USER_LOW / count + 1 offsets. */
	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS)
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);

		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		if (UNIV_UNLIKELY(rec < page + FIL_PAGE_DATA
				  || rec >= page + UNIV_PAGE_SIZE)) {

			ib::error() << "Free list record has"
				" a nonsensical offset " << page_offset(rec);

			goto func_exit;
		}

		if (UNIV_UNLIKELY(rec > rec_heap_top)) {
			ib::error() << "Free list record " << page_offset(rec)
				<< " is above rec heap top "
				<< page_offset(rec_heap_top);

			goto func_exit;
		}

		count++;

		if (UNIV_UNLIKELY(count > UNIV_PAGE_SIZE)) {
			ib::error() << "Page free list appears to be"
				" circular " << count;
			goto func_exit;
		}

		rec = page_rec_get_next_const(rec);
	}

	/* Every heap record must be either in the record list or in the
	free list: count + 1 (for the infimum) must equal n_heap. */
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {

		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << ", " << (count + 1);

		goto func_exit;
	}

	ret = TRUE;

func_exit:
	return(ret);
}
2343 
2344 /***************************************************************//**
2345 This function checks if the page in which record is present is a
2346 non-leaf node of a spatial index.
2347 param[in]	rec	Btree record
2348 param[in]	index	index
2349 @return TRUE if ok */
2350 bool
page_is_spatial_non_leaf(const rec_t * rec,dict_index_t * index)2351 page_is_spatial_non_leaf(
2352 /*====================*/
2353 	const rec_t*	rec,
2354 	dict_index_t*	index)
2355 {
2356      return (dict_index_is_spatial(index) && !page_is_leaf(page_align(rec)));
2357 }
2358 
/***************************************************************//**
This function checks the consistency of an index page.
@return TRUE if ok */
ibool
page_validate(
/*==========*/
	const page_t*	page,	/*!< in: index page */
	dict_index_t*	index)	/*!< in: data dictionary index containing
				the page record type definition */
{
	const page_dir_slot_t*	slot;
	mem_heap_t*		heap;
	byte*			buf;	/* bitmap: one byte per page byte,
					set when covered by some record */
	ulint			count;	/* records traversed so far */
	ulint			own_count;	/* records owned by current slot */
	ulint			rec_own_count;
	ulint			slot_no;
	ulint			data_size;	/* summed user record sizes */
	const rec_t*		rec;
	const rec_t*		old_rec		= NULL;	/* previous record,
					for the ordering check */
	ulint			offs;
	ulint			n_slots;
	ibool			ret		= FALSE;
	ulint			i;
	ulint*			offsets		= NULL;
	ulint*			old_offsets	= NULL;

#ifdef UNIV_GIS_DEBUG
	if (dict_index_is_spatial(index)) {
		fprintf(stderr, "Page no: %lu\n", page_get_page_no(page));
	}
#endif /* UNIV_GIS_DEBUG */

	if (UNIV_UNLIKELY((ibool) !!page_is_comp(page)
			  != dict_table_is_comp(index->table))) {
		ib::error() << "'compact format' flag mismatch";
		goto func_exit2;
	}
	/* Run the index-independent structural checks first; they do not
	allocate, so failures jump straight to func_exit2. */
	if (page_is_comp(page)) {
		if (UNIV_UNLIKELY(!page_simple_validate_new(page))) {
			goto func_exit2;
		}
	} else {
		if (UNIV_UNLIKELY(!page_simple_validate_old(page))) {
			goto func_exit2;
		}
	}

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && !dict_table_is_temporary(index->table)
	    && page_is_leaf(page)
	    && !page_is_empty(page)) {
		trx_id_t	max_trx_id	= page_get_max_trx_id(page);
		trx_id_t	sys_max_trx_id	= trx_sys_get_max_trx_id();

		if (max_trx_id == 0 || max_trx_id > sys_max_trx_id) {
			ib::error() << "PAGE_MAX_TRX_ID out of bounds: "
				<< max_trx_id << ", " << sys_max_trx_id;
			goto func_exit2;
		}
	}

	heap = mem_heap_create(UNIV_PAGE_SIZE + 200);

	/* The following buffer is used to check that the
	records in the page record heap do not overlap */

	buf = static_cast<byte*>(mem_heap_zalloc(heap, UNIV_PAGE_SIZE));

	/* Check first that the record heap and the directory do not
	overlap. */

	n_slots = page_dir_get_n_slots(page);

	if (UNIV_UNLIKELY(!(page_header_get_ptr(page, PAGE_HEAP_TOP)
			    <= page_dir_get_nth_slot(page, n_slots - 1)))) {

		ib::warn() << "Record heap and dir overlap on space "
			<< page_get_space_id(page) << " page "
			<< page_get_page_no(page) << " index " << index->name
			<< ", " << page_header_get_ptr(page, PAGE_HEAP_TOP)
			<< ", " << page_dir_get_nth_slot(page, n_slots - 1);

		goto func_exit;
	}

	/* Validate the record list in a loop checking also that
	it is consistent with the directory. */
	count = 0;
	data_size = 0;
	own_count = 1;	/* the infimum itself counts as owned */
	slot_no = 0;
	slot = page_dir_get_nth_slot(page, slot_no);

	rec = page_get_infimum_rec(page);

	for (;;) {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);

		/* Node-pointer records may only appear on internal pages
		and leaf records only on leaf pages. */
		if (page_is_comp(page) && page_rec_is_user_rec(rec)
		    && UNIV_UNLIKELY(rec_get_node_ptr_flag(rec)
				     == page_is_leaf(page))) {
			ib::error() << "'node_ptr' flag mismatch";
			goto func_exit;
		}

		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {
			goto func_exit;
		}

#ifndef UNIV_HOTBACKUP
		/* Check that the records are in the ascending order */
		if (count >= PAGE_HEAP_NO_USER_LOW
		    && !page_rec_is_supremum(rec)) {

			/* NOTE: this shadows the outer 'ret'. */
			int	ret = cmp_rec_rec(
				rec, old_rec, offsets, old_offsets, index,
				page_is_spatial_non_leaf(rec, index));

			/* For spatial index, on non-leaf level, we
			allow recs to be equal. */
			bool rtr_equal_nodeptrs =
				(ret == 0 && dict_index_is_spatial(index)
				&& !page_is_leaf(page));

			if (ret <= 0 && !rtr_equal_nodeptrs) {

				ib::error() << "Records in wrong order on"
					" space " << page_get_space_id(page)
					<< " page " << page_get_page_no(page)
					<< " index " << index->name;

				fputs("\nInnoDB: previous record ", stderr);
				/* For spatial index, print the mbr info.*/
				if (index->type & DICT_SPATIAL) {
					putc('\n', stderr);
					rec_print_mbr_rec(stderr,
						old_rec, old_offsets);
					fputs("\nInnoDB: record ", stderr);
					putc('\n', stderr);
					rec_print_mbr_rec(stderr, rec, offsets);
					putc('\n', stderr);
					putc('\n', stderr);

				} else {
					rec_print_new(stderr, old_rec, old_offsets);
					fputs("\nInnoDB: record ", stderr);
					rec_print_new(stderr, rec, offsets);
					putc('\n', stderr);
				}

				goto func_exit;
			}
		}
#endif /* !UNIV_HOTBACKUP */

		if (page_rec_is_user_rec(rec)) {

			data_size += rec_offs_size(offsets);

#if UNIV_GIS_DEBUG
			/* For spatial index, print the mbr info.*/
			if (index->type & DICT_SPATIAL) {
				rec_print_mbr_rec(stderr, rec, offsets);
				putc('\n', stderr);
			}
#endif /* UNIV_GIS_DEBUG */
		}

		/* Mark the bytes occupied by this record in buf, failing
		if any byte is already claimed by another record. */
		offs = page_offset(rec_get_start(rec, offsets));
		i = rec_offs_size(offsets);
		if (UNIV_UNLIKELY(offs + i >= UNIV_PAGE_SIZE)) {
			ib::error() << "Record offset out of bounds";
			goto func_exit;
		}

		while (i--) {
			if (UNIV_UNLIKELY(buf[offs + i])) {
				/* No other record may overlap this */
				ib::error() << "Record overlaps another";
				goto func_exit;
			}

			buf[offs + i] = 1;
		}

		if (page_is_comp(page)) {
			rec_own_count = rec_get_n_owned_new(rec);
		} else {
			rec_own_count = rec_get_n_owned_old(rec);
		}

		if (UNIV_UNLIKELY(rec_own_count)) {
			/* This is a record pointed to by a dir slot */
			if (UNIV_UNLIKELY(rec_own_count != own_count)) {
				ib::error() << "Wrong owned count "
					<< rec_own_count << ", " << own_count;
				goto func_exit;
			}

			if (page_dir_slot_get_rec(slot) != rec) {
				ib::error() << "Dir slot does not"
					" point to right rec";
				goto func_exit;
			}

			page_dir_slot_check(slot);

			own_count = 0;
			if (!page_rec_is_supremum(rec)) {
				slot_no++;
				slot = page_dir_get_nth_slot(page, slot_no);
			}
		}

		if (page_rec_is_supremum(rec)) {
			break;
		}

		count++;
		own_count++;
		old_rec = rec;
		rec = page_rec_get_next_const(rec);

		/* set old_offsets to offsets; recycle offsets */
		{
			ulint* offs = old_offsets;
			old_offsets = offsets;
			offsets = offs;
		}
	}

	/* The supremum must always be owned by the last slot. */
	if (page_is_comp(page)) {
		if (UNIV_UNLIKELY(rec_get_n_owned_new(rec) == 0)) {

			goto n_owned_zero;
		}
	} else if (UNIV_UNLIKELY(rec_get_n_owned_old(rec) == 0)) {
n_owned_zero:
		ib::error() <<  "n owned is zero";
		goto func_exit;
	}

	if (UNIV_UNLIKELY(slot_no != n_slots - 1)) {
		ib::error() << "n slots wrong " << slot_no << " "
			<< (n_slots - 1);
		goto func_exit;
	}

	/* PAGE_N_RECS counts user records; the traversal counted the
	infimum too, hence the PAGE_HEAP_NO_USER_LOW / count + 1 offsets. */
	if (UNIV_UNLIKELY(page_header_get_field(page, PAGE_N_RECS)
			  + PAGE_HEAP_NO_USER_LOW
			  != count + 1)) {
		ib::error() << "n recs wrong "
			<< page_header_get_field(page, PAGE_N_RECS)
			+ PAGE_HEAP_NO_USER_LOW << " " << (count + 1);
		goto func_exit;
	}

	if (UNIV_UNLIKELY(data_size != page_get_data_size(page))) {
		ib::error() << "Summed data size " << data_size
			<< ", returned by func " << page_get_data_size(page);
		goto func_exit;
	}

	/* Check then the free list */
	rec = page_header_get_ptr(page, PAGE_FREE);

	while (rec != NULL) {
		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		if (UNIV_UNLIKELY(!page_rec_validate(rec, offsets))) {

			goto func_exit;
		}

		count++;
		/* Free records must also not overlap any live record. */
		offs = page_offset(rec_get_start(rec, offsets));
		i = rec_offs_size(offsets);
		if (UNIV_UNLIKELY(offs + i >= UNIV_PAGE_SIZE)) {
			ib::error() << "Record offset out of bounds";
			goto func_exit;
		}

		while (i--) {

			if (UNIV_UNLIKELY(buf[offs + i])) {
				ib::error() << "Record overlaps another"
					" in free list";
				goto func_exit;
			}

			buf[offs + i] = 1;
		}

		rec = page_rec_get_next_const(rec);
	}

	/* Every heap record must be either in the record list or in the
	free list: count + 1 (for the infimum) must equal n_heap. */
	if (UNIV_UNLIKELY(page_dir_get_n_heap(page) != count + 1)) {
		ib::error() << "N heap is wrong "
			<< page_dir_get_n_heap(page) << " " << count + 1;
		goto func_exit;
	}

	ret = TRUE;

func_exit:
	mem_heap_free(heap);

	if (UNIV_UNLIKELY(ret == FALSE)) {
func_exit2:
		ib::error() << "Apparent corruption in space "
			<< page_get_space_id(page) << " page "
			<< page_get_page_no(page) << " index " << index->name;
	}

	return(ret);
}
2681 
2682 #ifndef UNIV_HOTBACKUP
2683 /***************************************************************//**
2684 Looks in the page record list for a record with the given heap number.
2685 @return record, NULL if not found */
2686 const rec_t*
page_find_rec_with_heap_no(const page_t * page,ulint heap_no)2687 page_find_rec_with_heap_no(
2688 /*=======================*/
2689 	const page_t*	page,	/*!< in: index page */
2690 	ulint		heap_no)/*!< in: heap number */
2691 {
2692 	const rec_t*	rec;
2693 
2694 	if (page_is_comp(page)) {
2695 		rec = page + PAGE_NEW_INFIMUM;
2696 
2697 		for (;;) {
2698 			ulint	rec_heap_no = rec_get_heap_no_new(rec);
2699 
2700 			if (rec_heap_no == heap_no) {
2701 
2702 				return(rec);
2703 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2704 
2705 				return(NULL);
2706 			}
2707 
2708 			rec = page + rec_get_next_offs(rec, TRUE);
2709 		}
2710 	} else {
2711 		rec = page + PAGE_OLD_INFIMUM;
2712 
2713 		for (;;) {
2714 			ulint	rec_heap_no = rec_get_heap_no_old(rec);
2715 
2716 			if (rec_heap_no == heap_no) {
2717 
2718 				return(rec);
2719 			} else if (rec_heap_no == PAGE_HEAP_NO_SUPREMUM) {
2720 
2721 				return(NULL);
2722 			}
2723 
2724 			rec = page + rec_get_next_offs(rec, FALSE);
2725 		}
2726 	}
2727 }
2728 #endif /* !UNIV_HOTBACKUP */
2729 
/*******************************************************//**
Removes the record from a leaf page. This function does not log
any changes. It is used by the IMPORT tablespace functions.
The cursor is moved to the next record after the deleted one.
@return true if success, i.e., the page did not become too empty */
bool
page_delete_rec(
/*============*/
	const dict_index_t*	index,	/*!< in: The index that the record
					belongs to */
	page_cur_t*		pcur,	/*!< in/out: page cursor on record
					to delete */
	page_zip_des_t*		page_zip,/*!< in: compressed page descriptor;
					only consulted in UNIV_ZIP_DEBUG
					builds */
	const ulint*		offsets)/*!< in: offsets for record */
{
	bool		no_compress_needed;
	buf_block_t*	block = pcur->block;
	page_t*		page = buf_block_get_frame(block);

	ut_ad(page_is_leaf(page));

	/* Deleting would be unsafe (the page should be merged/compressed
	instead) when the record has no externally stored columns AND the
	deletion would either drop the fill factor below the compress
	limit, or the page is the only one on its level (no prev/next),
	or the page would be left with fewer than two records. */
	if (!rec_offs_any_extern(offsets)
	    && ((page_get_data_size(page) - rec_offs_size(offsets)
		< BTR_CUR_PAGE_COMPRESS_LIMIT(index))
		|| (mach_read_from_4(page + FIL_PAGE_NEXT) == FIL_NULL
		    && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL)
		|| (page_get_n_recs(page) < 2))) {

		ulint	root_page_no = dict_index_get_page(index);

		/* The page fillfactor will drop below a predefined
		minimum value, OR the level in the B-tree contains just
		one page, OR the page will become empty: we recommend
		compression if this is not the root page. */

		no_compress_needed = page_get_page_no(page) == root_page_no;
	} else {
		no_compress_needed = true;
	}

	/* Only delete when compression is not recommended; otherwise the
	caller is expected to act on the false return value. */
	if (no_compress_needed) {
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

		page_cur_delete_rec(pcur, index, offsets, 0);

#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	}

	return(no_compress_needed);
}
2784 
2785 /** Get the last non-delete-marked record on a page.
2786 @param[in]	page	index tree leaf page
2787 @return the last record, not delete-marked
2788 @retval infimum record if all records are delete-marked */
2789 const rec_t*
page_find_rec_max_not_deleted(const page_t * page)2790 page_find_rec_max_not_deleted(
2791 	const page_t*	page)
2792 {
2793 	const rec_t*	rec = page_get_infimum_rec(page);
2794 	const rec_t*	prev_rec = NULL; // remove warning
2795 
2796 	/* Because the page infimum is never delete-marked,
2797 	prev_rec will always be assigned to it first. */
2798 	ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
2799 	if (page_is_comp(page)) {
2800 		do {
2801 			if (!rec_get_deleted_flag(rec, true)) {
2802 				prev_rec = rec;
2803 			}
2804 			rec = page_rec_get_next_low(rec, true);
2805 		} while (rec != page + PAGE_NEW_SUPREMUM);
2806 	} else {
2807 		do {
2808 			if (!rec_get_deleted_flag(rec, false)) {
2809 				prev_rec = rec;
2810 			}
2811 			rec = page_rec_get_next_low(rec, false);
2812 		} while (rec != page + PAGE_OLD_SUPREMUM);
2813 	}
2814 	return(prev_rec);
2815 }
2816 
2817 /** Issue a warning when the checksum that is stored in the page is valid,
2818 but different than the global setting innodb_checksum_algorithm.
2819 @param[in]	current_algo	current checksum algorithm
2820 @param[in]	page_checksum	page valid checksum
2821 @param[in]	page_id		page identifier */
2822 void
page_warn_strict_checksum(srv_checksum_algorithm_t curr_algo,srv_checksum_algorithm_t page_checksum,const page_id_t & page_id)2823 page_warn_strict_checksum(
2824 	srv_checksum_algorithm_t	curr_algo,
2825 	srv_checksum_algorithm_t	page_checksum,
2826 	const page_id_t&		page_id)
2827 {
2828 	srv_checksum_algorithm_t	curr_algo_nonstrict = srv_checksum_algorithm_t();
2829 	switch (curr_algo) {
2830 	case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2831 		curr_algo_nonstrict = SRV_CHECKSUM_ALGORITHM_CRC32;
2832 		break;
2833 	case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2834 		curr_algo_nonstrict = SRV_CHECKSUM_ALGORITHM_INNODB;
2835 		break;
2836 	case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2837 		curr_algo_nonstrict = SRV_CHECKSUM_ALGORITHM_NONE;
2838 		break;
2839 	default:
2840 		ut_error;
2841 	}
2842 
2843 	ib::warn() << "innodb_checksum_algorithm is set to \""
2844 		<< buf_checksum_algorithm_name(curr_algo) << "\""
2845 		<< " but the page " << page_id << " contains a valid checksum \""
2846 		<< buf_checksum_algorithm_name(page_checksum) << "\". "
2847 		<< " Accepting the page as valid. Change"
2848 		<< " innodb_checksum_algorithm to \""
2849 		<< buf_checksum_algorithm_name(curr_algo_nonstrict)
2850 		<< "\" to silently accept such pages or rewrite all pages"
2851 		<< " so that they contain \""
2852 		<< buf_checksum_algorithm_name(curr_algo_nonstrict)
2853 		<< "\" checksum.";
2854 }
2855