/*****************************************************************************

Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2014, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0btr.cc
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/

#include "btr0btr.h"

#include "page0page.h"
#include "page0zip.h"
#include "gis0rtree.h"

#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "btr0defragment.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
#include "srv0mon.h"
#include "gis0geo.h"
#include "dict0boot.h"
#include "row0sel.h" /* row_search_max_autoinc() */

Atomic_counter<uint32_t> btr_validate_index_running;

/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return	true if possible to merge. */
static
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr);		/*!< in: mini-transaction */
/** Report that an index page is corrupted.
@param[in]	block	buffer block
@param[in]	index	index tree */
void btr_corruption_report(const buf_block_t* block, const dict_index_t* index)
{
	ib::fatal()
		<< "Flag mismatch in page " << block->page.id
		<< " index " << index->name
		<< " of table " << index->table->name;
}

/*
Latching strategy of the InnoDB B-tree
--------------------------------------

Node pointer page latch acquisition is protected by the index->lock latch.

Before MariaDB 10.2.2, all node pointer pages were protected by index->lock
either in S (shared) or X (exclusive) mode and block->lock was not acquired on
node pointer pages.

After MariaDB 10.2.2, a block->lock S-latch or X-latch is used to protect
node pointer pages, and the acquisition of node pointer page latches is
protected by index->lock.

(0) Definition: B-tree level.

(0.1) The leaf pages of the B-tree are at level 0.

(0.2) The parent of a page at level L has level L+1. (The level of the
root page is equal to the tree height.)

(0.3) The B-tree lock (index->lock) is the parent of the root page and
has a level = tree height + 1.

Index->lock has 3 possible locking modes:

(1) S-latch:

(1.1) All latches for pages must be obtained in descending order of tree level.

(1.2) Before obtaining the first node pointer page latch at a given B-tree
level, the parent latch must be held (at level + 1).

(1.3) If a node pointer page is already latched at the same level,
we can only obtain the latch on its right sibling page at the same level.

(1.4) Release of the node pointer page latches must be done in
child-to-parent order. (This prevents deadlocks when index->lock is
obtained in SX mode.)

(1.4.1) A level L node pointer page latch can be released only when
no latches at the child levels, i.e. levels < L, are held.

(1.4.2) All latches on node pointer pages must be released so
that no new latches are obtained in between.

(1.5) [implied by (1.1), (1.2)] The root page latch must be the first node
pointer latch obtained.

(2) SX-latch:

In this case rules (1.2) and (1.3) from the S-latch case are relaxed and
merged into (2.2), and rule (1.4) is removed. Thus, latch acquisition
can be skipped at some tree levels and latches can be obtained in
a less restricted order.

(2.1) [identical to (1.1)]: All latches for pages must be obtained in
descending order of tree level.

(2.2) When a node pointer latch at level L is obtained,
the left sibling page latch at the same level or some ancestor
page latch (at level > L) must be held.

(2.3) [implied by (2.1), (2.2)] The first node pointer page latch obtained
can be on any node pointer page.

(3) X-latch:

Node pointer latches can be obtained in any order.

NOTE: The new rules after MariaDB 10.2.2 do not affect the latching rules
of leaf pages:

An index->lock S-latch is needed for reads during node pointer traversal.
When the leaf level is reached, index->lock can be released (and, with the
MariaDB 10.2.2 changes, all node pointer latches). Left-to-right index
traversal at the leaf level can be done safely by obtaining the right
sibling leaf page latch and then releasing the old page latch.

Single leaf page modifications (BTR_MODIFY_LEAF) are protected by an
index->lock S-latch.

B-tree operations involving page splits or merges (BTR_MODIFY_TREE) and page
allocations are protected by an index->lock X-latch.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we can store node pointers or index records
which are >= P in alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
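
/* Illustrative sketch (not part of the implementation): a read-only
descent that follows S-latch rules (1.1)-(1.5) above. The helper
hypothetical_latch_child() is a made-up name for exposition only; real
descents go through btr_cur_search_to_nth_level() in btr0cur.cc.

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);	// rules (0.3), (1.5)
	buf_block_t*	block = btr_root_block_get(index, RW_S_LATCH, &mtr);
	while (block && !page_is_leaf(block->frame)) {
		// Descend one level: latches are taken in descending
		// order of tree level, satisfying rules (1.1)-(1.2).
		block = hypothetical_latch_child(block, &mtr);
	}
	// Leaf level reached: index->lock (and, after 10.2.2, the node
	// pointer page latches) could be released here.
	mtr.commit();
*/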

#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
@return TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= srv_page_size - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */

/**************************************************************//**
Gets the root node of a tree and x- or s-latches it.
@return root page, x- or s-latched */
buf_block_t*
btr_root_block_get(
/*===============*/
	const dict_index_t*	index,	/*!< in: index tree */
	ulint			mode,	/*!< in: either RW_S_LATCH
					or RW_X_LATCH */
	mtr_t*			mtr)	/*!< in: mtr */
{
	if (!index->table || !index->table->space) {
		return NULL;
	}

	buf_block_t*	block = btr_block_get(
		page_id_t(index->table->space_id, index->page),
		index->table->space->zip_size(), mode,
		index, mtr);

	if (!block) {
		index->table->file_unreadable = true;

		ib_push_warning(
			static_cast<THD*>(NULL), DB_DECRYPTION_FAILED,
			"Table %s in file %s is encrypted but encryption service or"
			" used key_id is not available. "
			" Can't continue reading table.",
			index->table->name.m_name,
			UT_LIST_GET_FIRST(index->table->space->chain)->name);

		return NULL;
	}

	btr_assert_not_corrupted(block, index);

#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, index->table->space_id));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, index->table->space_id));
	}
#endif /* UNIV_BTR_DEBUG */

	return(block);
}
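
/* Minimal usage sketch (an illustration, not a call site from this
file; it presumes the caller holds index->lock, as the latching rules
above require):

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);
	if (buf_block_t* root = btr_root_block_get(index, RW_S_LATCH,
						   &mtr)) {
		ulint	level = btr_page_get_level(
			buf_block_get_frame(root));
		// ... read the S-latched root page ...
	}
	mtr.commit();	// releases the page latch and index->lock
*/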

/**************************************************************//**
Gets the root node of a tree and sx-latches it for segment access.
@return root page, sx-latched */
page_t*
btr_root_get(
/*=========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*			mtr)	/*!< in: mtr */
{
	/* Intended to be used for segment list access.
	An SX lock does not block other threads from reading user data,
	but it does block segment list access by others. */
	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH,
					       mtr);
	return(root ? buf_block_get_frame(root) : NULL);
}

/**************************************************************//**
Gets the height of the B-tree (the level of the root, when the leaf
level is assumed to be 0). The caller must hold an S or X latch on
the index.
@return tree height (level of the root) */
ulint
btr_height_get(
/*===========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	ulint		height=0;
	buf_block_t*	root_block;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_S_LOCK
					   | MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK));

	/* S latches the page */
	root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

	if (root_block) {
		height = btr_page_get_level(buf_block_get_frame(root_block));

		/* Release the S latch on the root page. */
		mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

		ut_d(sync_check_unlock(&root_block->lock));
	}

	return(height);
}
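
/* Usage sketch (illustrative only; as asserted above, the caller must
already hold index->lock in the mini-transaction):

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);
	ulint	height = btr_height_get(index, &mtr);	// 0: root is a leaf
	mtr.commit();
*/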

/**************************************************************//**
Checks a file segment header within a B-tree root page and updates
the segment header space id.
@return TRUE if valid */
static
bool
btr_root_fseg_adjust_on_import(
/*===========================*/
	fseg_header_t*	seg_header,	/*!< in/out: segment header */
	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
					or NULL */
	ulint		space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	if (offset < FIL_PAGE_DATA
	    || offset > srv_page_size - FIL_PAGE_DATA_END) {
		return false;
	}

	seg_header += FSEG_HDR_SPACE;

	mach_write_to_4(seg_header, space);
	if (UNIV_LIKELY_NULL(page_zip)) {
		memcpy(page_zip->data + page_offset(seg_header), seg_header,
		       4);
	}

	return true;
}

/**************************************************************//**
Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
@return error code, or DB_SUCCESS */
dberr_t
btr_root_adjust_on_import(
/*======================*/
	const dict_index_t*	index)	/*!< in: index tree */
{
	dberr_t			err;
	mtr_t			mtr;
	page_t*			page;
	buf_block_t*		block;
	page_zip_des_t*		page_zip;
	dict_table_t*		table = index->table;
	const page_id_t		page_id(table->space_id, index->page);
	const ulint		zip_size = table->space->zip_size();

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
			return(DB_CORRUPTION););

	mtr_start(&mtr);

	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

	block = btr_block_get(page_id, zip_size, RW_X_LATCH, index, &mtr);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	if (!fil_page_index_page_check(page) || page_has_siblings(page)) {
		err = DB_CORRUPTION;

	} else if (dict_index_is_clust(index)) {
		bool	page_is_compact_format;

		page_is_compact_format = page_is_comp(page) > 0;

		/* Check if the page format and table format agree. */
		if (page_is_compact_format != dict_table_is_comp(table)) {
			err = DB_CORRUPTION;
		} else {
			/* Check that the table flags and the tablespace
			flags match. */
			ulint tf = dict_tf_to_fsp_flags(table->flags);
			ulint sf = table->space->flags;
			sf &= ~FSP_FLAGS_MEM_MASK;
			tf &= ~FSP_FLAGS_MEM_MASK;
			if (fil_space_t::is_flags_equal(tf, sf)
			    || fil_space_t::is_flags_equal(sf, tf)) {
				mutex_enter(&fil_system.mutex);
				table->space->flags = (table->space->flags
						       & ~FSP_FLAGS_MEM_MASK)
					| (tf & FSP_FLAGS_MEM_MASK);
				mutex_exit(&fil_system.mutex);
				err = DB_SUCCESS;
			} else {
				err = DB_CORRUPTION;
			}
		}
	} else {
		err = DB_SUCCESS;
	}

	/* Check and adjust the file segment headers, if all OK so far. */
	if (err == DB_SUCCESS
	    && (!btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
			+ page, page_zip, table->space_id)
		|| !btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
			+ page, page_zip, table->space_id))) {

		err = DB_CORRUPTION;
	}

	mtr_commit(&mtr);

	return(err);
}

/**************************************************************//**
Creates a new index page (not the root, and also not
used in page reorganization).  @see btr_page_empty(). */
void
btr_page_create(
/*============*/
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		page = buf_block_get_frame(block);

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	if (page_zip) {
		page_create_zip(block, index, level, 0, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table),
			    dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

	/* For Spatial Index, initialize the Split Sequence Number */
	if (dict_index_is_spatial(index)) {
		page_set_ssn_id(block, page_zip, 0, mtr);
	}

	btr_page_set_index_id(page, page_zip, index->id, mtr);
}

/**************************************************************//**
Allocates a new file page to be used in an ibuf tree. Takes the page from
the free list of the tree, which must contain pages!
@return new allocated block, x-latched */
static
buf_block_t*
btr_page_alloc_for_ibuf(
/*====================*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
	buf_block_t*	new_block;

	root = btr_root_get(index, mtr);

	node_addr = flst_get_first(root + PAGE_HEADER
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
	ut_a(node_addr.page != FIL_NULL);

	new_block = buf_page_get(
		page_id_t(index->table->space_id, node_addr.page),
		index->table->space->zip_size(),
		RW_X_LATCH, mtr);

	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));

	return(new_block);
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
buf_block_t*
btr_page_alloc_low(
/*===============*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mtr or another
					mini-transaction in which the
					page should be initialized.
					If init_mtr!=mtr, but the page
					is already X-latched in mtr, do
					not initialize the page. */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	root = btr_root_get(index, mtr);

	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */

	buf_block_t* block = fseg_alloc_free_page_general(
		seg_header, hint_page_no, file_direction,
		TRUE, mtr, init_mtr);

#ifdef UNIV_DEBUG_SCRUBBING
	if (block != NULL) {
		fprintf(stderr,
			"alloc %lu:%lu to index: %lu root: %lu\n",
			buf_block_get_page_no(block),
			buf_block_get_space(block),
			index->id,
			dict_index_get_page(index));
	} else {
		fprintf(stderr,
			"failed alloc index: %lu root: %lu\n",
			index->id,
			dict_index_get_page(index));
	}
#endif /* UNIV_DEBUG_SCRUBBING */

	return block;
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
buf_block_t*
btr_page_alloc(
/*===========*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
					for x-latching and initializing
					the page */
{
	buf_block_t*	new_block;

	if (dict_index_is_ibuf(index)) {

		return(btr_page_alloc_for_ibuf(index, mtr));
	}

	new_block = btr_page_alloc_low(
		index, hint_page_no, file_direction, level, mtr, init_mtr);

	if (new_block) {
		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
	}

	return(new_block);
}
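
/* Hedged allocation sketch. The note above states that the caller must
have reserved free extents first; the fsp_reserve_free_extents() /
release_free_extents() pairing shown here follows the usual InnoDB
convention and is an assumption for illustration, not a call site in
this file:

	ulint	n_reserved = 0;
	if (fsp_reserve_free_extents(&n_reserved, index->table->space,
				     1, FSP_NORMAL, mtr)) {
		buf_block_t*	new_block = btr_page_alloc(
			index, hint_page_no, FSP_UP, level, mtr, mtr);
		// new_block == NULL: no page could be allocated
		index->table->space->release_free_extents(n_reserved);
	}
*/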

/**************************************************************//**
Gets the number of pages in a B-tree.
@return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint
btr_get_size(
/*=========*/
	const dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n=0;
	ulint		dummy;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
				   MTR_MEMO_S_LOCK));

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);

	if (root) {
		if (flag == BTR_N_LEAF_PAGES) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

			fseg_n_reserved_pages(seg_header, &n, mtr);

		} else if (flag == BTR_TOTAL_SIZE) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

			n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

			n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
		} else {
			ut_error;
		}
	} else {
		n = ULINT_UNDEFINED;
	}

	return(n);
}
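
/* Usage sketch (illustrative; the index must be S-latched in the
mini-transaction, as the assertion above requires):

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);
	ulint	pages = btr_get_size(index, BTR_TOTAL_SIZE, &mtr);
	mtr.commit();
	// pages == ULINT_UNDEFINED if the index was unavailable
*/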

/**************************************************************//**
Gets the number of reserved and used pages in a B-tree.
@return	number of pages reserved, or ULINT_UNDEFINED if the index
is unavailable */
UNIV_INTERN
ulint
btr_get_size_and_reserved(
/*======================*/
	dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	ulint*		used,	/*!< out: number of pages used (<= reserved) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n=ULINT_UNDEFINED;
	ulint		dummy;

	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_S_LOCK));

	ut_a(flag == BTR_N_LEAF_PAGES || flag == BTR_TOTAL_SIZE);

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);
	*used = 0;

	if (root) {

		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		n = fseg_n_reserved_pages(seg_header, used, mtr);

		if (flag == BTR_TOTAL_SIZE) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

			n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
			*used += dummy;

		}
	}

	return(n);
}

/**************************************************************//**
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		root;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	root = btr_root_get(index, mtr);

	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
}

/** Free an index page.
@param[in,out]	index	index tree
@param[in,out]	block	block to be freed
@param[in,out]	mtr	mini-transaction
@param[in]	blob	whether this is freeing a BLOB page */
void btr_page_free(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
		   bool blob)
{
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
#ifdef BTR_CUR_HASH_ADAPT
	if (block->index && !block->index->freed()) {
		ut_ad(!blob);
		ut_ad(page_is_leaf(block->frame));
	}
#endif
	ut_ad(index->table->space_id == block->page.id.space());
	/* The root page is freed by btr_free_root(). */
	ut_ad(block->page.id.page_no() != index->page);
	ut_ad(mtr->is_named_space(index->table->space));

	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

	buf_block_modify_clock_inc(block);

	if (dict_index_is_ibuf(index)) {
		btr_page_free_for_ibuf(index, block, mtr);
		return;
	}

	/* TODO: Discard any operations for block from mtr->log.
	The page will be freed, so previous changes to it by this
	mini-transaction should not matter. */
	page_t* root = btr_root_get(index, mtr);
	fseg_header_t* seg_header = &root[blob || page_is_leaf(block->frame)
					  ? PAGE_HEADER + PAGE_BTR_SEG_LEAF
					  : PAGE_HEADER + PAGE_BTR_SEG_TOP];
	fseg_free_page(seg_header,
		       index->table->space, block->page.id.page_no(),
		       !block->page.flush_observer, mtr);

	/* The page was marked free in the allocation bitmap, but it
	should remain exclusively latched until mtr_t::commit() or until it
	is explicitly freed from the mini-transaction. */
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	/* MDEV-15528 FIXME: Zero out the page after the redo log for
	this mini-transaction has been durably written.
	This must be done unconditionally if
	srv_immediate_scrub_data_uncompressed is set. */
}

/**************************************************************//**
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
				part will be updated, or NULL */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(!page_rec_is_leaf(rec));
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

	/* The child address is in the last field */
	field = rec_get_nth_field(rec, offsets,
				  rec_offs_n_fields(offsets) - 1, &len);

	ut_ad(len == REC_NODE_PTR_SIZE);

	if (page_zip) {
		page_zip_write_node_ptr(page_zip, rec,
					rec_offs_data_size(offsets),
					page_no, mtr);
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
	}
}

/************************************************************//**
Returns the child page of a node pointer and sx-latches it.
@return child page, sx-latched */
static
buf_block_t*
btr_node_ptr_get_child(
/*===================*/
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(rec_offs_validate(node_ptr, index, offsets));
	ut_ad(index->table->space_id
	      == page_get_space_id(page_align(node_ptr)));

	return btr_block_get(
		page_id_t(index->table->space_id,
			  btr_node_ptr_get_child_page_no(node_ptr, offsets)),
		index->table->space->zip_size(),
		RW_SX_LATCH, index, mtr);
}

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an sx-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
rec_offs*
btr_page_get_father_node_ptr_func(
/*==============================*/
	rec_offs*	offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
				out: cursor on node pointer record,
				its page x-latched */
	ulint		latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
				or BTR_CONT_SEARCH_TREE */
	const char*	file,	/*!< in: file name */
	unsigned	line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dtuple_t*	tuple;
	rec_t*		user_rec;
	rec_t*		node_ptr;
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
	      || latch_mode == BTR_CONT_SEARCH_TREE);

	page_no = btr_cur_get_block(cursor)->page.id.page_no();
	index = btr_cur_get_index(cursor);
	ut_ad(!dict_index_is_spatial(index));

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK));

	ut_ad(dict_index_get_page(index) != page_no);

	level = btr_page_get_level(btr_cur_get_page(cursor));

	user_rec = btr_cur_get_rec(cursor);
	ut_a(page_rec_is_user_rec(user_rec));

	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
	dberr_t err = DB_SUCCESS;

	err = btr_cur_search_to_nth_level(
		index, level + 1, tuple,
		PAGE_CUR_LE, latch_mode, cursor, 0,
		file, line, mtr);

	if (err != DB_SUCCESS) {
		ib::warn() << " Error code: " << err
			<< " btr_page_get_father_node_ptr_func "
			<< " level: " << level + 1
			<< " called from file: "
			<< file << " line: " << line
			<< " table: " << index->table->name
			<< " index: " << index->name();
	}

	node_ptr = btr_cur_get_rec(cursor);

	offsets = rec_get_offsets(node_ptr, index, offsets, 0,
				  ULINT_UNDEFINED, &heap);

	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
		rec_t*	print_rec;

		ib::error()
			<< "Corruption of an index tree: table "
			<< index->table->name
			<< " index " << index->name
			<< ", father ptr page no "
			<< btr_node_ptr_get_child_page_no(node_ptr, offsets)
			<< ", child page no " << page_no;

		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
		offsets = rec_get_offsets(print_rec, index, offsets,
					  page_rec_is_leaf(user_rec)
					  ? index->n_core_fields : 0,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(node_ptr, offsets);

		ib::fatal()
			<< "You should dump + drop + reimport the table to"
			<< " fix the corruption. If the crash happens at"
			<< " database startup. " << FORCE_RECOVERY_MSG
			<< " Then dump + drop + reimport.";
	}

	return(offsets);
}

#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)

#define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an x-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
rec_offs*
btr_page_get_father_block(
/*======================*/
	rec_offs*	offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/** Seek to the parent page of a B-tree page.
@param[in,out]	index	b-tree
@param[in]	block	child page
@param[in,out]	mtr	mini-transaction
@param[out]	cursor	cursor pointing to the x-latched parent page */
void btr_page_get_father(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
			 btr_cur_t* cursor)
{
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
}
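
/* Usage sketch (illustrative; assumes mtr already holds index->lock in
X or SX mode and that block is a non-root page of the index, as
asserted in btr_page_get_father_node_ptr_func()):

	btr_cur_t	cursor;
	btr_page_get_father(index, block, mtr, &cursor);
	// cursor is now positioned on the node pointer record in the
	// x-latched parent page
	rec_t*	node_ptr = btr_cur_get_rec(&cursor);
*/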

/** PAGE_INDEX_ID value for freed index B-trees */
static const index_id_t	BTR_FREED_INDEX_ID = 0;

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out]	block		index root page
@param[in,out]	mtr		mini-transaction
@param[in]	invalidate	whether to invalidate PAGE_INDEX_ID */
static void btr_free_root(buf_block_t* block, mtr_t* mtr, bool invalidate)
{
	fseg_header_t*	header;

	ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX
					| MTR_MEMO_PAGE_SX_FIX));
	ut_ad(mtr->is_named_space(block->page.id.space()));

	btr_search_drop_page_hash_index(block);

	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */
	if (invalidate) {
		btr_page_set_index_id(
			buf_block_get_frame(block),
			buf_block_get_page_zip(block),
			BTR_FREED_INDEX_ID, mtr);
	}

	while (!fseg_free_step(header, mtr)) {
		/* Free the entire segment in small steps. */
	}
}

/** Prepare to free a B-tree.
@param[in]	page_id		page id
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result))
buf_block_t*
btr_free_root_check(
	const page_id_t		page_id,
	ulint			zip_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	buf_block_t*	block = buf_page_get(
		page_id, zip_size, RW_X_LATCH, mtr);

	if (block) {
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);

		if (fil_page_index_page_check(block->frame)
		    && index_id == btr_page_get_index_id(block->frame)) {
			/* This should be a root page.
			It should not be possible to reassign the same
			index_id for some other index in the tablespace. */
			ut_ad(!page_has_siblings(block->frame));
		} else {
			block = NULL;
		}
	}

	return(block);
}

/** Create the root node for a new index tree.
@param[in]	type			type of the index
@param[in]	index_id		index id
@param[in,out]	space			tablespace where created
@param[in]	index			index
@param[in,out]	mtr			mini-transaction
@return	page number of the created root
@retval	FIL_NULL	if did not succeed */
ulint
btr_create(
	ulint			type,
	fil_space_t*		space,
	index_id_t		index_id,
	dict_index_t*		index,
	mtr_t*			mtr)
{
	buf_block_t*		block;
	page_t*			page;
	page_zip_des_t*		page_zip;

	ut_ad(mtr->is_named_space(space));
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (UNIV_UNLIKELY(type & DICT_IBUF)) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		if (ibuf_hdr_block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(ibuf_hdr_block->page.id.page_no()
		      == IBUF_HEADER_PAGE_NO);
		/* Allocate then the next page to the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		flst_init(block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
	} else {
		block = fseg_create(space,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr,
				 false, block)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(block, mtr,
				      !index->table->is_temporary());
			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (page_zip) {
		page = page_create_zip(block, index, 0, 0, mtr);
	} else {
		page = page_create(block, mtr,
				   dict_table_is_comp(index->table),
				   dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
	compile_time_assert(FIL_NULL == 0xffffffff);
#if MYSQL_VERSION_ID < 100500
	if (UNIV_LIKELY_NULL(page_zip)) {
		/* Avoid tripping the ut_a() in mlog_parse_nbytes()
		when crash-downgrading to an earlier MariaDB 10.4 version. */
		btr_page_set_next(page, page_zip, FIL_NULL, mtr);
		btr_page_set_prev(page, page_zip, FIL_NULL, mtr);
	} else {
		mlog_memset(block, FIL_PAGE_PREV, 8, 0xff, mtr);
	}
#else
	mlog_memset(block, FIL_PAGE_PREV, 8, 0xff, mtr);
	if (UNIV_LIKELY_NULL(page_zip)) {
		memset(page_zip->data + FIL_PAGE_PREV, 0xff, 8);
	}
#endif

	/* We reset the free bits for the page in a separate
	mini-transaction to allow creation of several trees in the
	same mtr, otherwise the latch on a bitmap page would prevent
	it because of the latching order.

	Note: Insert Buffering is disabled for temporary tables given that
	most temporary tables are smaller in size and short-lived. */
	if (!(type & DICT_CLUSTERED) && !index->table->is_temporary()) {
		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(block->page.id.page_no());
}

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out]	block		root page
@param[in]	log_mode	mtr logging mode */
static
void
btr_free_but_not_root(
	buf_block_t*	block,
	mtr_log_t	log_mode)
{
	ibool	finished;
	mtr_t	mtr;

	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(!page_has_siblings(block->frame));
leaf_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space_id(block->page.id.space());

	page_t*	root = block->frame;

	if (!root) {
		mtr_commit(&mtr);
		return;
	}

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, block->page.id.space()));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space_id(block->page.id.space());

	root = block->frame;

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
	mtr_commit(&mtr);

	if (!finished) {
		goto top_loop;
	}
}

/** Free a persistent index tree if it exists.
@param[in]	page_id		root page id
@param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction */
void
btr_free_if_exists(
	const page_id_t		page_id,
	ulint			zip_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	buf_block_t* root = btr_free_root_check(
		page_id, zip_size, index_id, mtr);

	if (root == NULL) {
		return;
	}

	btr_free_but_not_root(root, mtr->get_log_mode());
	mtr->set_named_space_id(page_id.space());
	btr_free_root(root, mtr, true);
}

/** Free an index tree in a temporary tablespace.
@param[in]	page_id		root page id */
void btr_free(const page_id_t page_id)
{
	mtr_t		mtr;
	mtr.start();
	mtr.set_log_mode(MTR_LOG_NO_REDO);

	buf_block_t*	block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);

	if (block) {
		btr_free_but_not_root(block, MTR_LOG_NO_REDO);
		btr_free_root(block, &mtr, false);
	}
	mtr.commit();
}
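
/* Lifecycle sketch for a persistent tree (a hypothetical caller for
illustration; the real call sites live elsewhere, e.g. in the data
dictionary and DDL code):

	mtr_t	mtr;
	mtr.start();
	mtr.set_named_space(space);
	ulint	root_page_no = btr_create(index->type, space, index->id,
					  index, &mtr);
	mtr.commit();	// root_page_no == FIL_NULL on failure

	// ... later, to drop the tree ...
	mtr.start();
	mtr.set_named_space(space);
	btr_free_if_exists(page_id_t(space->id, root_page_no),
			   space->zip_size(), index->id, &mtr);
	mtr.commit();
*/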

/** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC.
@param[in,out]	index	clustered index
@return	the last used AUTO_INCREMENT value
@retval	0 on error or if no AUTO_INCREMENT value was used yet */
ib_uint64_t
btr_read_autoinc(dict_index_t* index)
{
	ut_ad(index->is_primary());
	ut_ad(index->table->persistent_autoinc);
	ut_ad(!index->table->is_temporary());
	mtr_t		mtr;
	mtr.start();
	ib_uint64_t	autoinc;
	if (buf_block_t* block = buf_page_get(
		    page_id_t(index->table->space_id, index->page),
		    index->table->space->zip_size(),
		    RW_S_LATCH, &mtr)) {
		autoinc = page_get_autoinc(block->frame);
	} else {
		autoinc = 0;
	}
	mtr.commit();
	return autoinc;
}

/** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC,
or fall back to MAX(auto_increment_column).
@param[in]	table	table containing an AUTO_INCREMENT column
@param[in]	col_no	index of the AUTO_INCREMENT column
@return	the AUTO_INCREMENT value
@retval	0 on error or if no AUTO_INCREMENT value was used yet */
ib_uint64_t
btr_read_autoinc_with_fallback(const dict_table_t* table, unsigned col_no)
{
	ut_ad(table->persistent_autoinc);
	ut_ad(!table->is_temporary());

	dict_index_t*	index = dict_table_get_first_index(table);

	if (index == NULL) {
		return 0;
	}

	mtr_t		mtr;
	mtr.start();
	buf_block_t*	block = buf_page_get(
		page_id_t(index->table->space_id, index->page),
		index->table->space->zip_size(),
		RW_S_LATCH, &mtr);

	ib_uint64_t	autoinc	= block ? page_get_autoinc(block->frame) : 0;
	const bool	retry	= block && autoinc == 0
		&& !page_is_empty(block->frame);
	mtr.commit();

	if (retry) {
		/* This should be an old data file where
		PAGE_ROOT_AUTO_INC was initialized to 0.
		Fall back to reading MAX(autoinc_col).
		There should be an index on it. */
		const dict_col_t*	autoinc_col
			= dict_table_get_nth_col(table, col_no);
		while (index && index->fields[0].col != autoinc_col) {
			index = dict_table_get_next_index(index);
		}

		if (index) {
			autoinc = row_search_max_autoinc(index);
		}
	}

	return autoinc;
}

/** Write the next available AUTO_INCREMENT value to PAGE_ROOT_AUTO_INC.
@param[in,out]	index	clustered index
@param[in]	autoinc	the AUTO_INCREMENT value
@param[in]	reset	whether to reset the AUTO_INCREMENT
			to a possibly smaller value than currently
			exists in the page */
void
btr_write_autoinc(dict_index_t* index, ib_uint64_t autoinc, bool reset)
{
	ut_ad(index->is_primary());
	ut_ad(index->table->persistent_autoinc);
	ut_ad(!index->table->is_temporary());

	mtr_t		mtr;
	mtr.start();
	fil_space_t* space = index->table->space;
	mtr.set_named_space(space);
	page_set_autoinc(buf_page_get(page_id_t(space->id, index->page),
				      space->zip_size(),
				      RW_SX_LATCH, &mtr),
			 index, autoinc, &mtr, reset);
	mtr.commit();
}
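
/* Sketch of the persistent AUTO_INCREMENT round trip on a clustered
index (both functions open their own mini-transactions; the values are
illustrative):

	btr_write_autoinc(index, 42, false);	// bump if 42 > current
	ib_uint64_t	v = btr_read_autoinc(index);	// now v >= 42
	btr_write_autoinc(index, 1, true);	// reset allows a decrease
*/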

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize_low(
/*====================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block		= page_cur_get_block(cursor);
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	bool		success		= false;
	ulint		pos;
	bool		log_compressed;
	bool		is_spatial;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	btr_assert_not_corrupted(block, index);
	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(index->is_dummy
	      || block->page.id.space() == index->table->space->id);
	ut_ad(index->is_dummy
	      || block->page.id.page_no() != index->page
	      || !page_has_siblings(page));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);
	/* Turn logging off */
	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

	temp_block = buf_block_alloc(buf_pool);
	temp_page = temp_block->frame;

	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);

	/* This function can be called by redo log apply with a "dummy"
	index, so we trust the original page's type more than the index
	object. */
	is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
		      || dict_index_is_spatial(index));

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

	if (!recovery) {
		btr_search_drop_page_hash_index(block);
	}

	/* Save the cursor position. */
	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	/* Copy the PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC. */
	memcpy(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
	       temp_page + (PAGE_HEADER + PAGE_MAX_TRX_ID), 8);
	/* PAGE_MAX_TRX_ID is unused in clustered index pages
	(other than the root where it is repurposed as PAGE_ROOT_AUTO_INC),
	non-leaf pages, and in temporary tables. It was always
	zero-initialized in page_create() in all InnoDB versions.
	PAGE_MAX_TRX_ID must be nonzero on dict_index_is_sec_or_ibuf()
	leaf pages.

	During redo log apply, dict_index_is_sec_or_ibuf() always
	holds, even for clustered indexes. */
	ut_ad(recovery || index->table->is_temporary()
	      || !page_is_leaf(temp_page)
	      || !dict_index_is_sec_or_ibuf(index)
	      || page_get_max_trx_id(page) != 0);
	/* PAGE_MAX_TRX_ID must be zero on non-leaf pages other than
	clustered index root pages. */
	ut_ad(recovery
	      || page_get_max_trx_id(page) == 0
	      || (dict_index_is_sec_or_ibuf(index)
		  ? page_is_leaf(temp_page)
		  : block->page.id.page_no() == index->page));

	/* If innodb_log_compressed_pages is ON, page reorganize should log the
	compressed page image.*/
	log_compressed = page_zip && page_zip_log_pages;

	if (log_compressed) {
		mtr_set_log_mode(mtr, log_mode);
	}

	if (page_zip
	    && !page_zip_compress(page_zip, page, index, z_level, mtr)) {

		/* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(srv_page_size - FIL_PAGE_DATA_END + page,
			     srv_page_size - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       srv_page_size - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, srv_page_size));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
		ib::error()
			<< "Page old data size " << data_size1
			<< " new data size " << data_size2
			<< ", page old max ins size " << max_ins_size1
			<< " new max ins size " << max_ins_size2;

		ib::error() << BUG_REPORT_MSG;
		ut_ad(0);
	} else {
		success = true;
	}

	/* Restore the cursor position. */
	if (pos > 0) {
		cursor->rec = page_rec_get_nth(page, pos);
	} else {
		ut_ad(cursor->rec == page_get_infimum_rec(page));
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	if (!recovery) {
		if (block->page.id.page_no() == index->page
		    && fil_page_get_type(temp_page) == FIL_PAGE_TYPE_INSTANT) {
			/* Preserve the PAGE_INSTANT information. */
			ut_ad(!page_zip);
			ut_ad(index->is_instant());
			memcpy(FIL_PAGE_TYPE + page,
			       FIL_PAGE_TYPE + temp_page, 2);
			memcpy(PAGE_HEADER + PAGE_INSTANT + page,
			       PAGE_HEADER + PAGE_INSTANT + temp_page, 2);
			if (!index->table->instant) {
			} else if (page_is_comp(page)) {
				memcpy(PAGE_NEW_INFIMUM + page,
				       PAGE_NEW_INFIMUM + temp_page, 8);
				memcpy(PAGE_NEW_SUPREMUM + page,
				       PAGE_NEW_SUPREMUM + temp_page, 8);
			} else {
				memcpy(PAGE_OLD_INFIMUM + page,
				       PAGE_OLD_INFIMUM + temp_page, 8);
				memcpy(PAGE_OLD_SUPREMUM + page,
				       PAGE_OLD_SUPREMUM + temp_page, 8);
			}
		}

		if (!dict_table_is_locking_disabled(index->table)) {
			/* Update the record lock bitmaps */
			lock_move_reorganize_page(block, temp_block);
		}
	}

func_exit:
	buf_block_free(temp_block);

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

	if (success) {
		mlog_id_t	type;
		byte*		log_ptr;

		/* Write the log record */
		if (page_zip) {
			ut_ad(page_is_comp(page));
			type = MLOG_ZIP_PAGE_REORGANIZE;
		} else if (page_is_comp(page)) {
			type = MLOG_COMP_PAGE_REORGANIZE;
		} else {
			type = MLOG_PAGE_REORGANIZE;
		}

		log_ptr = log_compressed
			? NULL
			: mlog_open_and_write_index(
				mtr, page, index, type,
				page_zip ? 1 : 0);

		/* For compressed pages write the compression level. */
		if (log_ptr && page_zip) {
			mach_write_to_1(log_ptr, z_level);
			mlog_close(mtr, log_ptr + 1);
		}

		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
	}

	if (UNIV_UNLIKELY(fil_page_get_type(page) == FIL_PAGE_TYPE_INSTANT)) {
		/* Log the PAGE_INSTANT information. */
		ut_ad(!page_zip);
		ut_ad(index->is_instant());
		ut_ad(!recovery);
		mlog_write_ulint(FIL_PAGE_TYPE + page, FIL_PAGE_TYPE_INSTANT,
				 MLOG_2BYTES, mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_INSTANT + page,
				 mach_read_from_2(PAGE_HEADER + PAGE_INSTANT
						  + page),
				 MLOG_2BYTES, mtr);
		if (!index->table->instant) {
		} else if (page_is_comp(page)) {
			mlog_log_string(PAGE_NEW_INFIMUM + page, 8, mtr);
			mlog_log_string(PAGE_NEW_SUPREMUM + page, 8, mtr);
		} else {
			mlog_log_string(PAGE_OLD_INFIMUM + page, 8, mtr);
			mlog_log_string(PAGE_OLD_SUPREMUM + page, 8, mtr);
		}
	}

	return(success);
}

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1675 IBUF_BITMAP_FREE is unaffected by reorganization.
1676 
1677 @retval true if the operation was successful
1678 @retval false if it is a compressed page, and recompression failed */
1679 bool
1680 btr_page_reorganize_block(
1681 /*======================*/
1682 	bool		recovery,/*!< in: true if called in recovery:
1683 				locks should not be updated, i.e.,
1684 				there cannot exist locks on the
1685 				page, and a hash index should not be
1686 				dropped: it cannot exist */
1687 	ulint		z_level,/*!< in: compression level to be used
1688 				if dealing with compressed page */
1689 	buf_block_t*	block,	/*!< in/out: B-tree page */
1690 	dict_index_t*	index,	/*!< in: the index tree of the page */
1691 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1692 {
1693 	page_cur_t	cur;
1694 	page_cur_set_before_first(block, &cur);
1695 
1696 	return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
1697 }
1698 
1699 /*************************************************************//**
1700 Reorganizes an index page.
1701 
1702 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1703 if this is a compressed leaf page in a secondary index. This has to
1704 be done either within the same mini-transaction, or by invoking
1705 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1706 IBUF_BITMAP_FREE is unaffected by reorganization.
1707 
1708 @retval true if the operation was successful
1709 @retval false if it is a compressed page, and recompression failed */
1710 bool
1711 btr_page_reorganize(
1712 /*================*/
1713 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1714 	dict_index_t*	index,	/*!< in: the index tree of the page */
1715 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1716 {
1717 	return(btr_page_reorganize_low(false, page_zip_level,
1718 				       cursor, index, mtr));
1719 }
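
/* Usage sketch (illustrative only, not part of the build): a caller
that reorganizes a ROW_FORMAT=COMPRESSED leaf page of a secondary
index must refresh the change buffer free bits before committing the
mini-transaction, as the contract above requires. The block, cursor
and index variables are assumed to be set up by the caller.

	if (btr_page_reorganize(&cursor, index, &mtr)
	    && buf_block_get_page_zip(block)
	    && !dict_index_is_clust(index)
	    && page_is_leaf(buf_block_get_frame(block))) {
		ibuf_reset_free_bits(block);
	}
	mtr_commit(&mtr);
*/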
1720 
1721 /***********************************************************//**
1722 Parses a redo log record of reorganizing a page.
1723 @return end of log record or NULL */
1724 byte*
1725 btr_parse_page_reorganize(
1726 /*======================*/
1727 	byte*		ptr,	/*!< in: buffer */
1728 	byte*		end_ptr,/*!< in: buffer end */
1729 	dict_index_t*	index,	/*!< in: record descriptor */
1730 	bool		compressed,/*!< in: true if compressed page */
1731 	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
1732 	mtr_t*		mtr)	/*!< in: mtr or NULL */
1733 {
1734 	ulint	level = page_zip_level;
1735 
1736 	ut_ad(ptr != NULL);
1737 	ut_ad(end_ptr != NULL);
1738 	ut_ad(index != NULL);
1739 
1740 	/* If dealing with a compressed page, the record has the
1741 	compression level used during the original compression written
1742 	in one byte. Otherwise the record is empty. */
1743 	if (compressed) {
1744 		if (ptr == end_ptr) {
1745 			return(NULL);
1746 		}
1747 
1748 		level = mach_read_from_1(ptr);
1749 
1750 		ut_a(level <= 9);
1751 		++ptr;
1752 	} else {
1753 		level = page_zip_level;
1754 	}
1755 
1756 	if (block != NULL) {
1757 		btr_page_reorganize_block(true, level, block, index, mtr);
1758 	}
1759 
1760 	return(ptr);
1761 }
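
/* Sketch of the record body parsed above (mirroring the writer in
btr_page_reorganize_low()): an MLOG_ZIP_PAGE_REORGANIZE record
carries exactly one payload byte, the compression level 0..9, while
MLOG_PAGE_REORGANIZE and MLOG_COMP_PAGE_REORGANIZE carry no payload
at all. The buffer below is a hypothetical stand-in for the log.

	byte	payload[1];
	mach_write_to_1(payload, z_level);	// writer side
	ut_ad(mach_read_from_1(payload) <= 9);	// parser side
*/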
1762 
1763 /** Empty an index page (possibly the root page). @see btr_page_create().
1764 @param[in,out]	block		page to be emptied
1765 @param[in,out]	page_zip	compressed page frame, or NULL
1766 @param[in]	index		index of the page
1767 @param[in]	level		B-tree level of the page (0=leaf)
1768 @param[in,out]	mtr		mini-transaction */
1769 void
1770 btr_page_empty(
1771 	buf_block_t*	block,
1772 	page_zip_des_t*	page_zip,
1773 	dict_index_t*	index,
1774 	ulint		level,
1775 	mtr_t*		mtr)
1776 {
1777 	page_t*	page = buf_block_get_frame(block);
1778 
1779 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1780 	ut_ad(page_zip == buf_block_get_page_zip(block));
1781 	ut_ad(!index->is_dummy);
1782 	ut_ad(index->table->space->id == block->page.id.space());
1783 #ifdef UNIV_ZIP_DEBUG
1784 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1785 #endif /* UNIV_ZIP_DEBUG */
1786 
1787 	btr_search_drop_page_hash_index(block);
1788 
1789 	/* Recreate the page: note that the global data on the page (possible
1790 	segment headers, next page field, etc.) is preserved intact */
1791 
1792 	/* Preserve PAGE_ROOT_AUTO_INC when creating a clustered index
1793 	root page. */
1794 	const ib_uint64_t	autoinc
1795 		= dict_index_is_clust(index)
1796 		&& index->page == block->page.id.page_no()
1797 		? page_get_autoinc(page)
1798 		: 0;
1799 
1800 	if (page_zip) {
1801 		page_create_zip(block, index, level, autoinc, mtr);
1802 	} else {
1803 		page_create(block, mtr, dict_table_is_comp(index->table),
1804 			    dict_index_is_spatial(index));
1805 		btr_page_set_level(page, NULL, level, mtr);
1806 		if (autoinc) {
1807 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID + page,
1808 				       autoinc, mtr);
1809 		}
1810 	}
1811 }
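
/* Example sketch (not compiled; assumes 'root' is the buffer block of
a clustered index root page): PAGE_ROOT_AUTO_INC survives
btr_page_empty(), because the value is read before the page is
recreated and then written back, while the records and the rest of
the page are recreated from scratch.

	const ib_uint64_t	autoinc = page_get_autoinc(root->frame);
	btr_page_empty(root, page_zip, index, level, &mtr);
	ut_ad(page_get_autoinc(root->frame) == autoinc);
*/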
1812 
1813 /** Write instant ALTER TABLE metadata to a root page.
1814 @param[in,out]	root	clustered index root page
1815 @param[in]	index	clustered index with instant ALTER TABLE
1816 @param[in,out]	mtr	mini-transaction */
1817 void btr_set_instant(buf_block_t* root, const dict_index_t& index, mtr_t* mtr)
1818 {
1819 	ut_ad(index.n_core_fields > 0);
1820 	ut_ad(index.n_core_fields < REC_MAX_N_FIELDS);
1821 	ut_ad(index.is_instant());
1822 	ut_ad(fil_page_get_type(root->frame) == FIL_PAGE_TYPE_INSTANT
1823 	      || fil_page_get_type(root->frame) == FIL_PAGE_INDEX);
1824 	ut_ad(!page_has_siblings(root->frame));
1825 	ut_ad(root->page.id.page_no() == index.page);
1826 
1827 	rec_t* infimum = page_get_infimum_rec(root->frame);
1828 	rec_t* supremum = page_get_supremum_rec(root->frame);
1829 	byte* page_type = root->frame + FIL_PAGE_TYPE;
1830 	uint16_t i = page_header_get_field(root->frame, PAGE_INSTANT);
1831 
1832 	switch (mach_read_from_2(page_type)) {
1833 	case FIL_PAGE_TYPE_INSTANT:
1834 		ut_ad(page_get_instant(root->frame) == index.n_core_fields);
1835 		if (memcmp(infimum, "infimum", 8)
1836 		    || memcmp(supremum, "supremum", 8)) {
1837 			ut_ad(index.table->instant);
1838 			ut_ad(!memcmp(infimum, field_ref_zero, 8));
1839 			ut_ad(!memcmp(supremum, field_ref_zero, 7));
1840 			/* The n_core_null_bytes only matters for
1841 			ROW_FORMAT=COMPACT and ROW_FORMAT=DYNAMIC tables. */
1842 			ut_ad(supremum[7] == index.n_core_null_bytes
1843 			      || !index.table->not_redundant());
1844 			return;
1845 		}
1846 		break;
1847 	default:
1848 		ut_ad(!"wrong page type");
1849 		/* fall through */
1850 	case FIL_PAGE_INDEX:
1851 		ut_ad(!page_is_comp(root->frame)
1852 		      || !page_get_instant(root->frame));
1853 		ut_ad(!memcmp(infimum, "infimum", 8));
1854 		ut_ad(!memcmp(supremum, "supremum", 8));
1855 		mlog_write_ulint(page_type, FIL_PAGE_TYPE_INSTANT,
1856 				 MLOG_2BYTES, mtr);
1857 		ut_ad(i <= PAGE_NO_DIRECTION);
1858 		i |= index.n_core_fields << 3;
1859 		mlog_write_ulint(PAGE_HEADER + PAGE_INSTANT + root->frame, i,
1860 				 MLOG_2BYTES, mtr);
1861 		break;
1862 	}
1863 
1864 	if (index.table->instant) {
1865 		mlog_memset(root, infimum - root->frame, 8, 0, mtr);
1866 		mlog_memset(root, supremum - root->frame, 7, 0, mtr);
1867 		mlog_write_ulint(&supremum[7], index.n_core_null_bytes,
1868 				 MLOG_1BYTE, mtr);
1869 	}
1870 }
1871 
1872 /*************************************************************//**
1873 Makes tree one level higher by splitting the root, and inserts
1874 the tuple. It is assumed that mtr contains an x-latch on the tree.
1875 NOTE that the operation of this function must always succeed,
1876 we cannot reverse it: therefore enough free disk space must be
1877 guaranteed to be available before this function is called.
1878 @return inserted record */
1879 rec_t*
1880 btr_root_raise_and_insert(
1881 /*======================*/
1882 	ulint		flags,	/*!< in: undo logging and locking flags */
1883 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
1884 				on the root page; when the function returns,
1885 				the cursor is positioned on the predecessor
1886 				of the inserted record */
1887 	rec_offs**	offsets,/*!< out: offsets on inserted record */
1888 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1889 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1890 	ulint		n_ext,	/*!< in: number of externally stored columns */
1891 	mtr_t*		mtr)	/*!< in: mtr */
1892 {
1893 	dict_index_t*	index;
1894 	page_t*		root;
1895 	page_t*		new_page;
1896 	ulint		new_page_no;
1897 	rec_t*		rec;
1898 	dtuple_t*	node_ptr;
1899 	ulint		level;
1900 	rec_t*		node_ptr_rec;
1901 	page_cur_t*	page_cursor;
1902 	page_zip_des_t*	root_page_zip;
1903 	page_zip_des_t*	new_page_zip;
1904 	buf_block_t*	root_block;
1905 	buf_block_t*	new_block;
1906 
1907 	root = btr_cur_get_page(cursor);
1908 	root_block = btr_cur_get_block(cursor);
1909 	root_page_zip = buf_block_get_page_zip(root_block);
1910 	ut_ad(!page_is_empty(root));
1911 	index = btr_cur_get_index(cursor);
1912 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
1913 #ifdef UNIV_ZIP_DEBUG
1914 	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
1915 #endif /* UNIV_ZIP_DEBUG */
1916 #ifdef UNIV_BTR_DEBUG
1917 	if (!dict_index_is_ibuf(index)) {
1918 		ulint	space = index->table->space_id;
1919 
1920 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1921 					    + root, space));
1922 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1923 					    + root, space));
1924 	}
1925 
1926 	ut_a(dict_index_get_page(index) == page_get_page_no(root));
1927 #endif /* UNIV_BTR_DEBUG */
1928 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
1929 					MTR_MEMO_X_LOCK
1930 					| MTR_MEMO_SX_LOCK));
1931 	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));
1932 
1933 	/* Allocate a new page to the tree. Root splitting is done by first
1934 	moving the root records to the new page, emptying the root, putting
1935 	a node pointer to the new page, and then splitting the new page. */
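	/* Schematically (illustrative only):

		before:  [ root: a b c ]

		after:   [ root: ptr(new_page) ]
		                  |
		          [ new_page: a b c ]   <- then split new_page

	The root page number never changes, so no pointers to the root
	need to be updated. */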
1936 
1937 	level = btr_page_get_level(root);
1938 
1939 	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
1940 
1941 	if (new_block == NULL && os_has_said_disk_full) {
1942 		return(NULL);
1943 	}
1944 
1945 	new_page = buf_block_get_frame(new_block);
1946 	new_page_zip = buf_block_get_page_zip(new_block);
1947 	ut_a(!new_page_zip == !root_page_zip);
1948 	ut_a(!new_page_zip
1949 	     || page_zip_get_size(new_page_zip)
1950 	     == page_zip_get_size(root_page_zip));
1951 
1952 	btr_page_create(new_block, new_page_zip, index, level, mtr);
1953 
1954 	/* Set the next node and previous node fields of new page */
1955 	compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
1956 	compile_time_assert(FIL_NULL == 0xffffffff);
1957 #if MYSQL_VERSION_ID < 100500
1958 	if (UNIV_LIKELY_NULL(new_page_zip)) {
1959 		/* Avoid tripping the ut_a() in mlog_parse_nbytes()
1960 		when crash-downgrading to an earlier MariaDB 10.4 version. */
1961 		btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
1962 		btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
1963 	} else {
1964 		mlog_memset(new_block, FIL_PAGE_PREV, 8, 0xff, mtr);
1965 	}
1966 #else
1967 	mlog_memset(new_block, FIL_PAGE_PREV, 8, 0xff, mtr);
1968 	if (UNIV_LIKELY_NULL(new_page_zip)) {
1969 		memset(new_page_zip->data + FIL_PAGE_PREV, 0xff, 8);
1970 	}
1971 #endif
1972 
1973 	/* Copy the records from root to the new page one by one. */
1974 
1975 	if (0
1976 #ifdef UNIV_ZIP_COPY
1977 	    || new_page_zip
1978 #endif /* UNIV_ZIP_COPY */
1979 	    || !page_copy_rec_list_end(new_block, root_block,
1980 				       page_get_infimum_rec(root),
1981 				       index, mtr)) {
1982 		ut_a(new_page_zip);
1983 
1984 		/* Copy the page byte for byte. */
1985 		page_zip_copy_recs(new_page_zip, new_page,
1986 				   root_page_zip, root, index, mtr);
1987 
1988 		/* Update the lock table and possible hash index. */
1989 		lock_move_rec_list_end(new_block, root_block,
1990 				       page_get_infimum_rec(root));
1991 
1992 		/* Move any existing predicate locks */
1993 		if (dict_index_is_spatial(index)) {
1994 			lock_prdt_rec_move(new_block, root_block);
1995 		} else {
1996 			btr_search_move_or_delete_hash_entries(
1997 				new_block, root_block);
1998 		}
1999 	}
2000 
2001 	if (dict_index_is_sec_or_ibuf(index)) {
2002 		/* In secondary indexes and the change buffer,
2003 		PAGE_MAX_TRX_ID can be reset on the root page, because
2004 		the field only matters on leaf pages, and the root no
2005 		longer is a leaf page. (Older versions of InnoDB did
2006 		set PAGE_MAX_TRX_ID on all secondary index pages.) */
2007 		if (root_page_zip) {
2008 			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + root;
2009 			memset(p, 0, 8);
2010 			page_zip_write_header(root_page_zip, p, 8, mtr);
2011 		} else {
2012 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
2013 				       + root, 0, mtr);
2014 		}
2015 	} else {
2016 		/* PAGE_ROOT_AUTO_INC is only present in the clustered index
2017 		root page; on other clustered index pages, we want to reserve
2018 		the field PAGE_MAX_TRX_ID for future use. */
2019 		if (new_page_zip) {
2020 			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + new_page;
2021 			memset(p, 0, 8);
2022 			page_zip_write_header(new_page_zip, p, 8, mtr);
2023 		} else {
2024 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
2025 				       + new_page, 0, mtr);
2026 		}
2027 	}
2028 
2029 	/* If this is a pessimistic insert which is actually done to
2030 	perform a pessimistic update then we have stored the lock
2031 	information of the record to be inserted on the infimum of the
2032 	root page: we cannot discard the lock structs on the root page */
2033 
2034 	if (!dict_table_is_locking_disabled(index->table)) {
2035 		lock_update_root_raise(new_block, root_block);
2036 	}
2037 
2038 	/* Create a memory heap where the node pointer is stored */
2039 	if (!*heap) {
2040 		*heap = mem_heap_create(1000);
2041 	}
2042 
2043 	rec = page_rec_get_next(page_get_infimum_rec(new_page));
2044 	new_page_no = new_block->page.id.page_no();
2045 
2046 	/* Build the node pointer (= node key and page address) for the
2047 	child */
2048 	if (dict_index_is_spatial(index)) {
2049 		rtr_mbr_t		new_mbr;
2050 
2051 		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
2052 		node_ptr = rtr_index_build_node_ptr(
2053 			index, &new_mbr, rec, new_page_no, *heap);
2054 	} else {
2055 		node_ptr = dict_index_build_node_ptr(
2056 			index, rec, new_page_no, *heap, level);
2057 	}
2058 	/* The node pointer must be marked as the predefined minimum record,
2059 	as there is no lower alphabetical limit to records in the leftmost
2060 	node of a level: */
2061 	dtuple_set_info_bits(node_ptr,
2062 			     dtuple_get_info_bits(node_ptr)
2063 			     | REC_INFO_MIN_REC_FLAG);
2064 
2065 	/* Rebuild the root page to get free space */
2066 	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
2067 	/* btr_page_empty() is supposed to zero-initialize the field. */
2068 	ut_ad(!page_get_instant(root_block->frame));
2069 
2070 	if (index->is_instant()) {
2071 		ut_ad(!root_page_zip);
2072 		btr_set_instant(root_block, *index, mtr);
2073 	}
2074 
2075 	ut_ad(!page_has_siblings(root));
2076 
2077 	page_cursor = btr_cur_get_page_cur(cursor);
2078 
2079 	/* Insert node pointer to the root */
2080 
2081 	page_cur_set_before_first(root_block, page_cursor);
2082 
2083 	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
2084 					     index, offsets, heap, 0, mtr);
2085 
2086 	/* The root page should only contain the node pointer
2087 	to new_page at this point.  Thus, the data should fit. */
2088 	ut_a(node_ptr_rec);
2089 
2090 	/* We play it safe and reset the free bits for the new page */
2091 
2092 	if (!dict_index_is_clust(index)
2093 	    && !index->table->is_temporary()) {
2094 		ibuf_reset_free_bits(new_block);
2095 	}
2096 
2097 	if (tuple != NULL) {
2098 		/* Reposition the cursor to the child node */
2099 		page_cur_search(new_block, index, tuple, page_cursor);
2100 	} else {
2101 		/* Set cursor to first record on child node */
2102 		page_cur_set_before_first(new_block, page_cursor);
2103 	}
2104 
2105 	/* Split the child and insert tuple */
2106 	if (dict_index_is_spatial(index)) {
2107 		/* Split rtree page and insert tuple */
2108 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2109 						 tuple, n_ext, mtr));
2110 	} else {
2111 		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
2112 						 tuple, n_ext, mtr));
2113 	}
2114 }
2115 
2116 /** Decide if the page should be split at the convergence point of inserts
2117 converging to the left.
2118 @param[in]	cursor	insert position
2119 @return the first record to be moved to the right half page
2120 @retval	NULL if no split is recommended */
2121 rec_t* btr_page_get_split_rec_to_left(const btr_cur_t* cursor)
2122 {
2123 	rec_t* split_rec = btr_cur_get_rec(cursor);
2124 	const page_t* page = page_align(split_rec);
2125 
2126 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2127 	    != page_rec_get_next(split_rec)) {
2128 		return NULL;
2129 	}
2130 
2131 	/* The metadata record must be present in the leftmost leaf page
2132 	of the clustered index, if and only if index->is_instant().
2133 	However, during innobase_instant_try(), index->is_instant()
2134 	would already hold when row_ins_clust_index_entry_low()
2135 	is being invoked to insert the metadata record.
2136 	So, we can only assert that when the metadata record exists,
2137 	index->is_instant() must hold. */
2138 	ut_ad(!page_is_leaf(page) || page_has_prev(page)
2139 	      || cursor->index->is_instant()
2140 	      || !(rec_get_info_bits(page_rec_get_next_const(
2141 					     page_get_infimum_rec(page)),
2142 				     cursor->index->table->not_redundant())
2143 		   & REC_INFO_MIN_REC_FLAG));
2144 
2145 	const rec_t* infimum = page_get_infimum_rec(page);
2146 
2147 	/* If the convergence is in the middle of a page, also move the
2148 	record immediately before the new insert to the upper
2149 	page. Otherwise, we could repeatedly move from page to page
2150 	lots of records smaller than the convergence point. */
2151 
2152 	if (split_rec == infimum
2153 	    || split_rec == page_rec_get_next_const(infimum)) {
2154 		split_rec = page_rec_get_next(split_rec);
2155 	}
2156 
2157 	return split_rec;
2158 }
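
/* Usage sketch (hypothetical caller, mirroring
btr_page_split_and_insert() below): under a descending insert pattern,
PAGE_LAST_INSERT points right after the cursor record, so the function
recommends splitting at the convergence point instead of at the middle
record.

	if (rec_t* split_rec = btr_page_get_split_rec_to_left(cursor)) {
		// move split_rec and all records after it to the upper
		// half-page; the newly allocated page becomes the lower one
	}
*/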
2159 
2160 /** Decide if the page should be split at the convergence point of inserts
2161 converging to the right.
2162 @param[in]	cursor		insert position
2163 @param[out]	split_rec	if split recommended, the first record
2164 				on the right half page, or
2165 				NULL if the to-be-inserted record
2166 				should be first
2167 @return whether split is recommended */
2168 bool
2169 btr_page_get_split_rec_to_right(const btr_cur_t* cursor, rec_t** split_rec)
2170 {
2171 	rec_t* insert_point = btr_cur_get_rec(cursor);
2172 	const page_t* page = page_align(insert_point);
2173 
2174 	/* We use eager heuristics: if the new insert would be right after
2175 	the previous insert on the same page, we assume that there is a
2176 	pattern of sequential inserts here. */
2177 
2178 	if (page_header_get_ptr(page, PAGE_LAST_INSERT) != insert_point) {
2179 		return false;
2180 	}
2181 
2182 	insert_point = page_rec_get_next(insert_point);
2183 
2184 	if (page_rec_is_supremum(insert_point)) {
2185 		insert_point = NULL;
2186 	} else {
2187 		insert_point = page_rec_get_next(insert_point);
2188 		if (page_rec_is_supremum(insert_point)) {
2189 			insert_point = NULL;
2190 		}
2191 
2192 		/* If there are >= 2 user records up from the insert
2193 		point, split all but 1 off. We want to keep one because
2194 		then sequential inserts can use the adaptive hash
2195 		index, as they can do the necessary checks of the right
2196 		search position just by looking at the records on this
2197 		page. */
2198 	}
2199 
2200 	*split_rec = insert_point;
2201 	return true;
2202 }
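
/* Usage sketch (hypothetical caller): under an ascending insert
pattern the function keeps at most one user record after the insert
point on the lower page, so the split produces a nearly empty right
page next to the insertion point.

	rec_t*	split_rec;
	if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
		// split_rec == NULL means that the to-be-inserted
		// record should become the first one on the right page
	}
*/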
2203 
2204 /*************************************************************//**
2205 Calculates a split record such that the tuple will certainly fit on
2206 its half-page when the split is performed. We assume in this function
2207 only that the cursor page has at least one user record.
2208 @return split record, or NULL if tuple will be the first record on
2209 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
2210 static
2211 rec_t*
2212 btr_page_get_split_rec(
2213 /*===================*/
2214 	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
2215 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2216 	ulint		n_ext)	/*!< in: number of externally stored columns */
2217 {
2218 	page_t*		page;
2219 	page_zip_des_t*	page_zip;
2220 	ulint		insert_size;
2221 	ulint		free_space;
2222 	ulint		total_data;
2223 	ulint		total_n_recs;
2224 	ulint		total_space;
2225 	ulint		incl_data;
2226 	rec_t*		ins_rec;
2227 	rec_t*		rec;
2228 	rec_t*		next_rec;
2229 	ulint		n;
2230 	mem_heap_t*	heap;
2231 	rec_offs*	offsets;
2232 
2233 	page = btr_cur_get_page(cursor);
2234 
2235 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2236 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2237 
2238 	page_zip = btr_cur_get_page_zip(cursor);
2239 	if (page_zip) {
2240 		/* Estimate the free space of an empty compressed page. */
2241 		ulint	free_space_zip = page_zip_empty_size(
2242 			cursor->index->n_fields,
2243 			page_zip_get_size(page_zip));
2244 
2245 		if (free_space > (ulint) free_space_zip) {
2246 			free_space = (ulint) free_space_zip;
2247 		}
2248 	}
2249 
2250 	/* free_space is now the free space of a created new page */
2251 
2252 	total_data   = page_get_data_size(page) + insert_size;
2253 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2254 	ut_ad(total_n_recs >= 2);
2255 	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);
2256 
2257 	n = 0;
2258 	incl_data = 0;
2259 	ins_rec = btr_cur_get_rec(cursor);
2260 	rec = page_get_infimum_rec(page);
2261 
2262 	heap = NULL;
2263 	offsets = NULL;
2264 
2265 	/* We include records in the left half until the space they
2266 	reserve exceeds half of total_space. If the included records
2267 	then fit on the left page and something is also left over for
2268 	the right page, they are put on the left page; otherwise the
2269 	last included record becomes the first record on the right
2270 	half-page. */
2271 
2272 	do {
2273 		/* Decide the next record to include */
2274 		if (rec == ins_rec) {
2275 			rec = NULL;	/* NULL denotes that tuple is
2276 					now included */
2277 		} else if (rec == NULL) {
2278 			rec = page_rec_get_next(ins_rec);
2279 		} else {
2280 			rec = page_rec_get_next(rec);
2281 		}
2282 
2283 		if (rec == NULL) {
2284 			/* Include tuple */
2285 			incl_data += insert_size;
2286 		} else {
2287 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2288 						  page_is_leaf(page)
2289 						  ? cursor->index->n_core_fields
2290 						  : 0,
2291 						  ULINT_UNDEFINED, &heap);
2292 			incl_data += rec_offs_size(offsets);
2293 		}
2294 
2295 		n++;
2296 	} while (incl_data + page_dir_calc_reserved_space(n)
2297 		 < total_space / 2);
2298 
2299 	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
2300 		/* The next record will be the first on
2301 		the right half page if it is not the
2302 		supremum record of page */
2303 
2304 		if (rec == ins_rec) {
2305 			rec = NULL;
2306 
2307 			goto func_exit;
2308 		} else if (rec == NULL) {
2309 			next_rec = page_rec_get_next(ins_rec);
2310 		} else {
2311 			next_rec = page_rec_get_next(rec);
2312 		}
2313 		ut_ad(next_rec);
2314 		if (!page_rec_is_supremum(next_rec)) {
2315 			rec = next_rec;
2316 		}
2317 	}
2318 
2319 func_exit:
2320 	if (heap) {
2321 		mem_heap_free(heap);
2322 	}
2323 	return(rec);
2324 }
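
/* Worked example (all numbers are illustrative assumptions): with
free_space = 16252 bytes on an empty 16KiB page, 100 records of 80
bytes each and insert_size = 80, total_data = 8080 and total_space =
8080 + page_dir_calc_reserved_space(101). The loop above accumulates
records into incl_data until

	incl_data + page_dir_calc_reserved_space(n) >= total_space / 2

so the chosen split record covers roughly half of the used space. */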
2325 
2326 /*************************************************************//**
2327 Returns TRUE if the insert fits on the appropriate half-page with the
2328 chosen split_rec.
2329 @return true if fits */
2330 static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
2331 bool
2332 btr_page_insert_fits(
2333 /*=================*/
2334 	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2335 				should be made */
2336 	const rec_t*	split_rec,/*!< in: suggestion for first record
2337 				on upper half-page, or NULL if
2338 				tuple to be inserted should be first */
2339 	rec_offs**	offsets,/*!< in: rec_get_offsets(
2340 				split_rec, cursor->index); out: garbage */
2341 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2342 	ulint		n_ext,	/*!< in: number of externally stored columns */
2343 	mem_heap_t**	heap)	/*!< in: temporary memory heap */
2344 {
2345 	page_t*		page;
2346 	ulint		insert_size;
2347 	ulint		free_space;
2348 	ulint		total_data;
2349 	ulint		total_n_recs;
2350 	const rec_t*	rec;
2351 	const rec_t*	end_rec;
2352 
2353 	page = btr_cur_get_page(cursor);
2354 
2355 	ut_ad(!split_rec
2356 	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
2357 	ut_ad(!split_rec
2358 	      || rec_offs_validate(split_rec, cursor->index, *offsets));
2359 
2360 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2361 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2362 
2363 	/* free_space is now the free space of a created new page */
2364 
2365 	total_data   = page_get_data_size(page) + insert_size;
2366 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2367 
2368 	/* We determine which records (from rec to end_rec, not including
2369 	end_rec) will end up on the other half page from tuple when it is
2370 	inserted. */
2371 
2372 	if (split_rec == NULL) {
2373 		rec = page_rec_get_next(page_get_infimum_rec(page));
2374 		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
2375 
2376 	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
2377 
2378 		rec = page_rec_get_next(page_get_infimum_rec(page));
2379 		end_rec = split_rec;
2380 	} else {
2381 		rec = split_rec;
2382 		end_rec = page_get_supremum_rec(page);
2383 	}
2384 
2385 	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2386 	    <= free_space) {
2387 
2388 		/* Ok, there will be enough available space on the
2389 		half page where the tuple is inserted */
2390 
2391 		return(true);
2392 	}
2393 
2394 	while (rec != end_rec) {
2395 		/* In this loop we calculate the amount of reserved
2396 		space after rec is removed from page. */
2397 
2398 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2399 					   page_is_leaf(page)
2400 					   ? cursor->index->n_core_fields
2401 					   : 0,
2402 					   ULINT_UNDEFINED, heap);
2403 
2404 		total_data -= rec_offs_size(*offsets);
2405 		total_n_recs--;
2406 
2407 		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2408 		    <= free_space) {
2409 
2410 			/* Ok, there will be enough available space on the
2411 			half page where the tuple is inserted */
2412 
2413 			return(true);
2414 		}
2415 
2416 		rec = page_rec_get_next_const(rec);
2417 	}
2418 
2419 	return(false);
2420 }
2421 
2422 /*******************************************************//**
2423 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2424 that mtr holds an x-latch on the tree. */
2425 void
2426 btr_insert_on_non_leaf_level_func(
2427 /*==============================*/
2428 	ulint		flags,	/*!< in: undo logging and locking flags */
2429 	dict_index_t*	index,	/*!< in: index */
2430 	ulint		level,	/*!< in: level, must be > 0 */
2431 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2432 	const char*	file,	/*!< in: file name */
2433 	unsigned	line,	/*!< in: line where called */
2434 	mtr_t*		mtr)	/*!< in: mtr */
2435 {
2436 	big_rec_t*	dummy_big_rec;
2437 	btr_cur_t	cursor;
2438 	dberr_t		err;
2439 	rec_t*		rec;
2440 	mem_heap_t*	heap = NULL;
2441 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
2442 	rec_offs*	offsets         = offsets_;
2443 	rec_offs_init(offsets_);
2444 	rtr_info_t	rtr_info;
2445 
2446 	ut_ad(level > 0);
2447 
2448 	if (!dict_index_is_spatial(index)) {
2449 		dberr_t err = btr_cur_search_to_nth_level(
2450 			index, level, tuple, PAGE_CUR_LE,
2451 			BTR_CONT_MODIFY_TREE,
2452 			&cursor, 0, file, line, mtr);
2453 
2454 		if (err != DB_SUCCESS) {
2455 			ib::warn() << "Error code: " << err
2456 				   << " in btr_insert_on_non_leaf_level_func"
2457 				   << " level: " << level
2458 				   << " called from file: "
2459 				   << file << " line: " << line
2460 				   << " table: " << index->table->name
2461 				   << " index: " << index->name;
2462 		}
2463 	} else {
2464 		/* For spatial index, initialize structures to track
2465 		its parents etc. */
2466 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2467 
2468 		rtr_info_update_btr(&cursor, &rtr_info);
2469 
2470 		btr_cur_search_to_nth_level(index, level, tuple,
2471 					    PAGE_CUR_RTREE_INSERT,
2472 					    BTR_CONT_MODIFY_TREE,
2473 					    &cursor, 0, file, line, mtr);
2474 	}
2475 
2476 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2477 
2478 	err = btr_cur_optimistic_insert(
2479 		flags
2480 		| BTR_NO_LOCKING_FLAG
2481 		| BTR_KEEP_SYS_FLAG
2482 		| BTR_NO_UNDO_LOG_FLAG,
2483 		&cursor, &offsets, &heap,
2484 		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2485 
2486 	if (err == DB_FAIL) {
2487 		err = btr_cur_pessimistic_insert(flags
2488 						 | BTR_NO_LOCKING_FLAG
2489 						 | BTR_KEEP_SYS_FLAG
2490 						 | BTR_NO_UNDO_LOG_FLAG,
2491 						 &cursor, &offsets, &heap,
2492 						 tuple, &rec,
2493 						 &dummy_big_rec, 0, NULL, mtr);
2494 		ut_a(err == DB_SUCCESS);
2495 	}
2496 
2497 	if (heap != NULL) {
2498 		mem_heap_free(heap);
2499 	}
2500 
2501 	if (dict_index_is_spatial(index)) {
2502 		ut_ad(cursor.rtr_info);
2503 
2504 		rtr_clean_rtr_info(&rtr_info, true);
2505 	}
2506 }
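
/* Call sketch: this function is normally reached through the
btr_insert_on_non_leaf_level() wrapper macro (see btr0btr.h), which
supplies the caller's __FILE__ and __LINE__, for example:

	btr_insert_on_non_leaf_level(flags, index, level + 1,
				     node_ptr, mtr);
*/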
2507 
2508 /**************************************************************//**
2509 Attaches the halves of an index page on the appropriate level in an
2510 index tree. */
2511 static MY_ATTRIBUTE((nonnull))
2512 void
2513 btr_attach_half_pages(
2514 /*==================*/
2515 	ulint		flags,		/*!< in: undo logging and
2516 					locking flags */
2517 	dict_index_t*	index,		/*!< in: the index tree */
2518 	buf_block_t*	block,		/*!< in/out: page to be split */
2519 	const rec_t*	split_rec,	/*!< in: first record on upper
2520 					half page */
2521 	buf_block_t*	new_block,	/*!< in/out: the new half page */
2522 	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
2523 	mtr_t*		mtr)		/*!< in: mtr */
2524 {
2525 	ulint		prev_page_no;
2526 	ulint		next_page_no;
2527 	ulint		level;
2528 	page_t*		page		= buf_block_get_frame(block);
2529 	page_t*		lower_page;
2530 	page_t*		upper_page;
2531 	ulint		lower_page_no;
2532 	ulint		upper_page_no;
2533 	page_zip_des_t*	lower_page_zip;
2534 	page_zip_des_t*	upper_page_zip;
2535 	dtuple_t*	node_ptr_upper;
2536 	mem_heap_t*	heap;
2537 	buf_block_t*	prev_block = NULL;
2538 	buf_block_t*	next_block = NULL;
2539 
2540 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2541 	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));
2542 
2543 	/* Create a memory heap where the data tuple is stored */
2544 	heap = mem_heap_create(1024);
2545 
2546 	/* Based on split direction, decide upper and lower pages */
2547 	if (direction == FSP_DOWN) {
2548 
2549 		btr_cur_t	cursor;
2550 		rec_offs*	offsets;
2551 
2552 		lower_page = buf_block_get_frame(new_block);
2553 		lower_page_no = new_block->page.id.page_no();
2554 		lower_page_zip = buf_block_get_page_zip(new_block);
2555 		upper_page = buf_block_get_frame(block);
2556 		upper_page_no = block->page.id.page_no();
2557 		upper_page_zip = buf_block_get_page_zip(block);
2558 
2559 		/* Look up the index for the node pointer to page */
2560 		offsets = btr_page_get_father_block(NULL, heap, index,
2561 						    block, mtr, &cursor);
2562 
2563 		/* Replace the address of the old child node (= page) with the
2564 		address of the new lower half */
2565 
2566 		btr_node_ptr_set_child_page_no(
2567 			btr_cur_get_rec(&cursor),
2568 			btr_cur_get_page_zip(&cursor),
2569 			offsets, lower_page_no, mtr);
2570 		mem_heap_empty(heap);
2571 	} else {
2572 		lower_page = buf_block_get_frame(block);
2573 		lower_page_no = block->page.id.page_no();
2574 		lower_page_zip = buf_block_get_page_zip(block);
2575 		upper_page = buf_block_get_frame(new_block);
2576 		upper_page_no = new_block->page.id.page_no();
2577 		upper_page_zip = buf_block_get_page_zip(new_block);
2578 	}
2579 
2580 	/* Get the previous and next pages of page */
2581 	prev_page_no = btr_page_get_prev(page);
2582 	next_page_no = btr_page_get_next(page);
2583 
2584 	const ulint	space = block->page.id.space();
2585 
2586 	/* for consistency, both blocks should be locked, before change */
2587 	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
2588 		prev_block = btr_block_get(
2589 			page_id_t(space, prev_page_no), block->zip_size(),
2590 			RW_X_LATCH, index, mtr);
2591 	}
2592 	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
2593 		next_block = btr_block_get(
2594 			page_id_t(space, next_page_no), block->zip_size(),
2595 			RW_X_LATCH, index, mtr);
2596 	}
2597 
2598 	/* Get the level of the split pages */
2599 	level = btr_page_get_level(buf_block_get_frame(block));
2600 	ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));
2601 
2602 	/* Build the node pointer (= node key and page address) for the upper
2603 	half */
2604 
2605 	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
2606 						   upper_page_no, heap, level);
2607 
2608 	/* Insert it next to the pointer to the lower half. Note that this
2609 	may generate recursion leading to a split on the higher level. */
2610 
2611 	btr_insert_on_non_leaf_level(flags, index, level + 1,
2612 				     node_ptr_upper, mtr);
2613 
2614 	/* Free the memory heap */
2615 	mem_heap_free(heap);
2616 
2617 	/* Update page links of the level */
2618 
2619 	if (prev_block) {
2620 #ifdef UNIV_BTR_DEBUG
2621 		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
2622 		ut_a(btr_page_get_next(prev_block->frame)
2623 		     == block->page.id.page_no());
2624 #endif /* UNIV_BTR_DEBUG */
2625 
2626 		btr_page_set_next(buf_block_get_frame(prev_block),
2627 				  buf_block_get_page_zip(prev_block),
2628 				  lower_page_no, mtr);
2629 	}
2630 
2631 	if (next_block) {
2632 #ifdef UNIV_BTR_DEBUG
2633 		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
2634 		ut_a(btr_page_get_prev(next_block->frame)
2635 		     == page_get_page_no(page));
2636 #endif /* UNIV_BTR_DEBUG */
2637 
2638 		btr_page_set_prev(buf_block_get_frame(next_block),
2639 				  buf_block_get_page_zip(next_block),
2640 				  upper_page_no, mtr);
2641 	}
2642 
2643 	if (direction == FSP_DOWN) {
2644 		/* lower_page is new */
2645 		btr_page_set_prev(lower_page, lower_page_zip,
2646 				  prev_page_no, mtr);
2647 	} else {
2648 		ut_ad(btr_page_get_prev(lower_page) == prev_page_no);
2649 	}
2650 
2651 	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
2652 	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
2653 
2654 	if (direction != FSP_DOWN) {
2655 		/* upper_page is new */
2656 		btr_page_set_next(upper_page, upper_page_zip,
2657 				  next_page_no, mtr);
2658 	} else {
2659 		ut_ad(btr_page_get_next(upper_page) == next_page_no);
2660 	}
2661 }
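
/* Resulting level list (illustrative): with (lower, upper) being
(new, old) for FSP_DOWN and (old, new) otherwise, the pages end up
doubly linked as

	prev <-> lower <-> upper <-> next

and the parent level receives a node pointer for the upper page (plus,
for FSP_DOWN, the updated child page number of the lower page). */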
2662 
2663 /*************************************************************//**
2664 Determine if a tuple is smaller than any record on the page.
2665 @return TRUE if smaller */
2666 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2667 bool
2668 btr_page_tuple_smaller(
2669 /*===================*/
2670 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2671 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2672 	rec_offs**	offsets,/*!< in/out: temporary storage */
2673 	ulint		n_uniq,	/*!< in: number of unique fields
2674 				in the index page records */
2675 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2676 {
2677 	buf_block_t*	block;
2678 	const rec_t*	first_rec;
2679 	page_cur_t	pcur;
2680 
2681 	/* Read the first user record in the page. */
2682 	block = btr_cur_get_block(cursor);
2683 	page_cur_set_before_first(block, &pcur);
2684 	page_cur_move_to_next(&pcur);
2685 	first_rec = page_cur_get_rec(&pcur);
2686 
2687 	*offsets = rec_get_offsets(
2688 		first_rec, cursor->index, *offsets,
2689 		page_is_leaf(block->frame) ? cursor->index->n_core_fields : 0,
2690 		n_uniq, heap);
2691 
2692 	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2693 }
2694 
2695 /** Insert the tuple into the right sibling page, if the cursor is at the end
2696 of a page.
2697 @param[in]	flags	undo logging and locking flags
2698 @param[in,out]	cursor	cursor at which to insert; when the function succeeds,
2699 			the cursor is positioned before the insert point.
2700 @param[out]	offsets	offsets on inserted record
2701 @param[in,out]	heap	memory heap for allocating offsets
2702 @param[in]	tuple	tuple to insert
2703 @param[in]	n_ext	number of externally stored columns
2704 @param[in,out]	mtr	mini-transaction
2705 @return	inserted record (first record on the right sibling page);
2706 	the cursor will be positioned on the page infimum
2707 @retval	NULL if the operation was not performed */
2708 static
2709 rec_t*
2710 btr_insert_into_right_sibling(
2711 	ulint		flags,
2712 	btr_cur_t*	cursor,
2713 	rec_offs**	offsets,
2714 	mem_heap_t*	heap,
2715 	const dtuple_t*	tuple,
2716 	ulint		n_ext,
2717 	mtr_t*		mtr)
2718 {
2719 	buf_block_t*	block = btr_cur_get_block(cursor);
2720 	page_t*		page = buf_block_get_frame(block);
2721 	const uint32_t	next_page_no = btr_page_get_next(page);
2722 
2723 	ut_ad(mtr_memo_contains_flagged(
2724 		      mtr, dict_index_get_lock(cursor->index),
2725 		      MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2726 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2727 	ut_ad(heap);
2728 
2729 	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
2730 			page_rec_get_next(btr_cur_get_rec(cursor)))) {
2731 
2732 		return(NULL);
2733 	}
2734 
2735 	page_cur_t	next_page_cursor;
2736 	buf_block_t*	next_block;
2737 	page_t*		next_page;
2738 	btr_cur_t	next_father_cursor;
2739 	rec_t*		rec = NULL;
2740 	ulint		max_size;
2741 
2742 	const ulint	space = block->page.id.space();
2743 
2744 	next_block = btr_block_get(
2745 		page_id_t(space, next_page_no), block->zip_size(),
2746 		RW_X_LATCH, cursor->index, mtr);
2747 	next_page = buf_block_get_frame(next_block);
2748 
2749 	bool	is_leaf = page_is_leaf(next_page);
2750 
2751 	btr_page_get_father(
2752 		cursor->index, next_block, mtr, &next_father_cursor);
2753 
2754 	page_cur_search(
2755 		next_block, cursor->index, tuple, PAGE_CUR_LE,
2756 		&next_page_cursor);
2757 
2758 	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2759 
2760 	/* Extend the gap locks to the next page */
2761 	if (!dict_table_is_locking_disabled(cursor->index->table)) {
2762 		lock_update_split_left(next_block, block);
2763 	}
2764 
2765 	rec = page_cur_tuple_insert(
2766 		&next_page_cursor, tuple, cursor->index, offsets, &heap,
2767 		n_ext, mtr);
2768 
2769 	if (rec == NULL) {
2770 		if (is_leaf
2771 		    && next_block->page.zip.ssize
2772 		    && !dict_index_is_clust(cursor->index)
2773 		    && !cursor->index->table->is_temporary()) {
2774 			/* Reset the IBUF_BITMAP_FREE bits, because
2775 			page_cur_tuple_insert() will have attempted page
2776 			reorganization before failing. */
2777 			ibuf_reset_free_bits(next_block);
2778 		}
2779 		return(NULL);
2780 	}
2781 
2782 	ibool	compressed;
2783 	dberr_t	err;
2784 	ulint	level = btr_page_get_level(next_page);
2785 
2786 	/* adjust cursor position */
2787 	*btr_cur_get_page_cur(cursor) = next_page_cursor;
2788 
2789 	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2790 	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2791 
2792 	/* We have to change the parent node pointer */
2793 
2794 	compressed = btr_cur_pessimistic_delete(
2795 		&err, TRUE, &next_father_cursor,
2796 		BTR_CREATE_FLAG, false, mtr);
2797 
2798 	ut_a(err == DB_SUCCESS);
2799 
2800 	if (!compressed) {
2801 		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2802 	}
2803 
2804 	dtuple_t*	node_ptr = dict_index_build_node_ptr(
2805 		cursor->index, rec, next_block->page.id.page_no(),
2806 		heap, level);
2807 
2808 	btr_insert_on_non_leaf_level(
2809 		flags, cursor->index, level + 1, node_ptr, mtr);
2810 
2811 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2812 
2813 	if (is_leaf
2814 	    && !dict_index_is_clust(cursor->index)
2815 	    && !cursor->index->table->is_temporary()) {
2816 		/* Update the free bits of the B-tree page in the
2817 		insert buffer bitmap. */
2818 
2819 		if (next_block->page.zip.ssize) {
2820 			ibuf_update_free_bits_zip(next_block, mtr);
2821 		} else {
2822 			ibuf_update_free_bits_if_full(
2823 				next_block, max_size,
2824 				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2825 		}
2826 	}
2827 
2828 	return(rec);
2829 }
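
/* Sketch of the fast path tried by btr_page_split_and_insert() below:
when the cursor is positioned after the last user record and a right
sibling exists, the tuple lands on the sibling and a full page split
is avoided; the sibling's parent node pointer is deleted and
re-inserted because its first record has changed.

	if (rec_t* rec = btr_insert_into_right_sibling(
		    flags, cursor, offsets, *heap, tuple, n_ext, mtr)) {
		return rec;	// no split was needed
	}
*/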
2830 
2831 /*************************************************************//**
2832 Splits an index page to halves and inserts the tuple. It is assumed
2833 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
2834 released within this function! NOTE that the operation of this
2835 function must always succeed, we cannot reverse it: therefore enough
2836 free disk space (2 pages) must be guaranteed to be available before
2837 this function is called.
2838 NOTE: jonaso added support for calling this function with tuple == NULL,
2839 which causes it to only split a page.
2840 
2841 @return inserted record or NULL if run out of space */
2842 rec_t*
2843 btr_page_split_and_insert(
2844 /*======================*/
2845 	ulint		flags,	/*!< in: undo logging and locking flags */
2846 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
2847 				function returns, the cursor is positioned
2848 				on the predecessor of the inserted record */
2849 	rec_offs**	offsets,/*!< out: offsets on inserted record */
2850 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2851 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2852 	ulint		n_ext,	/*!< in: number of externally stored columns */
2853 	mtr_t*		mtr)	/*!< in: mtr */
2854 {
2855 	buf_block_t*	block;
2856 	page_t*		page;
2857 	page_zip_des_t*	page_zip;
2858 	buf_block_t*	new_block;
2859 	page_t*		new_page;
2860 	page_zip_des_t*	new_page_zip;
2861 	rec_t*		split_rec;
2862 	buf_block_t*	left_block;
2863 	buf_block_t*	right_block;
2864 	page_cur_t*	page_cursor;
2865 	rec_t*		first_rec;
2866 	byte*		buf = 0; /* remove warning */
2867 	rec_t*		move_limit;
2868 	ulint		n_iterations = 0;
2869 	ulint		n_uniq;
2870 
2871 	if (cursor->index->is_spatial()) {
2872 		/* Split rtree page and update parent */
2873 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2874 						 tuple, n_ext, mtr));
2875 	}
2876 
2877 	if (!*heap) {
2878 		*heap = mem_heap_create(1024);
2879 	}
2880 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2881 func_start:
2882 	mem_heap_empty(*heap);
2883 	*offsets = NULL;
2884 
2885 	ut_ad(mtr_memo_contains_flagged(mtr,
2886 					dict_index_get_lock(cursor->index),
2887 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2888 	ut_ad(!dict_index_is_online_ddl(cursor->index)
2889 	      || (flags & BTR_CREATE_FLAG)
2890 	      || dict_index_is_clust(cursor->index));
2891 	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
2892 				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
2893 
2894 	block = btr_cur_get_block(cursor);
2895 	page = buf_block_get_frame(block);
2896 	page_zip = buf_block_get_page_zip(block);
2897 
2898 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2899 	ut_ad(!page_is_empty(page));
2900 
2901 	/* Try to insert into the next page if possible before splitting */
2902 	if (rec_t* rec = btr_insert_into_right_sibling(
2903 		    flags, cursor, offsets, *heap, tuple, n_ext, mtr)) {
2904 		return(rec);
2905 	}
2906 
2907 	/* 1. Decide the split record; split_rec == NULL means that the
2908 	tuple to be inserted should be the first record on the upper
2909 	half-page */
2910 	bool insert_left = false;
2911 	ulint hint_page_no = block->page.id.page_no() + 1;
2912 	byte direction = FSP_UP;
2913 
2914 	if (tuple && n_iterations > 0) {
2915 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2916 
2917 		if (split_rec == NULL) {
2918 			insert_left = btr_page_tuple_smaller(
2919 				cursor, tuple, offsets, n_uniq, heap);
2920 		}
2921 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2922 	} else if ((split_rec = btr_page_get_split_rec_to_left(cursor))) {
2923 		direction = FSP_DOWN;
2924 		hint_page_no -= 2;
2925 	} else {
2926 		/* If there is only one record in the index page, we
2927 		can't split the node in the middle by default. We need
2928 		to determine whether the new record will be inserted
2929 		to the left or right. */
2930 
2931 		if (page_get_n_recs(page) > 1) {
2932 			split_rec = page_get_middle_rec(page);
2933 		} else if (btr_page_tuple_smaller(cursor, tuple,
2934 						  offsets, n_uniq, heap)) {
2935 			split_rec = page_rec_get_next(
2936 				page_get_infimum_rec(page));
2937 		} else {
2938 			split_rec = NULL;
2939 		}
2940 	}
2941 
2942 	DBUG_EXECUTE_IF("disk_is_full",
2943 			os_has_said_disk_full = true;
2944 			return(NULL););
2945 
2946 	/* 2. Allocate a new page to the index */
2947 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2948 				   btr_page_get_level(page), mtr, mtr);
2949 
2950 	if (!new_block) {
2951 		return(NULL);
2952 	}
2953 
2954 	new_page = buf_block_get_frame(new_block);
2955 	new_page_zip = buf_block_get_page_zip(new_block);
2956 	btr_page_create(new_block, new_page_zip, cursor->index,
2957 			btr_page_get_level(page), mtr);
2958 	/* Only record the leaf level page splits. */
2959 	if (page_is_leaf(page)) {
2960 		cursor->index->stat_defrag_n_page_split ++;
2961 		cursor->index->stat_defrag_modified_counter ++;
2962 		btr_defragment_save_defrag_stats_if_needed(cursor->index);
2963 	}
2964 
2965 	/* 3. Calculate the first record on the upper half-page, and the
2966 	first record (move_limit) on original page which ends up on the
2967 	upper half */
2968 
2969 	if (split_rec) {
2970 		first_rec = move_limit = split_rec;
2971 
2972 		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
2973 					   page_is_leaf(page)
2974 					   ? cursor->index->n_core_fields : 0,
2975 					   n_uniq, heap);
2976 
2977 		insert_left = !tuple
2978 			|| cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;
2979 
2980 		if (!insert_left && new_page_zip && n_iterations > 0) {
2981 			/* If a compressed page has already been split,
2982 			avoid further splits by inserting the record
2983 			to an empty page. */
2984 			split_rec = NULL;
2985 			goto insert_empty;
2986 		}
2987 	} else if (insert_left) {
2988 		ut_a(n_iterations > 0);
2989 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
2990 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2991 	} else {
2992 insert_empty:
2993 		ut_ad(!split_rec);
2994 		ut_ad(!insert_left);
2995 		buf = UT_NEW_ARRAY_NOKEY(
2996 			byte,
2997 			rec_get_converted_size(cursor->index, tuple, n_ext));
2998 
2999 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
3000 						      tuple, n_ext);
3001 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
3002 	}
3003 
3004 	/* 4. First do the modifications in the tree structure */
3005 
3006 	btr_attach_half_pages(flags, cursor->index, block,
3007 			      first_rec, new_block, direction, mtr);
3008 
3009 	/* If the split is made on the leaf level and the insert will fit
3010 	on the appropriate half-page, we may release the tree x-latch.
3011 	We can then move the records after releasing the tree latch,
3012 	thus reducing the tree latch contention. */
3013 	bool insert_will_fit;
3014 	if (tuple == NULL) {
3015 		insert_will_fit = true;
3016 	} else if (split_rec) {
3017 		insert_will_fit = !new_page_zip
3018 			&& btr_page_insert_fits(cursor, split_rec,
3019 						offsets, tuple, n_ext, heap);
3020 	} else {
3021 		if (!insert_left) {
3022 			UT_DELETE_ARRAY(buf);
3023 			buf = NULL;
3024 		}
3025 
3026 		insert_will_fit = !new_page_zip
3027 			&& btr_page_insert_fits(cursor, NULL,
3028 						offsets, tuple, n_ext, heap);
3029 	}
3030 
3031 	if (!srv_read_only_mode
3032 	    && insert_will_fit
3033 	    && page_is_leaf(page)
3034 	    && !dict_index_is_online_ddl(cursor->index)) {
3035 
3036 		mtr->memo_release(
3037 			dict_index_get_lock(cursor->index),
3038 			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);
3039 
3040 		/* NOTE: We cannot release the root block latch here: it holds
3041 		the segment headers and has already been modified in most cases. */
3042 	}
3043 
3044 	/* 5. Then move the records to the new page */
3045 	if (direction == FSP_DOWN) {
3046 		/*		fputs("Split left\n", stderr); */
3047 
3048 		if (0
3049 #ifdef UNIV_ZIP_COPY
3050 		    || page_zip
3051 #endif /* UNIV_ZIP_COPY */
3052 		    || !page_move_rec_list_start(new_block, block, move_limit,
3053 						 cursor->index, mtr)) {
3054 			/* For some reason, compressing new_page failed,
3055 			even though it should contain fewer records than
3056 			the original page.  Copy the page byte for byte
3057 			and then delete the records from both pages
3058 			as appropriate.  Deleting will always succeed. */
3059 			ut_a(new_page_zip);
3060 
3061 			page_zip_copy_recs(new_page_zip, new_page,
3062 					   page_zip, page, cursor->index, mtr);
3063 			page_delete_rec_list_end(move_limit - page + new_page,
3064 						 new_block, cursor->index,
3065 						 ULINT_UNDEFINED,
3066 						 ULINT_UNDEFINED, mtr);
3067 
3068 			/* Update the lock table and possible hash index. */
3069 			lock_move_rec_list_start(
3070 				new_block, block, move_limit,
3071 				new_page + PAGE_NEW_INFIMUM);
3072 
3073 			btr_search_move_or_delete_hash_entries(
3074 				new_block, block);
3075 
3076 			/* Delete the records from the source page. */
3077 
3078 			page_delete_rec_list_start(move_limit, block,
3079 						   cursor->index, mtr);
3080 		}
3081 
3082 		left_block = new_block;
3083 		right_block = block;
3084 
3085 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
3086 			lock_update_split_left(right_block, left_block);
3087 		}
3088 	} else {
3089 		/*		fputs("Split right\n", stderr); */
3090 
3091 		if (0
3092 #ifdef UNIV_ZIP_COPY
3093 		    || page_zip
3094 #endif /* UNIV_ZIP_COPY */
3095 		    || !page_move_rec_list_end(new_block, block, move_limit,
3096 					       cursor->index, mtr)) {
3097 			/* For some reason, compressing new_page failed,
3098 			even though it should contain fewer records than
3099 			the original page.  Copy the page byte for byte
3100 			and then delete the records from both pages
3101 			as appropriate.  Deleting will always succeed. */
3102 			ut_a(new_page_zip);
3103 
3104 			page_zip_copy_recs(new_page_zip, new_page,
3105 					   page_zip, page, cursor->index, mtr);
3106 			page_delete_rec_list_start(move_limit - page
3107 						   + new_page, new_block,
3108 						   cursor->index, mtr);
3109 
3110 			/* Update the lock table and possible hash index. */
3111 			lock_move_rec_list_end(new_block, block, move_limit);
3112 
3113 			btr_search_move_or_delete_hash_entries(
3114 				new_block, block);
3115 
3116 			/* Delete the records from the source page. */
3117 
3118 			page_delete_rec_list_end(move_limit, block,
3119 						 cursor->index,
3120 						 ULINT_UNDEFINED,
3121 						 ULINT_UNDEFINED, mtr);
3122 		}
3123 
3124 		left_block = block;
3125 		right_block = new_block;
3126 
3127 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
3128 			lock_update_split_right(right_block, left_block);
3129 		}
3130 	}
3131 
3132 #ifdef UNIV_ZIP_DEBUG
3133 	if (page_zip) {
3134 		ut_a(page_zip_validate(page_zip, page, cursor->index));
3135 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
3136 	}
3137 #endif /* UNIV_ZIP_DEBUG */
3138 
3139 	/* At this point, split_rec, move_limit and first_rec may point
3140 	to garbage on the old page. */
3141 
3142 	/* 6. The split and the tree modification are now completed. Decide the
3143 	page where the tuple should be inserted */
3144 	rec_t* rec;
3145 	buf_block_t* const insert_block = insert_left
3146 		? left_block : right_block;
3147 
3148 	if (UNIV_UNLIKELY(!tuple)) {
3149 		rec = NULL;
3150 		goto func_exit;
3151 	}
3152 
3153 	/* 7. Reposition the cursor for insert and try insertion */
3154 	page_cursor = btr_cur_get_page_cur(cursor);
3155 
3156 	page_cur_search(insert_block, cursor->index, tuple, page_cursor);
3157 
3158 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3159 				    offsets, heap, n_ext, mtr);
3160 
3161 #ifdef UNIV_ZIP_DEBUG
3162 	{
3163 		page_t*		insert_page
3164 			= buf_block_get_frame(insert_block);
3165 
3166 		page_zip_des_t*	insert_page_zip
3167 			= buf_block_get_page_zip(insert_block);
3168 
3169 		ut_a(!insert_page_zip
3170 		     || page_zip_validate(insert_page_zip, insert_page,
3171 					  cursor->index));
3172 	}
3173 #endif /* UNIV_ZIP_DEBUG */
3174 
3175 	if (rec != NULL) {
3176 
3177 		goto func_exit;
3178 	}
3179 
3180 	/* 8. If insert did not fit, try page reorganization.
3181 	For compressed pages, page_cur_tuple_insert() will have
3182 	attempted this already. */
3183 
3184 	if (page_cur_get_page_zip(page_cursor)
3185 	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
3186 
3187 		goto insert_failed;
3188 	}
3189 
3190 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3191 				    offsets, heap, n_ext, mtr);
3192 
3193 	if (rec == NULL) {
3194 		/* The insert did not fit on the page: loop back to the
3195 		start of the function for a new split */
3196 insert_failed:
3197 		/* We play it safe and reset the free bits for new_page */
3198 		if (!dict_index_is_clust(cursor->index)
3199 		    && !cursor->index->table->is_temporary()) {
3200 			ibuf_reset_free_bits(new_block);
3201 			ibuf_reset_free_bits(block);
3202 		}
3203 
3204 		n_iterations++;
3205 		ut_ad(n_iterations < 2
3206 		      || buf_block_get_page_zip(insert_block));
3207 		ut_ad(!insert_will_fit);
3208 
3209 		goto func_start;
3210 	}
3211 
3212 func_exit:
3213 	/* Insert fit on the page: update the free bits for the
3214 	left and right pages in the same mtr */
3215 
3216 	if (!dict_index_is_clust(cursor->index)
3217 	    && !cursor->index->table->is_temporary()
3218 	    && page_is_leaf(page)) {
3219 
3220 		ibuf_update_free_bits_for_two_pages_low(
3221 			left_block, right_block, mtr);
3222 	}
3223 
3224 	MONITOR_INC(MONITOR_INDEX_SPLIT);
3225 
3226 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
3227 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
3228 
3229 	ut_ad(tuple || !rec);
3230 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
3231 	return(rec);
3232 }
3233 
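/* A minimal sketch of the control flow above (hypothetical helper names;
not part of the build): split the page, try the insert on the chosen
half, reorganize once if needed, and restart the whole split if the
tuple still does not fit.

	rec_t* split_and_insert_sketch(btr_cur_t* cursor, const dtuple_t* tuple)
	{
	retry:
		buf_block_t*	half = perform_split(cursor);	// steps 1..5
		rec_t*		rec = try_insert(half, tuple);	// step 7
		if (!rec && try_reorganize(half)) {		// step 8
			rec = try_insert(half, tuple);
		}
		if (!rec) {
			goto retry;	// split the chosen half once more
		}
		return rec;
	}
*/
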
3234 /** Remove a page from the level list of pages.
3235 @param[in]	space		space where removed
3236 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
3237 @param[in,out]	page		page to remove
3238 @param[in]	index		index tree
3239 @param[in,out]	mtr		mini-transaction
@return DB_SUCCESS or error code */
3240 dberr_t
3241 btr_level_list_remove_func(
3242 	ulint			space,
3243 	ulint			zip_size,
3244 	page_t*			page,
3245 	dict_index_t*		index,
3246 	mtr_t*			mtr)
3247 {
3248 	ut_ad(page != NULL);
3249 	ut_ad(mtr != NULL);
3250 	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
3251 	ut_ad(space == page_get_space_id(page));
3252 	/* Get the previous and next page numbers of page */
3253 
3254 	const uint32_t	prev_page_no = btr_page_get_prev(page);
3255 	const uint32_t	next_page_no = btr_page_get_next(page);
3256 
3257 	/* Update page links of the level */
3258 
3259 	if (prev_page_no != FIL_NULL) {
3260 		buf_block_t*	prev_block
3261 			= btr_block_get(page_id_t(space, prev_page_no),
3262 					zip_size, RW_X_LATCH, index, mtr);
3263 
3264 		page_t*		prev_page
3265 			= buf_block_get_frame(prev_block);
3266 #ifdef UNIV_BTR_DEBUG
3267 		ut_a(page_is_comp(prev_page) == page_is_comp(page));
3268 		ut_a(!memcmp(prev_page + FIL_PAGE_NEXT, page + FIL_PAGE_OFFSET,
3269 			     4));
3270 #endif /* UNIV_BTR_DEBUG */
3271 
3272 		btr_page_set_next(prev_page,
3273 				  buf_block_get_page_zip(prev_block),
3274 				  next_page_no, mtr);
3275 	}
3276 
3277 	if (next_page_no != FIL_NULL) {
3278 		buf_block_t*	next_block
3279 			= btr_block_get(
3280 				page_id_t(space, next_page_no), zip_size,
3281 				RW_X_LATCH, index, mtr);
3282 
3283 		if (!next_block) {
3284 			return DB_ERROR;
3285 		}
3286 
3287 		page_t*		next_page
3288 			= buf_block_get_frame(next_block);
3289 #ifdef UNIV_BTR_DEBUG
3290 		ut_a(page_is_comp(next_page) == page_is_comp(page));
3291 		ut_a(!memcmp(next_page + FIL_PAGE_PREV, page + FIL_PAGE_OFFSET,
3292 			     4));
3293 #endif /* UNIV_BTR_DEBUG */
3294 
3295 		btr_page_set_prev(next_page,
3296 				  buf_block_get_page_zip(next_block),
3297 				  prev_page_no, mtr);
3298 	}
3299 
3300 	return DB_SUCCESS;
3301 }
3302 
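/* A minimal self-contained model of the unlink above (illustration only;
not part of the build). The level list is a doubly linked list stored in
the FIL_PAGE_PREV and FIL_PAGE_NEXT fields, terminated by FIL_NULL.

	#include <cstdint>
	#include <map>

	static const uint32_t NIL = 0xffffffff;	// like FIL_NULL

	struct node { uint32_t prev, next; };

	void unlink(std::map<uint32_t, node>& pages, uint32_t page_no)
	{
		const node n = pages[page_no];
		if (n.prev != NIL) {
			pages[n.prev].next = n.next;	// like btr_page_set_next()
		}
		if (n.next != NIL) {
			pages[n.next].prev = n.prev;	// like btr_page_set_prev()
		}
	}
*/
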
3303 /****************************************************************//**
3304 Writes the redo log record for setting an index record as the predefined
3305 minimum record. */
3306 UNIV_INLINE
3307 void
3308 btr_set_min_rec_mark_log(
3309 /*=====================*/
3310 	rec_t*		rec,	/*!< in: record */
3311 	mlog_id_t	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or
3312 				MLOG_REC_MIN_MARK */
3313 	mtr_t*		mtr)	/*!< in: mtr */
3314 {
3315 	mlog_write_initial_log_record(rec, type, mtr);
3316 
3317 	/* Write rec offset as a 2-byte ulint */
3318 	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
3319 }
3320 
3321 /****************************************************************//**
3322 Parses the redo log record for setting an index record as the predefined
3323 minimum record.
3324 @return end of log record or NULL */
3325 byte*
3326 btr_parse_set_min_rec_mark(
3327 /*=======================*/
3328 	byte*	ptr,	/*!< in: buffer */
3329 	byte*	end_ptr,/*!< in: buffer end */
3330 	ulint	comp,	/*!< in: nonzero=compact page format */
3331 	page_t*	page,	/*!< in: page or NULL */
3332 	mtr_t*	mtr)	/*!< in: mtr or NULL */
3333 {
3334 	rec_t*	rec;
3335 
3336 	if (end_ptr < ptr + 2) {
3337 
3338 		return(NULL);
3339 	}
3340 
3341 	if (page) {
3342 		ut_a(!page_is_comp(page) == !comp);
3343 
3344 		rec = page + mach_read_from_2(ptr);
3345 
3346 		btr_set_min_rec_mark(rec, mtr);
3347 	}
3348 
3349 	return(ptr + 2);
3350 }
3351 
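/* The MLOG_REC_MIN_MARK/MLOG_COMP_REC_MIN_MARK record body is just the
2-byte offset of the record within its page, so the writer above and
the parser below mirror each other (recap of the two functions):

	writer:	mlog_write_initial_log_record(rec, type, mtr);
		mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);

	parser:	if (end_ptr < ptr + 2) return NULL;	// record incomplete
		rec = page + mach_read_from_2(ptr);	// offset -> pointer
		return ptr + 2;				// 2 bytes consumed
*/
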
3352 /** Sets a record as the predefined minimum record. */
3353 void btr_set_min_rec_mark(rec_t* rec, mtr_t* mtr)
3354 {
3355 	const bool comp = page_rec_is_comp(rec);
3356 
3357 	ut_ad(rec == page_rec_get_next_const(page_get_infimum_rec(
3358 						     page_align(rec))));
3359 	ut_ad(!(rec_get_info_bits(page_rec_get_next(rec), comp)
3360 		& REC_INFO_MIN_REC_FLAG));
3361 
3362 	size_t info_bits = rec_get_info_bits(rec, comp);
3363 	if (comp) {
3364 		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3365 
3366 		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3367 	} else {
3368 		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3369 
3370 		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3371 	}
3372 }
3373 
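/* A minimal model of the marking above (illustration only; the flag
value is assumed here, not asserted): the mark is a single bit in the
record info bits, set with a plain read-modify-write.

	#include <cstdint>

	static const uint8_t MIN_REC_FLAG = 0x10;	// REC_INFO_MIN_REC_FLAG

	void set_min_rec_mark(uint8_t& info_bits)
	{
		info_bits |= MIN_REC_FLAG;	// other info bits preserved
	}
*/
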
3374 /*************************************************************//**
3375 If the page is the only one on its level, this function moves its records to the
3376 father page, thus reducing the tree height.
3377 @return father block */
3378 UNIV_INTERN
3379 buf_block_t*
3380 btr_lift_page_up(
3381 /*=============*/
3382 	dict_index_t*	index,	/*!< in: index tree */
3383 	buf_block_t*	block,	/*!< in: page which is the only one on its level;
3384 				must not be empty: use
3385 				btr_discard_only_page_on_level if the last
3386 				record from the page should be removed */
3387 	mtr_t*		mtr)	/*!< in: mtr */
3388 {
3389 	buf_block_t*	father_block;
3390 	page_t*		father_page;
3391 	ulint		page_level;
3392 	page_zip_des_t*	father_page_zip;
3393 	page_t*		page		= buf_block_get_frame(block);
3394 	ulint		root_page_no;
3395 	buf_block_t*	blocks[BTR_MAX_LEVELS];
3396 	ulint		n_blocks;	/*!< last used index in blocks[] */
3397 	ulint		i;
3398 	bool		lift_father_up;
3399 	buf_block_t*	block_orig	= block;
3400 
3401 	ut_ad(!page_has_siblings(page));
3402 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3403 
3404 	page_level = btr_page_get_level(page);
3405 	root_page_no = dict_index_get_page(index);
3406 
3407 	{
3408 		btr_cur_t	cursor;
3409 		rec_offs*	offsets	= NULL;
3410 		mem_heap_t*	heap	= mem_heap_create(
3411 			sizeof(*offsets)
3412 			* (REC_OFFS_HEADER_SIZE + 1 + 1
3413 			   + unsigned(index->n_fields)));
3414 		buf_block_t*	b;
3415 
3416 		if (dict_index_is_spatial(index)) {
3417 			offsets = rtr_page_get_father_block(
3418 				NULL, heap, index, block, mtr,
3419 				NULL, &cursor);
3420 		} else {
3421 			offsets = btr_page_get_father_block(offsets, heap,
3422 							    index, block,
3423 							    mtr, &cursor);
3424 		}
3425 		father_block = btr_cur_get_block(&cursor);
3426 		father_page_zip = buf_block_get_page_zip(father_block);
3427 		father_page = buf_block_get_frame(father_block);
3428 
3429 		n_blocks = 0;
3430 
3431 		/* Store all ancestor pages so we can reset their
3432 		levels later on.  We have to do all the searches on
3433 		the tree now because later on, after we've replaced
3434 		the first level, the tree is in an inconsistent state
3435 		and can not be searched. */
3436 		for (b = father_block;
3437 		     b->page.id.page_no() != root_page_no; ) {
3438 			ut_a(n_blocks < BTR_MAX_LEVELS);
3439 
3440 			if (dict_index_is_spatial(index)) {
3441 				offsets = rtr_page_get_father_block(
3442 					NULL, heap, index, b, mtr,
3443 					NULL, &cursor);
3444 			} else {
3445 				offsets = btr_page_get_father_block(offsets,
3446 								    heap,
3447 								    index, b,
3448 								    mtr,
3449 								    &cursor);
3450 			}
3451 
3452 			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3453 		}
3454 
3455 		lift_father_up = (n_blocks && page_level == 0);
3456 		if (lift_father_up) {
3457 			/* The father page should also be the only one on its
3458 			level (and not the root). We must lift up the father
3459 			page first, because the leaf page should be lifted up
3460 			only into the root page. The freeing of a page chooses
3461 			the segment based on page_level (==0 or !=0); if
3462 			page_level changed from !=0 to ==0, the later freeing
3463 			of the page would not find the page allocation to free. */
3464 
3465 			block = father_block;
3466 			page = buf_block_get_frame(block);
3467 			page_level = btr_page_get_level(page);
3468 
3469 			ut_ad(!page_has_siblings(page));
3470 			ut_ad(mtr_memo_contains(
3471 				      mtr, block, MTR_MEMO_PAGE_X_FIX));
3472 
3473 			father_block = blocks[0];
3474 			father_page_zip = buf_block_get_page_zip(father_block);
3475 			father_page = buf_block_get_frame(father_block);
3476 		}
3477 
3478 		mem_heap_free(heap);
3479 	}
3480 
3481 	btr_search_drop_page_hash_index(block);
3482 
3483 	/* Make the father empty */
3484 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3485 	/* btr_page_empty() is supposed to zero-initialize the field. */
3486 	ut_ad(!page_get_instant(father_block->frame));
3487 
3488 	if (index->is_instant()
3489 	    && father_block->page.id.page_no() == root_page_no) {
3490 		ut_ad(!father_page_zip);
3491 		btr_set_instant(father_block, *index, mtr);
3492 	}
3493 
3494 	page_level++;
3495 
3496 	/* Copy the records to the father page one by one. */
3497 	if (0
3498 #ifdef UNIV_ZIP_COPY
3499 	    || father_page_zip
3500 #endif /* UNIV_ZIP_COPY */
3501 	    || !page_copy_rec_list_end(father_block, block,
3502 				       page_get_infimum_rec(page),
3503 				       index, mtr)) {
3504 		const page_zip_des_t*	page_zip
3505 			= buf_block_get_page_zip(block);
3506 		ut_a(father_page_zip);
3507 		ut_a(page_zip);
3508 
3509 		/* Copy the page byte for byte. */
3510 		page_zip_copy_recs(father_page_zip, father_page,
3511 				   page_zip, page, index, mtr);
3512 
3513 		/* Update the lock table and possible hash index. */
3514 
3515 		lock_move_rec_list_end(father_block, block,
3516 				       page_get_infimum_rec(page));
3517 
3518 		/* Also update the predicate locks */
3519 		if (dict_index_is_spatial(index)) {
3520 			lock_prdt_rec_move(father_block, block);
3521 		} else {
3522 			btr_search_move_or_delete_hash_entries(
3523 				father_block, block);
3524 		}
3525 	}
3526 
3527 	if (!dict_table_is_locking_disabled(index->table)) {
3528 		/* Free predicate page locks on the block */
3529 		if (dict_index_is_spatial(index)) {
3530 			lock_mutex_enter();
3531 			lock_prdt_page_free_from_discard(
3532 				block, lock_sys.prdt_page_hash);
3533 			lock_mutex_exit();
3534 		}
3535 		lock_update_copy_and_discard(father_block, block);
3536 	}
3537 
3538 	/* Go upward to root page, decrementing levels by one. */
3539 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3540 		page_t*		page	= buf_block_get_frame(blocks[i]);
3541 		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3542 
3543 		ut_ad(btr_page_get_level(page) == page_level + 1);
3544 
3545 		btr_page_set_level(page, page_zip, page_level, mtr);
3546 #ifdef UNIV_ZIP_DEBUG
3547 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3548 #endif /* UNIV_ZIP_DEBUG */
3549 	}
3550 
3551 	if (dict_index_is_spatial(index)) {
3552 		rtr_check_discard_page(index, NULL, block);
3553 	}
3554 
3555 	/* Free the file page */
3556 	btr_page_free(index, block, mtr);
3557 
3558 	/* We play it safe and reset the free bits for the father */
3559 	if (!dict_index_is_clust(index)
3560 	    && !index->table->is_temporary()) {
3561 		ibuf_reset_free_bits(father_block);
3562 	}
3563 	ut_ad(page_validate(father_page, index));
3564 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3565 
3566 	return(lift_father_up ? block_orig : father_block);
3567 }
3568 
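/* A simplified model of btr_lift_page_up() (hypothetical helper names;
not part of the build): the ancestors are collected first, because the
tree cannot be searched once the father has been emptied.

	buf_block_t* lift_page_up_sketch(dict_index_t* index, buf_block_t* child)
	{
		std::vector<buf_block_t*> path = collect_ancestors(index, child);
		buf_block_t* father = path.front();	// direct parent
		empty_page(father);
		copy_records(father, child);		// child becomes redundant
		for (size_t i = 1; i < path.size(); i++) {
			decrement_level(path[i]);	// pull levels down by one
		}
		free_page(child);			// tree height reduced by 1
		return father;
	}
*/
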
3569 /*************************************************************//**
3570 Tries to merge the page first to the left immediate brother if such a
3571 brother exists, and the node pointers to the current page and to the brother
3572 reside on the same page. If the left brother does not satisfy these
3573 conditions, looks at the right brother. If the page is the only one on that
3574 level, lifts the records of the page to the father page, thus reducing the
3575 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3576 page. If cursor is on the leaf level, mtr must also hold x-latches on the
3577 brothers, if they exist.
3578 @return TRUE on success */
3579 ibool
3580 btr_compress(
3581 /*=========*/
3582 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3583 				or lift; the page must not be empty:
3584 				when deleting records, use btr_discard_page()
3585 				if the page would become empty */
3586 	ibool		adjust,	/*!< in: TRUE if should adjust the
3587 				cursor position even if compression occurs */
3588 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3589 {
3590 	dict_index_t*	index;
3591 	ulint		left_page_no;
3592 	ulint		right_page_no;
3593 	buf_block_t*	merge_block;
3594 	page_t*		merge_page = NULL;
3595 	page_zip_des_t*	merge_page_zip;
3596 	ibool		is_left;
3597 	buf_block_t*	block;
3598 	page_t*		page;
3599 	btr_cur_t	father_cursor;
3600 	mem_heap_t*	heap;
3601 	rec_offs*	offsets;
3602 	ulint		nth_rec = 0; /* remove bogus warning */
3603 	bool		mbr_changed = false;
3604 #ifdef UNIV_DEBUG
3605 	bool		leftmost_child;
3606 #endif
3607 	DBUG_ENTER("btr_compress");
3608 
3609 	block = btr_cur_get_block(cursor);
3610 	page = btr_cur_get_page(cursor);
3611 	index = btr_cur_get_index(cursor);
3612 
3613 	btr_assert_not_corrupted(block, index);
3614 
3615 #ifdef UNIV_DEBUG
3616 	if (dict_index_is_spatial(index)) {
3617 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3618 						MTR_MEMO_X_LOCK));
3619 	} else {
3620 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3621 						MTR_MEMO_X_LOCK
3622 						| MTR_MEMO_SX_LOCK));
3623 	}
3624 #endif /* UNIV_DEBUG */
3625 
3626 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3627 
3628 	const ulint zip_size = index->table->space->zip_size();
3629 
3630 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3631 
3632 	left_page_no = btr_page_get_prev(page);
3633 	right_page_no = btr_page_get_next(page);
3634 
3635 #ifdef UNIV_DEBUG
3636 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3637 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3638 			page_rec_get_next(page_get_infimum_rec(page)),
3639 			page_is_comp(page)));
3640 	}
3641 #endif /* UNIV_DEBUG */
3642 
3643 	heap = mem_heap_create(100);
3644 
3645 	if (dict_index_is_spatial(index)) {
3646 		offsets = rtr_page_get_father_block(
3647 			NULL, heap, index, block, mtr, cursor, &father_cursor);
3648 		ut_ad(cursor->page_cur.block->page.id.page_no()
3649 		      == block->page.id.page_no());
3650 		rec_t*  my_rec = father_cursor.page_cur.rec;
3651 
3652 		ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3653 
3654 		if (page_no != block->page.id.page_no()) {
3655 			ib::info() << "father positioned on page "
3656 				<< page_no << " instead of "
3657 				<< block->page.id.page_no();
3658 			offsets = btr_page_get_father_block(
3659 				NULL, heap, index, block, mtr, &father_cursor);
3660 		}
3661 	} else {
3662 		offsets = btr_page_get_father_block(
3663 			NULL, heap, index, block, mtr, &father_cursor);
3664 	}
3665 
3666 	if (adjust) {
3667 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3668 		ut_ad(nth_rec > 0);
3669 	}
3670 
3671 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3672 		/* The page is the only one on the level, lift the records
3673 		to the father */
3674 
3675 		merge_block = btr_lift_page_up(index, block, mtr);
3676 		goto func_exit;
3677 	}
3678 
3679 	ut_d(leftmost_child =
3680 		left_page_no != FIL_NULL
3681 		&& (page_rec_get_next(
3682 			page_get_infimum_rec(
3683 				btr_cur_get_page(&father_cursor)))
3684 		    == btr_cur_get_rec(&father_cursor)));
3685 
3686 	/* Decide the page to which we try to merge and which will inherit
3687 	the locks */
3688 
3689 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3690 					  &merge_block, mtr);
3691 
3692 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3693 retry:
3694 	if (!is_left
3695 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3696 				       mtr)) {
3697 		if (!merge_block) {
3698 			merge_page = NULL;
3699 		}
3700 		goto err_exit;
3701 	}
3702 
3703 	merge_page = buf_block_get_frame(merge_block);
3704 
3705 #ifdef UNIV_BTR_DEBUG
3706 	if (is_left) {
3707 		ut_a(btr_page_get_next(merge_page)
3708 		     == block->page.id.page_no());
3709 	} else {
3710 		ut_a(btr_page_get_prev(merge_page)
3711 		     == block->page.id.page_no());
3712 	}
3713 #endif /* UNIV_BTR_DEBUG */
3714 
3715 #ifdef UNIV_GIS_DEBUG
3716 	if (dict_index_is_spatial(index)) {
3717 		if (is_left) {
3718 			fprintf(stderr, "GIS_DIAG: merge left  %ld to %ld \n",
3719 				(long) block->page.id.page_no(), left_page_no);
3720 		} else {
3721 			fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3722 				(long) block->page.id.page_no(), right_page_no);
3723 		}
3724 	}
3725 #endif /* UNIV_GIS_DEBUG */
3726 
3727 	ut_ad(page_validate(merge_page, index));
3728 
3729 	merge_page_zip = buf_block_get_page_zip(merge_block);
3730 #ifdef UNIV_ZIP_DEBUG
3731 	if (merge_page_zip) {
3732 		const page_zip_des_t*	page_zip
3733 			= buf_block_get_page_zip(block);
3734 		ut_a(page_zip);
3735 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3736 		ut_a(page_zip_validate(page_zip, page, index));
3737 	}
3738 #endif /* UNIV_ZIP_DEBUG */
3739 
3740 	/* Move records to the merge page */
3741 	if (is_left) {
3742 		btr_cur_t	cursor2;
3743 		rtr_mbr_t	new_mbr;
3744 		rec_offs*	offsets2 = NULL;
3745 
3746 		/* For rtree, we need to update father's mbr. */
3747 		if (index->is_spatial()) {
3748 			/* We only support merging pages that have the
3749 			same parent page */
3750 			if (!rtr_check_same_block(
3751 				index, &cursor2,
3752 				btr_cur_get_block(&father_cursor),
3753 				merge_block, heap)) {
3754 				is_left = false;
3755 				goto retry;
3756 			}
3757 
3758 			/* Set rtr_info for cursor2, since it is
3759 			necessary in recursive page merge. */
3760 			cursor2.rtr_info = cursor->rtr_info;
3761 			cursor2.tree_height = cursor->tree_height;
3762 
3763 			offsets2 = rec_get_offsets(
3764 				btr_cur_get_rec(&cursor2), index, NULL,
3765 				page_is_leaf(cursor2.page_cur.block->frame)
3766 				? index->n_fields : 0,
3767 				ULINT_UNDEFINED, &heap);
3768 
3769 			/* Check if parent entry needs to be updated */
3770 			mbr_changed = rtr_merge_mbr_changed(
3771 				&cursor2, &father_cursor,
3772 				offsets2, offsets, &new_mbr);
3773 		}
3774 
3775 		rec_t*	orig_pred = page_copy_rec_list_start(
3776 			merge_block, block, page_get_supremum_rec(page),
3777 			index, mtr);
3778 
3779 		if (!orig_pred) {
3780 			goto err_exit;
3781 		}
3782 
3783 		btr_search_drop_page_hash_index(block);
3784 
3785 		/* Remove the page from the level list */
3786 		if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3787 							zip_size, page, index,
3788 							mtr)) {
3789 			goto err_exit;
3790 		}
3791 
3792 		if (dict_index_is_spatial(index)) {
3793 			rec_t*  my_rec = father_cursor.page_cur.rec;
3794 
3795 			ulint page_no = btr_node_ptr_get_child_page_no(
3796 						my_rec, offsets);
3797 
3798 			if (page_no != block->page.id.page_no()) {
3799 
3800 				ib::fatal() << "father positioned on "
3801 					<< page_no << " instead of "
3802 					<< block->page.id.page_no();
3803 
3804 				ut_ad(0);
3805 			}
3806 
3807 			if (mbr_changed) {
3808 #ifdef UNIV_DEBUG
3809 				bool	success = rtr_update_mbr_field(
3810 					&cursor2, offsets2, &father_cursor,
3811 					merge_page, &new_mbr, NULL, mtr);
3812 
3813 				ut_ad(success);
3814 #else
3815 				rtr_update_mbr_field(
3816 					&cursor2, offsets2, &father_cursor,
3817 					merge_page, &new_mbr, NULL, mtr);
3818 #endif
3819 			} else {
3820 				rtr_node_ptr_delete(&father_cursor, mtr);
3821 			}
3822 
3823 			/* No gap locks need to be worried about */
3824 			lock_mutex_enter();
3825 			lock_prdt_page_free_from_discard(
3826 				block, lock_sys.prdt_page_hash);
3827 			lock_rec_free_all_from_discard_page(block);
3828 			lock_mutex_exit();
3829 		} else {
3830 			btr_cur_node_ptr_delete(&father_cursor, mtr);
3831 			if (!dict_table_is_locking_disabled(index->table)) {
3832 				lock_update_merge_left(
3833 					merge_block, orig_pred, block);
3834 			}
3835 		}
3836 
3837 		if (adjust) {
3838 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3839 		}
3840 	} else {
3841 		rec_t*		orig_succ;
3842 		ibool		compressed;
3843 		dberr_t		err;
3844 		btr_cur_t	cursor2;
3845 					/* father cursor pointing to node ptr
3846 					of the right sibling */
3847 #ifdef UNIV_BTR_DEBUG
3848 		byte		fil_page_prev[4];
3849 #endif /* UNIV_BTR_DEBUG */
3850 
3851 		if (dict_index_is_spatial(index)) {
3852 			cursor2.rtr_info = NULL;
3853 
3854 			/* For a spatial index, we disallow merging blocks
3855 			with different parents, since the merge would need
3856 			to update the entry (for MBR and primary key) in
3857 			the parent of the block being merged */
3858 			if (!rtr_check_same_block(
3859 				index, &cursor2,
3860 				btr_cur_get_block(&father_cursor),
3861 				merge_block, heap)) {
3862 				goto err_exit;
3863 			}
3864 
3865 			/* Set rtr_info for cursor2, since it is
3866 			necessary in recursive page merge. */
3867 			cursor2.rtr_info = cursor->rtr_info;
3868 			cursor2.tree_height = cursor->tree_height;
3869 		} else {
3870 			btr_page_get_father(index, merge_block, mtr, &cursor2);
3871 		}
3872 
3873 		if (merge_page_zip && left_page_no == FIL_NULL) {
3874 
3875 			/* The function page_zip_compress(), which will be
3876 			invoked by page_copy_rec_list_end() below,
3877 			requires that FIL_PAGE_PREV be FIL_NULL.
3878 			Clear the field, but prepare to restore it. */
3879 #ifdef UNIV_BTR_DEBUG
3880 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3881 #endif /* UNIV_BTR_DEBUG */
3882 			compile_time_assert(FIL_NULL == 0xffffffffU);
3883 			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3884 		}
3885 
3886 		orig_succ = page_copy_rec_list_end(merge_block, block,
3887 						   page_get_infimum_rec(page),
3888 						   cursor->index, mtr);
3889 
3890 		if (!orig_succ) {
3891 			ut_a(merge_page_zip);
3892 #ifdef UNIV_BTR_DEBUG
3893 			if (left_page_no == FIL_NULL) {
3894 				/* FIL_PAGE_PREV was restored from
3895 				merge_page_zip. */
3896 				ut_a(!memcmp(fil_page_prev,
3897 					     merge_page + FIL_PAGE_PREV, 4));
3898 			}
3899 #endif /* UNIV_BTR_DEBUG */
3900 			goto err_exit;
3901 		}
3902 
3903 		btr_search_drop_page_hash_index(block);
3904 
3905 #ifdef UNIV_BTR_DEBUG
3906 		if (merge_page_zip && left_page_no == FIL_NULL) {
3907 
3908 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3909 			failure in btr_level_list_remove(), which will set
3910 			the field again to FIL_NULL.  Even though this makes
3911 			merge_page and merge_page_zip inconsistent for a
3912 			split second, it is harmless, because the pages
3913 			are X-latched. */
3914 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3915 		}
3916 #endif /* UNIV_BTR_DEBUG */
3917 
3918 		/* Remove the page from the level list */
3919 		if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3920 							zip_size, page, index,
3921 							mtr)) {
3922 			goto err_exit;
3923 		}
3924 
3925 		ut_ad(btr_node_ptr_get_child_page_no(
3926 			btr_cur_get_rec(&father_cursor), offsets)
3927 			== block->page.id.page_no());
3928 
3929 		/* Replace the address of the old child node (= page) with the
3930 		address of the merge page to the right */
3931 		btr_node_ptr_set_child_page_no(
3932 			btr_cur_get_rec(&father_cursor),
3933 			btr_cur_get_page_zip(&father_cursor),
3934 			offsets, right_page_no, mtr);
3935 
3936 #ifdef UNIV_DEBUG
3937 		if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3938 			ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3939 				page_rec_get_next(page_get_infimum_rec(
3940 					buf_block_get_frame(merge_block))),
3941 				page_is_comp(page)));
3942 		}
3943 #endif /* UNIV_DEBUG */
3944 
3945 		/* For rtree, we need to update father's mbr. */
3946 		if (index->is_spatial()) {
3947 			rec_offs* offsets2;
3948 			ulint	rec_info;
3949 
3950 			offsets2 = rec_get_offsets(
3951 				btr_cur_get_rec(&cursor2), index, NULL,
3952 				page_is_leaf(cursor2.page_cur.block->frame)
3953 				? index->n_fields : 0,
3954 				ULINT_UNDEFINED, &heap);
3955 
3956 			ut_ad(btr_node_ptr_get_child_page_no(
3957 				btr_cur_get_rec(&cursor2), offsets2)
3958 				== right_page_no);
3959 
3960 			rec_info = rec_get_info_bits(
3961 				btr_cur_get_rec(&father_cursor),
3962 				rec_offs_comp(offsets));
3963 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
3964 				/* When the father node ptr is the minimal
3965 				rec, we keep it and delete the node ptr of
3966 				the merge page. */
3967 				rtr_merge_and_update_mbr(&father_cursor,
3968 							 &cursor2,
3969 							 offsets, offsets2,
3970 							 merge_page, mtr);
3971 			} else {
3972 				/* Otherwise, we keep the node ptr of the
3973 				merge page and delete the father node ptr.
3974 				This keeps the rec order at the upper
3975 				level. */
3976 				rtr_merge_and_update_mbr(&cursor2,
3977 							 &father_cursor,
3978 							 offsets2, offsets,
3979 							 merge_page, mtr);
3980 			}
3981 			lock_mutex_enter();
3982 			lock_prdt_page_free_from_discard(
3983 				block, lock_sys.prdt_page_hash);
3984 			lock_rec_free_all_from_discard_page(block);
3985 			lock_mutex_exit();
3986 		} else {
3987 
3988 			compressed = btr_cur_pessimistic_delete(&err, TRUE,
3989 								&cursor2,
3990 								BTR_CREATE_FLAG,
3991 								false, mtr);
3992 			ut_a(err == DB_SUCCESS);
3993 
3994 			if (!compressed) {
3995 				btr_cur_compress_if_useful(&cursor2,
3996 							   FALSE,
3997 							   mtr);
3998 			}
3999 
4000 			if (!dict_table_is_locking_disabled(index->table)) {
4001 				lock_update_merge_right(
4002 					merge_block, orig_succ, block);
4003 			}
4004 		}
4005 	}
4006 
4007 	if (!dict_index_is_clust(index)
4008 	    && !index->table->is_temporary()
4009 	    && page_is_leaf(merge_page)) {
4010 		/* Update the free bits of the B-tree page in the
4011 		insert buffer bitmap.  This has to be done in a
4012 		separate mini-transaction that is committed before the
4013 		main mini-transaction.  We cannot update the insert
4014 		buffer bitmap in this mini-transaction, because
4015 		btr_compress() can be invoked recursively without
4016 		committing the mini-transaction in between.  Since
4017 		insert buffer bitmap pages have a lower rank than
4018 		B-tree pages, we must not access other pages in the
4019 		same mini-transaction after accessing an insert buffer
4020 		bitmap page. */
4021 
4022 		/* The free bits in the insert buffer bitmap must
4023 		never exceed the free space on a page.  It is safe to
4024 		decrement or reset the bits in the bitmap in a
4025 		mini-transaction that is committed before the
4026 		mini-transaction that affects the free space. */
4027 
4028 		/* It is unsafe to increment the bits in a separately
4029 		committed mini-transaction, because in crash recovery,
4030 		the free bits could momentarily be set too high. */
4031 
4032 		if (zip_size) {
4033 			/* Because the free bits may be incremented
4034 			and we cannot update the insert buffer bitmap
4035 			in the same mini-transaction, the only safe
4036 			thing we can do here is the pessimistic
4037 			approach: reset the free bits. */
4038 			ibuf_reset_free_bits(merge_block);
4039 		} else {
4040 			/* On uncompressed pages, the free bits will
4041 			never increase here.  Thus, it is safe to
4042 			write the bits accurately in a separate
4043 			mini-transaction. */
4044 			ibuf_update_free_bits_if_full(merge_block,
4045 						      srv_page_size,
4046 						      ULINT_UNDEFINED);
4047 		}
4048 	}
4049 
4050 	ut_ad(page_validate(merge_page, index));
4051 #ifdef UNIV_ZIP_DEBUG
4052 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
4053 						  index));
4054 #endif /* UNIV_ZIP_DEBUG */
4055 
4056 	if (dict_index_is_spatial(index)) {
4057 #ifdef UNIV_GIS_DEBUG
4058 		fprintf(stderr, "GIS_DIAG: compressed away  %ld\n",
4059 			(long) block->page.id.page_no());
4060 		fprintf(stderr, "GIS_DIAG: merged to %ld\n",
4061 			(long) merge_block->page.id.page_no());
4062 #endif
4063 
4064 		rtr_check_discard_page(index, NULL, block);
4065 	}
4066 
4067 	/* Free the file page */
4068 	btr_page_free(index, block, mtr);
4069 
4070 	/* btr_check_node_ptr() needs the parent block latched.
4071 	If merge_block's parent block is not the same,
4072 	we cannot use btr_check_node_ptr(). */
4073 	ut_ad(leftmost_child
4074 	      || btr_check_node_ptr(index, merge_block, mtr));
4075 func_exit:
4076 	mem_heap_free(heap);
4077 
4078 	if (adjust) {
4079 		ut_ad(nth_rec > 0);
4080 		btr_cur_position(
4081 			index,
4082 			page_rec_get_nth(merge_block->frame, nth_rec),
4083 			merge_block, cursor);
4084 	}
4085 
4086 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
4087 
4088 	DBUG_RETURN(TRUE);
4089 
4090 err_exit:
4091 	/* We play it safe and reset the free bits. */
4092 	if (zip_size
4093 	    && merge_page
4094 	    && page_is_leaf(merge_page)
4095 	    && !dict_index_is_clust(index)) {
4096 
4097 		ibuf_reset_free_bits(merge_block);
4098 	}
4099 
4100 	mem_heap_free(heap);
4101 	DBUG_RETURN(FALSE);
4102 }
4103 
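/* The merge-target decision of btr_compress(), as a compact sketch
(hypothetical helper names; block, index and the page numbers are free
variables here):

	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
		btr_lift_page_up(index, block, mtr);	// only page on level
	} else if (can_merge(cursor, left_page_no, &merge_block)) {
		merge_into_left_sibling();		// preferred direction
	} else if (can_merge(cursor, right_page_no, &merge_block)) {
		merge_into_right_sibling();
	} else {
		give_up();	// neither sibling has enough free space
	}
*/
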
4104 /*************************************************************//**
4105 Discards a page that is the only page on its level.  This will empty
4106 the whole B-tree, leaving just an empty root page.  This function
4107 should almost never be reached, because btr_compress(), which is invoked in
4108 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
4109 ATTRIBUTE_COLD
4110 static
4111 void
4112 btr_discard_only_page_on_level(
4113 /*===========================*/
4114 	dict_index_t*	index,	/*!< in: index tree */
4115 	buf_block_t*	block,	/*!< in: page which is the only one on its level */
4116 	mtr_t*		mtr)	/*!< in: mtr */
4117 {
4118 	ulint		page_level = 0;
4119 
4120 	ut_ad(!index->is_dummy);
4121 
4122 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
4123 	const trx_id_t max_trx_id = page_get_max_trx_id(block->frame);
4124 	const rec_t* r = page_rec_get_next(page_get_infimum_rec(block->frame));
4125 	ut_ad(rec_is_metadata(r, *index) == index->is_instant());
4126 
4127 	while (block->page.id.page_no() != dict_index_get_page(index)) {
4128 		btr_cur_t	cursor;
4129 		buf_block_t*	father;
4130 		const page_t*	page	= buf_block_get_frame(block);
4131 
4132 		ut_a(page_get_n_recs(page) == 1);
4133 		ut_a(page_level == btr_page_get_level(page));
4134 		ut_a(!page_has_siblings(page));
4135 		ut_ad(fil_page_index_page_check(page));
4136 		ut_ad(block->page.id.space() == index->table->space->id);
4137 		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4138 		btr_search_drop_page_hash_index(block);
4139 
4140 		if (dict_index_is_spatial(index)) {
4141 			/* Check for any concurrent search holding this page */
4142 			rtr_check_discard_page(index, NULL, block);
4143 			rtr_page_get_father(index, block, mtr, NULL, &cursor);
4144 		} else {
4145 			btr_page_get_father(index, block, mtr, &cursor);
4146 		}
4147 		father = btr_cur_get_block(&cursor);
4148 
4149 		if (!dict_table_is_locking_disabled(index->table)) {
4150 			lock_update_discard(
4151 				father, PAGE_HEAP_NO_SUPREMUM, block);
4152 		}
4153 
4154 		/* Free the file page */
4155 		btr_page_free(index, block, mtr);
4156 
4157 		block = father;
4158 		page_level++;
4159 	}
4160 
4161 	/* block is the root page, which must be empty, except
4162 	for the node pointer to the (now discarded) block(s). */
4163 	ut_ad(!page_has_siblings(block->frame));
4164 
4165 #ifdef UNIV_BTR_DEBUG
4166 	if (!dict_index_is_ibuf(index)) {
4167 		const page_t*	root	= buf_block_get_frame(block);
4168 		const ulint	space	= index->table->space_id;
4169 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
4170 					    + root, space));
4171 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
4172 					    + root, space));
4173 	}
4174 #endif /* UNIV_BTR_DEBUG */
4175 
4176 	mem_heap_t* heap = nullptr;
4177 	const rec_t* rec = nullptr;
4178 	rec_offs* offsets = nullptr;
4179 	if (index->table->instant) {
4180 		if (rec_is_alter_metadata(r, *index)) {
4181 			heap = mem_heap_create(srv_page_size);
4182 			offsets = rec_get_offsets(r, index, nullptr,
4183 						  index->n_core_fields,
4184 						  ULINT_UNDEFINED, &heap);
4185 			rec = rec_copy(mem_heap_alloc(heap,
4186 						      rec_offs_size(offsets)),
4187 				       r, offsets);
4188 			rec_offs_make_valid(rec, index, true, offsets);
4189 		}
4190 	}
4191 
4192 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
4193 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
4194 	/* btr_page_empty() is supposed to zero-initialize the field. */
4195 	ut_ad(!page_get_instant(block->frame));
4196 
4197 	if (index->is_primary()) {
4198 		if (rec) {
4199 			DBUG_ASSERT(index->table->instant);
4200 			DBUG_ASSERT(rec_is_alter_metadata(rec, *index));
4201 			btr_set_instant(block, *index, mtr);
4202 			rec = page_cur_insert_rec_low(
4203 				page_get_infimum_rec(block->frame),
4204 				index, rec, offsets, mtr);
4205 			ut_ad(rec);
4206 			mem_heap_free(heap);
4207 		} else if (index->is_instant()) {
4208 			index->clear_instant_add();
4209 		}
4210 	} else if (!index->table->is_temporary()) {
4211 		/* We play it safe and reset the free bits for the root */
4212 		ibuf_reset_free_bits(block);
4213 
4214 		ut_a(max_trx_id);
4215 		page_set_max_trx_id(block,
4216 				    buf_block_get_page_zip(block),
4217 				    max_trx_id, mtr);
4218 	}
4219 }
4220 
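/* A compact model of the discard loop above (hypothetical helper names;
not part of the build):

	while (!is_root(block)) {
		buf_block_t* father = get_father(index, block);
		free_page(block);	// the level held only this page
		block = father;
	}
	empty_as_leaf(block);		// the root becomes an empty leaf
*/
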
4221 /*************************************************************//**
4222 Discards a page from a B-tree. This is used to remove the last record from
4223 a B-tree page: the whole page must be removed at the same time. This cannot
4224 be used for the root page, which is allowed to be empty. */
4225 void
4226 btr_discard_page(
4227 /*=============*/
4228 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
4229 				the root page */
4230 	mtr_t*		mtr)	/*!< in: mtr */
4231 {
4232 	dict_index_t*	index;
4233 	ulint		left_page_no;
4234 	ulint		right_page_no;
4235 	buf_block_t*	merge_block;
4236 	page_t*		merge_page;
4237 	buf_block_t*	block;
4238 	page_t*		page;
4239 	rec_t*		node_ptr;
4240 	btr_cur_t	parent_cursor;
4241 
4242 	block = btr_cur_get_block(cursor);
4243 	index = btr_cur_get_index(cursor);
4244 
4245 	ut_ad(dict_index_get_page(index) != block->page.id.page_no());
4246 
4247 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
4248 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
4249 
4250 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4251 
4252 	MONITOR_INC(MONITOR_INDEX_DISCARD);
4253 
4254 	if (dict_index_is_spatial(index)) {
4255 		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
4256 	} else {
4257 		btr_page_get_father(index, block, mtr, &parent_cursor);
4258 	}
4259 
4260 	/* Decide the page which will inherit the locks */
4261 
4262 	left_page_no = btr_page_get_prev(buf_block_get_frame(block));
4263 	right_page_no = btr_page_get_next(buf_block_get_frame(block));
4264 
4265 	const ulint zip_size = index->table->space->zip_size();
4266 	ut_d(bool parent_is_different = false);
4267 	if (left_page_no != FIL_NULL) {
4268 		merge_block = btr_block_get(
4269 			page_id_t(index->table->space_id, left_page_no),
4270 			zip_size, RW_X_LATCH, index, mtr);
4271 
4272 		merge_page = buf_block_get_frame(merge_block);
4273 #ifdef UNIV_BTR_DEBUG
4274 		ut_a(btr_page_get_next(merge_page)
4275 		     == block->page.id.page_no());
4276 #endif /* UNIV_BTR_DEBUG */
4277 		ut_d(parent_is_different =
4278 			(page_rec_get_next(
4279 				page_get_infimum_rec(
4280 					btr_cur_get_page(
4281 						&parent_cursor)))
4282 			 == btr_cur_get_rec(&parent_cursor)));
4283 	} else if (right_page_no != FIL_NULL) {
4284 		merge_block = btr_block_get(
4285 			page_id_t(index->table->space_id, right_page_no),
4286 			zip_size, RW_X_LATCH, index, mtr);
4287 
4288 		merge_page = buf_block_get_frame(merge_block);
4289 #ifdef UNIV_BTR_DEBUG
4290 		ut_a(btr_page_get_prev(merge_page)
4291 		     == block->page.id.page_no());
4292 #endif /* UNIV_BTR_DEBUG */
4293 		ut_d(parent_is_different = page_rec_is_supremum(
4294 			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
4295 	} else {
4296 		btr_discard_only_page_on_level(index, block, mtr);
4297 
4298 		return;
4299 	}
4300 
4301 	page = buf_block_get_frame(block);
4302 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
4303 	btr_search_drop_page_hash_index(block);
4304 
4305 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
4306 
4307 		/* We have to mark the leftmost node pointer on the right
4308 		side page as the predefined minimum record */
4309 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
4310 
4311 		ut_ad(page_rec_is_user_rec(node_ptr));
4312 
4313 		/* This will make page_zip_validate() fail on merge_page
4314 		until btr_level_list_remove() completes.  This is harmless,
4315 		because everything will take place within a single
4316 		mini-transaction and because writing to the redo log
4317 		is an atomic operation (performed by mtr_commit()). */
4318 		btr_set_min_rec_mark(node_ptr, mtr);
4319 	}
4320 
4321 	if (dict_index_is_spatial(index)) {
4322 		rtr_node_ptr_delete(&parent_cursor, mtr);
4323 	} else {
4324 		btr_cur_node_ptr_delete(&parent_cursor, mtr);
4325 	}
4326 
4327 	/* Remove the page from the level list */
4328 	ut_a(DB_SUCCESS == btr_level_list_remove(index->table->space_id,
4329 						 zip_size, page, index, mtr));
4330 
4331 #ifdef UNIV_ZIP_DEBUG
4332 	{
4333 		page_zip_des_t*	merge_page_zip
4334 			= buf_block_get_page_zip(merge_block);
4335 		ut_a(!merge_page_zip
4336 		     || page_zip_validate(merge_page_zip, merge_page, index));
4337 	}
4338 #endif /* UNIV_ZIP_DEBUG */
4339 
4340 	if (!dict_table_is_locking_disabled(index->table)) {
4341 		if (left_page_no != FIL_NULL) {
4342 			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4343 					    block);
4344 		} else {
4345 			lock_update_discard(merge_block,
4346 					    lock_get_min_heap_no(merge_block),
4347 					    block);
4348 		}
4349 	}
4350 
4351 	if (dict_index_is_spatial(index)) {
4352 		rtr_check_discard_page(index, cursor, block);
4353 	}
4354 
4355 	/* Free the file page */
4356 	btr_page_free(index, block, mtr);
4357 
4358 	/* btr_check_node_ptr() needs the parent block latched.
4359 	If merge_block's parent block is not the same,
4360 	we cannot use btr_check_node_ptr(). */
4361 	ut_ad(parent_is_different
4362 	      || btr_check_node_ptr(index, merge_block, mtr));
4363 
4364 	if (btr_cur_get_block(&parent_cursor)->page.id.page_no() == index->page
4365 	    && !page_has_siblings(btr_cur_get_page(&parent_cursor))
4366 	    && page_get_n_recs(btr_cur_get_page(&parent_cursor)) == 1) {
4367 		btr_lift_page_up(index, merge_block, mtr);
4368 	}
4369 }
4370 
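/* Recap of the lock inheritance above: the discarded page's locks move
to the merge page. When merging into the left sibling, the discarded
records logically follow all of its records, so the locks attach to the
supremum pseudo-record; when merging into the right sibling, they
logically precede its records, so the locks attach to its first user
record:

	if (left_page_no != FIL_NULL) {
		lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM, block);
	} else {
		lock_update_discard(merge_block,
				    lock_get_min_heap_no(merge_block), block);
	}
*/
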
4371 #ifdef UNIV_BTR_PRINT
4372 /*************************************************************//**
4373 Prints size info of a B-tree. */
4374 void
4375 btr_print_size(
4376 /*===========*/
4377 	dict_index_t*	index)	/*!< in: index tree */
4378 {
4379 	page_t*		root;
4380 	fseg_header_t*	seg;
4381 	mtr_t		mtr;
4382 
4383 	if (dict_index_is_ibuf(index)) {
4384 		fputs("Sorry, cannot print info of an ibuf tree:"
4385 		      " use ibuf functions\n", stderr);
4386 
4387 		return;
4388 	}
4389 
4390 	mtr_start(&mtr);
4391 
4392 	root = btr_root_get(index, &mtr);
4393 
4394 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4395 
4396 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4397 	fseg_print(seg, &mtr);
4398 
4399 	if (!dict_index_is_ibuf(index)) {
4400 
4401 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4402 
4403 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4404 		fseg_print(seg, &mtr);
4405 	}
4406 
4407 	mtr_commit(&mtr);
4408 }
4409 
4410 /************************************************************//**
4411 Prints recursively index tree pages. */
4412 static
4413 void
4414 btr_print_recursive(
4415 /*================*/
4416 	dict_index_t*	index,	/*!< in: index tree */
4417 	buf_block_t*	block,	/*!< in: index page */
4418 	ulint		width,	/*!< in: print this many entries from start
4419 				and end */
4420 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4421 	rec_offs**	offsets,/*!< in/out: buffer for rec_get_offsets() */
4422 	mtr_t*		mtr)	/*!< in: mtr */
4423 {
4424 	const page_t*	page	= buf_block_get_frame(block);
4425 	page_cur_t	cursor;
4426 	ulint		n_recs;
4427 	ulint		i	= 0;
4428 	mtr_t		mtr2;
4429 
4430 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_SX_FIX));
4431 
4432 	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page)
4433 		<< " page " << block->page.id;
4434 
4435 	page_print(block, index, width, width);
4436 
4437 	n_recs = page_get_n_recs(page);
4438 
4439 	page_cur_set_before_first(block, &cursor);
4440 	page_cur_move_to_next(&cursor);
4441 
4442 	while (!page_cur_is_after_last(&cursor)) {
4443 
4444 		if (page_is_leaf(page)) {
4445 
4446 			/* If this is the leaf level, do nothing */
4447 
4448 		} else if ((i <= width) || (i >= n_recs - width)) {
4449 
4450 			const rec_t*	node_ptr;
4451 
4452 			mtr_start(&mtr2);
4453 
4454 			node_ptr = page_cur_get_rec(&cursor);
4455 
4456 			*offsets = rec_get_offsets(
4457 				node_ptr, index, *offsets, 0,
4458 				ULINT_UNDEFINED, heap);
4459 			btr_print_recursive(index,
4460 					    btr_node_ptr_get_child(node_ptr,
4461 								   index,
4462 								   *offsets,
4463 								   &mtr2),
4464 					    width, heap, offsets, &mtr2);
4465 			mtr_commit(&mtr2);
4466 		}
4467 
4468 		page_cur_move_to_next(&cursor);
4469 		i++;
4470 	}
4471 }
4472 
4473 /**************************************************************//**
4474 Prints directories and other info of all nodes in the tree. */
4475 void
4476 btr_print_index(
4477 /*============*/
4478 	dict_index_t*	index,	/*!< in: index */
4479 	ulint		width)	/*!< in: print this many entries from start
4480 				and end */
4481 {
4482 	mtr_t		mtr;
4483 	buf_block_t*	root;
4484 	mem_heap_t*	heap	= NULL;
4485 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4486 	rec_offs*	offsets	= offsets_;
4487 	rec_offs_init(offsets_);
4488 
4489 	fputs("--------------------------\n"
4490 	      "INDEX TREE PRINT\n", stderr);
4491 
4492 	mtr_start(&mtr);
4493 
4494 	root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4495 
4496 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4497 	if (heap) {
4498 		mem_heap_free(heap);
4499 	}
4500 
4501 	mtr_commit(&mtr);
4502 
4503 	ut_ad(btr_validate_index(index, 0));
4504 }
4505 #endif /* UNIV_BTR_PRINT */
4506 
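/* Usage sketch for the UNIV_BTR_PRINT helpers above (debug builds only;
the define is assumed to be set at compile time):

	#ifdef UNIV_BTR_PRINT
	btr_print_size(index);		// segment info of the index tree
	btr_print_index(index, 3);	// 3 entries from the start and end
					// of each printed page
	#endif
*/
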
4507 #ifdef UNIV_DEBUG
4508 /************************************************************//**
4509 Checks that the node pointer to a page is appropriate.
4510 @return TRUE */
4511 ibool
4512 btr_check_node_ptr(
4513 /*===============*/
4514 	dict_index_t*	index,	/*!< in: index tree */
4515 	buf_block_t*	block,	/*!< in: index page */
4516 	mtr_t*		mtr)	/*!< in: mtr */
4517 {
4518 	mem_heap_t*	heap;
4519 	dtuple_t*	tuple;
4520 	rec_offs*	offsets;
4521 	btr_cur_t	cursor;
4522 	page_t*		page = buf_block_get_frame(block);
4523 
4524 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4525 
4526 	if (dict_index_get_page(index) == block->page.id.page_no()) {
4527 
4528 		return(TRUE);
4529 	}
4530 
4531 	heap = mem_heap_create(256);
4532 
4533 	if (dict_index_is_spatial(index)) {
4534 		offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4535 						    NULL, &cursor);
4536 	} else {
4537 		offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4538 						    &cursor);
4539 	}
4540 
4541 	if (page_is_leaf(page)) {
4542 
4543 		goto func_exit;
4544 	}
4545 
4546 	tuple = dict_index_build_node_ptr(
4547 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4548 		btr_page_get_level(page));
4549 
4550 	/* For a spatial index, the MBR in the parent rec could differ
4551 	from that of the first rec of the child; their relationship
4552 	should be a "WITHIN" relationship */
4553 	if (dict_index_is_spatial(index)) {
4554 		ut_a(!cmp_dtuple_rec_with_gis(
4555 			tuple, btr_cur_get_rec(&cursor),
4556 			offsets, PAGE_CUR_WITHIN));
4557 	} else {
4558 		ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4559 	}
4560 func_exit:
4561 	mem_heap_free(heap);
4562 
4563 	return(TRUE);
4564 }
4565 #endif /* UNIV_DEBUG */
4566 
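/* The invariant verified by btr_check_node_ptr(), in brief: the father's
node pointer must equal the tuple built from the child's first user
record and the child's level (recap of the calls above, with
first_user_rec standing in for the fetched record):

	tuple = dict_index_build_node_ptr(index, first_user_rec, 0, heap,
					  btr_page_get_level(page));
	ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));

   For spatial indexes the equality is relaxed to a PAGE_CUR_WITHIN
   check on the MBR, since the parent MBR may differ from the child's
   first record. */
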
4567 /************************************************************//**
4568 Display identification information for a record. */
4569 static
4570 void
4571 btr_index_rec_validate_report(
4572 /*==========================*/
4573 	const page_t*		page,	/*!< in: index page */
4574 	const rec_t*		rec,	/*!< in: index record */
4575 	const dict_index_t*	index)	/*!< in: index */
4576 {
4577 	ib::info() << "Record in index " << index->name
4578 		<< " of table " << index->table->name
4579 		<< ", page " << page_id_t(page_get_space_id(page),
4580 					  page_get_page_no(page))
4581 		<< ", at offset " << page_offset(rec);
4582 }
4583 
4584 /************************************************************//**
4585 Checks the size and number of fields in a record based on the definition of
4586 the index.
4587 @return TRUE if ok */
4588 ibool
4589 btr_index_rec_validate(
4590 /*===================*/
4591 	const rec_t*		rec,		/*!< in: index record */
4592 	const dict_index_t*	index,		/*!< in: index */
4593 	ibool			dump_on_error)	/*!< in: TRUE if the function
4594 						should print hex dump of record
4595 						and page on error */
4596 {
4597 	ulint		len;
4598 	const page_t*	page;
4599 	mem_heap_t*	heap	= NULL;
4600 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4601 	rec_offs*	offsets	= offsets_;
4602 	rec_offs_init(offsets_);
4603 
4604 	page = page_align(rec);
4605 
4606 	ut_ad(index->n_core_fields);
4607 
4608 	if (index->is_ibuf()) {
4609 		/* The insert buffer index tree can contain records from any
4610 		other index: we cannot check the number of fields or
4611 		their length */
4612 
4613 		return(TRUE);
4614 	}
4615 
4616 #ifdef VIRTUAL_INDEX_DEBUG
4617 	if (dict_index_has_virtual(index)) {
4618 		fprintf(stderr, "index name is %s\n", index->name());
4619 	}
4620 #endif
4621 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4622 		btr_index_rec_validate_report(page, rec, index);
4623 
4624 		ib::error() << "Compact flag=" << !!page_is_comp(page)
4625 			<< ", should be " << dict_table_is_comp(index->table);
4626 
4627 		return(FALSE);
4628 	}
4629 
4630 	const bool is_alter_metadata = page_is_leaf(page)
4631 		&& !page_has_prev(page)
4632 		&& index->is_primary() && index->table->instant
4633 		&& rec == page_rec_get_next_const(page_get_infimum_rec(page));
4634 
4635 	if (is_alter_metadata
4636 	    && !rec_is_alter_metadata(rec, page_is_comp(page))) {
4637 		btr_index_rec_validate_report(page, rec, index);
4638 
4639 		ib::error() << "First record is not ALTER TABLE metadata";
4640 		return FALSE;
4641 	}
4642 
4643 	if (!page_is_comp(page)) {
4644 		const ulint n_rec_fields = rec_get_n_fields_old(rec);
4645 		if (n_rec_fields == DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD
4646 		    && index->id == DICT_INDEXES_ID) {
4647 			/* A record for older SYS_INDEXES table
4648 			(missing merge_threshold column) is acceptable. */
4649 		} else if (is_alter_metadata) {
4650 			if (n_rec_fields != ulint(index->n_fields) + 1) {
4651 				goto n_field_mismatch;
4652 			}
4653 		} else if (n_rec_fields < index->n_core_fields
4654 			   || n_rec_fields > index->n_fields) {
4655 n_field_mismatch:
4656 			btr_index_rec_validate_report(page, rec, index);
4657 
4658 			ib::error() << "Has " << rec_get_n_fields_old(rec)
4659 				    << " fields, should have "
4660 				    << index->n_core_fields << ".."
4661 				    << index->n_fields;
4662 
4663 			if (dump_on_error) {
4664 				fputs("InnoDB: corrupt record ", stderr);
4665 				rec_print_old(stderr, rec);
4666 				putc('\n', stderr);
4667 			}
4668 			return(FALSE);
4669 		}
4670 	}
4671 
4672 	offsets = rec_get_offsets(rec, index, offsets, page_is_leaf(page)
4673 				  ? index->n_core_fields : 0,
4674 				  ULINT_UNDEFINED, &heap);
4675 	const dict_field_t* field = index->fields;
4676 	ut_ad(rec_offs_n_fields(offsets)
4677 	      == ulint(index->n_fields) + is_alter_metadata);
4678 
4679 	for (unsigned i = 0; i < rec_offs_n_fields(offsets); i++) {
4680 		rec_get_nth_field_offs(offsets, i, &len);
4681 
4682 		ulint fixed_size;
4683 
4684 		if (is_alter_metadata && i == index->first_user_field()) {
4685 			fixed_size = FIELD_REF_SIZE;
4686 			if (len != FIELD_REF_SIZE
4687 			    || !rec_offs_nth_extern(offsets, i)) {
4688 				goto len_mismatch;
4689 			}
4690 
4691 			continue;
4692 		} else {
4693 			fixed_size = dict_col_get_fixed_size(
4694 				field->col, page_is_comp(page));
4695 			if (rec_offs_nth_extern(offsets, i)) {
4696 				const byte* data = rec_get_nth_field(
4697 					rec, offsets, i, &len);
4698 				len -= BTR_EXTERN_FIELD_REF_SIZE;
4699 				ulint extern_len = mach_read_from_4(
4700 					data + len + BTR_EXTERN_LEN + 4);
4701 				if (fixed_size == extern_len) {
4702 					goto next_field;
4703 				}
4704 			}
4705 		}
4706 
4707 		/* Note that if fixed_size != 0, it equals the
4708 		length of a fixed-size column in the clustered index.
4709 		We should adjust it here.
4710 		A prefix index of the column has a fixed, but different,
4711 		length.  When fixed_size == 0, prefix_len is the maximum
4712 		length of the prefix index column. */
4713 
4714 		if (len_is_stored(len)
4715 		    && (field->prefix_len
4716 			? len > field->prefix_len
4717 			: (fixed_size && len != fixed_size))) {
4718 len_mismatch:
4719 			btr_index_rec_validate_report(page, rec, index);
4720 			ib::error	error;
4721 
4722 			error << "Field " << i << " len is " << len
4723 				<< ", should be " << fixed_size;
4724 
4725 			if (dump_on_error) {
4726 				error << "; ";
4727 				rec_print(error.m_oss, rec,
4728 					  rec_get_info_bits(
4729 						  rec, rec_offs_comp(offsets)),
4730 					  offsets);
4731 			}
4732 			if (heap) {
4733 				mem_heap_free(heap);
4734 			}
4735 			return(FALSE);
4736 		}
4737 next_field:
4738 		field++;
4739 	}
4740 
4741 #ifdef VIRTUAL_INDEX_DEBUG
4742 	if (dict_index_has_virtual(index)) {
4743 		rec_print_new(stderr, rec, offsets);
4744 	}
4745 #endif
4746 
4747 	if (heap) {
4748 		mem_heap_free(heap);
4749 	}
4750 	return(TRUE);
4751 }
4752 
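/* A worked example of the length check above (values are illustrative,
assuming a single-byte charset): for a CHAR(4) column,
dict_col_get_fixed_size() returns 4; for a 10-byte prefix index,
fixed_size is 0 and prefix_len is 10.

	fixed_size = 4,  prefix_len = 0:   len == 4  -> ok
	fixed_size = 4,  prefix_len = 0:   len == 6  -> len_mismatch
	fixed_size = 0,  prefix_len = 10:  len == 7  -> ok
	fixed_size = 0,  prefix_len = 10:  len == 12 -> len_mismatch
*/
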
4753 /************************************************************//**
4754 Checks the size and number of fields in records based on the definition of
4755 the index.
4756 @return TRUE if ok */
4757 static
4758 ibool
4759 btr_index_page_validate(
4760 /*====================*/
4761 	buf_block_t*	block,	/*!< in: index page */
4762 	dict_index_t*	index)	/*!< in: index */
4763 {
4764 	page_cur_t	cur;
4765 	ibool		ret	= TRUE;
4766 #ifndef DBUG_OFF
4767 	ulint		nth	= 1;
4768 #endif /* !DBUG_OFF */
4769 
4770 	page_cur_set_before_first(block, &cur);
4771 
4772 	/* Directory slot 0 should only contain the infimum record. */
4773 	DBUG_EXECUTE_IF("check_table_rec_next",
4774 			ut_a(page_rec_get_nth_const(
4775 				     page_cur_get_page(&cur), 0)
4776 			     == cur.rec);
4777 			ut_a(page_dir_slot_get_n_owned(
4778 				     page_dir_get_nth_slot(
4779 					     page_cur_get_page(&cur), 0))
4780 			     == 1););
4781 
4782 	page_cur_move_to_next(&cur);
4783 
4784 	for (;;) {
4785 		if (page_cur_is_after_last(&cur)) {
4786 
4787 			break;
4788 		}
4789 
4790 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4791 
4792 			return(FALSE);
4793 		}
4794 
4795 		/* Verify that page_rec_get_nth_const() is correctly
4796 		retrieving each record. */
4797 		DBUG_EXECUTE_IF("check_table_rec_next",
4798 				ut_a(cur.rec == page_rec_get_nth_const(
4799 					     page_cur_get_page(&cur),
4800 					     page_rec_get_n_recs_before(
4801 						     cur.rec)));
4802 				ut_a(nth++ == page_rec_get_n_recs_before(
4803 					     cur.rec)););
4804 
4805 		page_cur_move_to_next(&cur);
4806 	}
4807 
4808 	return(ret);
4809 }
4810 
4811 /************************************************************//**
4812 Report an error on one page of an index tree. */
4813 static
4814 void
4815 btr_validate_report1(
4816 /*=================*/
4817 	dict_index_t*		index,	/*!< in: index */
4818 	ulint			level,	/*!< in: B-tree level */
4819 	const buf_block_t*	block)	/*!< in: index page */
4820 {
4821 	ib::error	error;
4822 	error << "In page " << block->page.id.page_no()
4823 		<< " of index " << index->name
4824 		<< " of table " << index->table->name;
4825 
4826 	if (level > 0) {
4827 		error << ", index tree level " << level;
4828 	}
4829 }
4830 
4831 /************************************************************//**
4832 Report an error on two pages of an index tree. */
4833 static
4834 void
btr_validate_report2(const dict_index_t * index,ulint level,const buf_block_t * block1,const buf_block_t * block2)4835 btr_validate_report2(
4836 /*=================*/
4837 	const dict_index_t*	index,	/*!< in: index */
4838 	ulint			level,	/*!< in: B-tree level */
4839 	const buf_block_t*	block1,	/*!< in: first index page */
4840 	const buf_block_t*	block2)	/*!< in: second index page */
4841 {
4842 	ib::error	error;
4843 	error << "In pages " << block1->page.id
4844 		<< " and " << block2->page.id << " of index " << index->name
4845 		<< " of table " << index->table->name;
4846 
4847 	if (level > 0) {
4848 		error << ", index tree level " << level;
4849 	}
4850 }
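
/* For reference, the reporting helpers above emit diagnostics of
roughly this shape (example values only; the server log adds its own
prefix):

	In page 37 of index `PRIMARY` of table `test`.`t1`,
	index tree level 1
*/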

/************************************************************//**
Validates index tree level.
@return TRUE if ok */
static
bool
btr_validate_level(
/*===============*/
	dict_index_t*	index,	/*!< in: index tree */
	const trx_t*	trx,	/*!< in: transaction or NULL */
	ulint		level,	/*!< in: level number */
	bool		lockout)/*!< in: true if X-latch index is intended */
{
	buf_block_t*	block;
	page_t*		page;
	buf_block_t*	right_block = 0; /* remove warning */
	page_t*		right_page = 0; /* remove warning */
	page_t*		father_page;
	btr_cur_t	node_cur;
	btr_cur_t	right_node_cur;
	rec_t*		rec;
	ulint		right_page_no;
	ulint		left_page_no;
	page_cur_t	cursor;
	dtuple_t*	node_ptr_tuple;
	bool		ret	= true;
	mtr_t		mtr;
	mem_heap_t*	heap	= mem_heap_create(256);
	rec_offs*	offsets	= NULL;
	rec_offs*	offsets2= NULL;
#ifdef UNIV_ZIP_DEBUG
	page_zip_des_t*	page_zip;
#endif /* UNIV_ZIP_DEBUG */
	ulint		savepoint = 0;
	ulint		savepoint2 = 0;
	ulint		parent_page_no = FIL_NULL;
	ulint		parent_right_page_no = FIL_NULL;
	bool		rightmost_child = false;

	mtr.start();

	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
	page = buf_block_get_frame(block);

	fil_space_t*		space	= index->table->space;
	const ulint		zip_size = space->zip_size();

	while (level != btr_page_get_level(page)) {
		const rec_t*	node_ptr;

		if (fseg_page_is_free(space, block->page.id.page_no())) {

			btr_validate_report1(index, level, block);

			ib::warn() << "Page is free";

			ret = false;
		}

		ut_a(index->table->space_id == block->page.id.space());
		ut_a(block->page.id.space() == page_get_space_id(page));
#ifdef UNIV_ZIP_DEBUG
		page_zip = buf_block_get_page_zip(block);
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
		ut_a(!page_is_leaf(page));

		page_cur_set_before_first(block, &cursor);
		page_cur_move_to_next(&cursor);

		node_ptr = page_cur_get_rec(&cursor);
		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
					  ULINT_UNDEFINED, &heap);

		savepoint2 = mtr_set_savepoint(&mtr);
		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
		page = buf_block_get_frame(block);

		/* For an R-tree, the record order might not match the
		order of the linked index pages on the lower level, so we
		need to traverse backwards to reach the first page of this
		level. This is only used for index validation. A spatial
		index does not use such a scan for any of its DML or query
		operations. */
		if (dict_index_is_spatial(index)) {
			left_page_no = btr_page_get_prev(page);

			while (left_page_no != FIL_NULL) {
				/* To obey the latch order of tree blocks,
				we must release the current block once in
				order to obtain the latch on its left
				sibling. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint2, block);

				savepoint2 = mtr_set_savepoint(&mtr);
				block = btr_block_get(
					page_id_t(index->table->space_id,
						  left_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
				page = buf_block_get_frame(block);
				left_page_no = btr_page_get_prev(page);
			}
		}
	}
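
	/* The descent above repeatedly follows the first node pointer on
	each non-leaf page. Condensed to the core idiom (illustration
	only; the corruption checks and R-tree handling are omitted):

		while (level != btr_page_get_level(page)) {
			page_cur_set_before_first(block, &cursor);
			page_cur_move_to_next(&cursor);
			node_ptr = page_cur_get_rec(&cursor);
			offsets = rec_get_offsets(node_ptr, index, offsets,
						  0, ULINT_UNDEFINED, &heap);
			block = btr_node_ptr_get_child(node_ptr, index,
						       offsets, &mtr);
			page = buf_block_get_frame(block);
		}
	*/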

	/* Now we are on the desired level. Loop through the pages on that
	level. */

loop:
	mem_heap_empty(heap);
	offsets = offsets2 = NULL;
	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	page_zip = buf_block_get_page_zip(block);
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_a(block->page.id.space() == index->table->space_id);

	if (fseg_page_is_free(space, block->page.id.page_no())) {

		btr_validate_report1(index, level, block);

		ib::warn() << "Page is marked as free";
		ret = false;

	} else if (btr_page_get_index_id(page) != index->id) {

		ib::error() << "Page index id " << btr_page_get_index_id(page)
			<< " != data dictionary index id " << index->id;

		ret = false;

	} else if (!page_validate(page, index)) {

		btr_validate_report1(index, level, block);
		ret = false;

	} else if (level == 0 && !btr_index_page_validate(block, index)) {

		/* We are on level 0. Check that the records have the right
		number of fields, and that the field lengths are right. */

		ret = false;
	}

	ut_a(btr_page_get_level(page) == level);

	right_page_no = btr_page_get_next(page);
	left_page_no = btr_page_get_prev(page);

	ut_a(!page_is_empty(page)
	     || (level == 0
		 && page_get_page_no(page) == dict_index_get_page(index)));

	if (right_page_no != FIL_NULL) {
		const rec_t*	right_rec;
		savepoint = mtr_set_savepoint(&mtr);

		right_block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			zip_size,
			RW_SX_LATCH, index, &mtr);

		right_page = buf_block_get_frame(right_block);

		if (btr_page_get_prev(right_page) != page_get_page_no(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: broken FIL_PAGE_NEXT"
			      " or FIL_PAGE_PREV links\n", stderr);

			ret = false;
		}

		if (page_is_comp(right_page) != page_is_comp(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: 'compact' flag mismatch\n", stderr);

			ret = false;

			goto node_ptr_fails;
		}

		rec = page_rec_get_prev(page_get_supremum_rec(page));
		right_rec = page_rec_get_next(page_get_infimum_rec(
						      right_page));
		offsets = rec_get_offsets(rec, index, offsets,
					  page_is_leaf(page)
					  ? index->n_core_fields : 0,
					  ULINT_UNDEFINED, &heap);
		offsets2 = rec_get_offsets(right_rec, index, offsets2,
					   page_is_leaf(right_page)
					   ? index->n_core_fields : 0,
					   ULINT_UNDEFINED, &heap);

		/* For a spatial index, we cannot guarantee the key
		ordering across pages, so skip the record comparison for
		now. This will be enhanced in a special R-tree index
		validation scheme. */
		if (!dict_index_is_spatial(index)
		    && cmp_rec_rec(rec, right_rec,
				   offsets, offsets2, index) >= 0) {

			btr_validate_report2(index, level, block, right_block);

			fputs("InnoDB: records in wrong order"
			      " on adjacent pages\n", stderr);

			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_prev(page_get_supremum_rec(page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);
			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_next(
				page_get_infimum_rec(right_page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);

			ret = false;
		}
	}

	if (level > 0 && left_page_no == FIL_NULL) {
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			     page_rec_get_next(page_get_infimum_rec(page)),
			     page_is_comp(page)));
	}
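
	/* Note on the assertion above: on every level above the leaves,
	the first node pointer record of the leftmost page must carry
	REC_INFO_MIN_REC_FLAG, which marks it as smaller than any
	conceivable key on that level. */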

	/* Similarly, skip the father node check for a spatial index for
	now, for a couple of reasons:
	1) As mentioned, there is no ordering relationship between the
	records on the parent level and the linked pages on the child
	level.
	2) Searching for the parent from the root is very costly for an
	R-tree.
	We will add a special validation mechanism for the R-tree later
	(WL #7520). */
	if (!dict_index_is_spatial(index)
	    && block->page.id.page_no() != dict_index_get_page(index)) {

		/* Check father node pointers */
		rec_t*	node_ptr;

		btr_cur_position(
			index, page_rec_get_next(page_get_infimum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr_for_validate(
			offsets, heap, &node_cur, &mtr);

		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		parent_page_no = page_get_page_no(father_page);
		parent_right_page_no = btr_page_get_next(father_page);
		rightmost_child = page_rec_is_supremum(
					page_rec_get_next(node_ptr));

		btr_cur_position(
			index,
			page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);

		offsets = btr_page_get_father_node_ptr_for_validate(
				offsets, heap, &node_cur, &mtr);

		if (node_ptr != btr_cur_get_rec(&node_cur)
		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
				     != block->page.id.page_no()) {

			btr_validate_report1(index, level, block);

			fputs("InnoDB: node pointer to the page is wrong\n",
			      stderr);

			fputs("InnoDB: node ptr ", stderr);
			rec_print(stderr, node_ptr, index);

			rec = btr_cur_get_rec(&node_cur);
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o "
				ULINTPF "\n",
				btr_node_ptr_get_child_page_no(rec, offsets));

			fputs("InnoDB: record on page ", stderr);
			rec_print_new(stderr, rec, offsets);
			putc('\n', stderr);
			ret = false;

			goto node_ptr_fails;
		}

		if (!page_is_leaf(page)) {
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page));

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
					   offsets)) {
				const rec_t* first_rec = page_rec_get_next(
					page_get_infimum_rec(page));

				btr_validate_report1(index, level, block);

				ib::error() << "Node ptrs differ on levels > 0";

				fputs("InnoDB: node ptr ", stderr);
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
				ret = false;

				goto node_ptr_fails;
			}
		}
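
		/* For reference: a node pointer record stores the minimum
		key of its child subtree together with the child page
		number; dict_index_build_node_ptr() above rebuilds the
		expected node pointer from the child's first record so
		that it can be compared with the one actually found in
		the parent. */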

		if (left_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
			ut_a(!page_has_prev(father_page));
		}

		if (right_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
			ut_a(!page_has_next(father_page));
		} else {
			const rec_t*	right_node_ptr;

			right_node_ptr = page_rec_get_next(node_ptr);

			if (!lockout && rightmost_child) {

				/* To obey the latch order of tree blocks,
				we must release right_block once in order
				to obtain the latch on the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint, right_block);

				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(index->table
							  ->space_id,
							  parent_right_page_no),
						zip_size,
						RW_SX_LATCH, index, &mtr);
				}

				right_block = btr_block_get(
					page_id_t(index->table->space_id,
						  right_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
			}

			btr_cur_position(
				index, page_rec_get_next(
					page_get_infimum_rec(
						buf_block_get_frame(
							right_block))),
				right_block, &right_node_cur);

			offsets = btr_page_get_father_node_ptr_for_validate(
					offsets, heap, &right_node_cur, &mtr);

			if (right_node_ptr
			    != page_get_supremum_rec(father_page)) {

				if (btr_cur_get_rec(&right_node_cur)
				    != right_node_ptr) {
					ret = false;
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			} else {
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);

				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
					    page_get_infimum_rec(
						    right_father_page))) {
					ret = false;
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}

				if (page_get_page_no(right_father_page)
				    != btr_page_get_next(father_page)) {

					ret = false;
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			}
		}
	}

node_ptr_fails:
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
	mtr.commit();

	if (trx_is_interrupted(trx)) {
		/* On interrupt, return the current status. */
	} else if (right_page_no != FIL_NULL) {

		mtr.start();

		if (!lockout) {
			if (rightmost_child) {
				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(
							index->table->space_id,
							parent_right_page_no),
						zip_size,
						RW_SX_LATCH, index, &mtr);
				}
			} else if (parent_page_no != FIL_NULL) {
				btr_block_get(
					page_id_t(index->table->space_id,
						  parent_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
			}
		}

		block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			zip_size,
			RW_SX_LATCH, index, &mtr);

		page = buf_block_get_frame(block);

		goto loop;
	}

	mem_heap_free(heap);

	return(ret);
}
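
/* Note on the re-latching in node_ptr_fails above: to keep the
top-down, left-to-right latch order of the tree, the parent page (or,
for a rightmost child, the parent's right sibling) is latched before
the right sibling of the current page is latched as the next 'page'. */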

/**************************************************************//**
Checks the consistency of an index tree.
@return	DB_SUCCESS if ok, error code if not */
dberr_t
btr_validate_index(
/*===============*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx)	/*!< in: transaction or NULL */
{
	dberr_t err = DB_SUCCESS;
	bool lockout = dict_index_is_spatial(index);

	/* Full-text indexes are implemented by auxiliary tables,
	not by the B-tree. */
	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
		return(err);
	}

	mtr_t		mtr;

	mtr_start(&mtr);

	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

	page_t*	root = btr_root_get(index, &mtr);

	if (!root) {
		mtr_commit(&mtr);
		return DB_CORRUPTION;
	}

	ulint	n = btr_page_get_level(root);

	btr_validate_index_running++;
	for (ulint i = 0; i <= n; ++i) {

		if (!btr_validate_level(index, trx, n - i, lockout)) {
			err = DB_CORRUPTION;
		}
	}

	mtr_commit(&mtr);
	/* In theory we need a release barrier here, so that the
	btr_validate_index_running decrement is guaranteed to
	happen after the latches are released.

	The original code issued SEQ_CST on the update and a
	non-atomic access on the load, which means its
	synchronisation was broken as well. */
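	/* For illustration only: with a plain std::atomic counter, the
	release ordering described above could be requested explicitly:

		#include <atomic>

		std::atomic<uint32_t> running;
		running.fetch_sub(1, std::memory_order_release);

	Whether Atomic_counter offers an equivalent overload is not
	assumed here; the decrement below uses the class's default
	ordering. */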
	btr_validate_index_running--;

	return(err);
}
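
/* A minimal usage sketch (illustration only; in the server this is
reached from CHECK TABLE via the handler layer):

	dberr_t	err = btr_validate_index(index, trx);
	if (err != DB_SUCCESS) {
		// the tree is inconsistent; report corruption
	}

Levels are validated from the root (level n) down to the leaf level 0,
and within each level the pages are walked left to right along
FIL_PAGE_NEXT. */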

/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return	true if possible to merge. */
static
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr)		/*!< in: mini-transaction */
{
	dict_index_t*	index;
	page_t*		page;
	ulint		n_recs;
	ulint		data_size;
	ulint		max_ins_size_reorg;
	ulint		max_ins_size;
	buf_block_t*	mblock;
	page_t*		mpage;
	DBUG_ENTER("btr_can_merge_with_page");

	if (page_no == FIL_NULL) {
		*merge_block = NULL;
		DBUG_RETURN(false);
	}

	index = btr_cur_get_index(cursor);
	page = btr_cur_get_page(cursor);

	const page_id_t		page_id(index->table->space_id, page_no);
	const ulint zip_size = index->table->space->zip_size();

	mblock = btr_block_get(page_id, zip_size, RW_X_LATCH, index, mtr);
	mpage = buf_block_get_frame(mblock);

	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);

	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		mpage, n_recs);

	if (data_size > max_ins_size_reorg) {
		goto error;
	}

	/* If the compression padding tells us that merging would result
	in a page that is packed so tightly that compression is likely to
	fail, then do not merge the pages. */
	if (zip_size && page_is_leaf(mpage)
	    && (page_get_data_size(mpage) + data_size
		>= dict_index_zip_pad_optimal_page_size(index))) {

		goto error;
	}

	max_ins_size = page_get_max_insert_size(mpage, n_recs);

	if (data_size > max_ins_size) {

		/* We have to reorganize mpage */

		if (!btr_page_reorganize_block(
			    false, page_zip_level, mblock, index, mtr)) {

			goto error;
		}

		max_ins_size = page_get_max_insert_size(mpage, n_recs);

		ut_ad(page_validate(mpage, index));
		ut_ad(max_ins_size == max_ins_size_reorg);

		if (data_size > max_ins_size) {

			/* Add fault tolerance, though this should
			never happen */

			goto error;
		}
	}

	*merge_block = mblock;
	DBUG_RETURN(true);

error:
	*merge_block = NULL;
	DBUG_RETURN(false);
}
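
/* The size checks above in a nutshell (made-up numbers, illustration
only): if the page at the cursor holds data_size = 3000 bytes but the
sibling could accept at most max_ins_size_reorg = 2500 bytes even after
reorganization, the merge is rejected outright. If max_ins_size_reorg =
3500 while the current max_ins_size is only 2800, the sibling is
reorganized first and the check is repeated against the new
max_ins_size. */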