/*****************************************************************************

Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2014, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0btr.cc
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/

#include "btr0btr.h"

#include "page0page.h"
#include "page0zip.h"
#include "gis0rtree.h"

#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "btr0defragment.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
#include "srv0mon.h"
#include "gis0geo.h"
#include "dict0boot.h"
#include "row0sel.h" /* row_search_max_autoinc() */

/**************************************************************//**
Checks if the page in the cursor can be merged with the given page.
If necessary, re-organizes the merge_page.
@return	true if possible to merge */
static
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr);		/*!< in: mini-transaction */

/**************************************************************//**
Report that an index page is corrupted. */
void
btr_corruption_report(
/*==================*/
	const buf_block_t*	block,	/*!< in: corrupted block */
	const dict_index_t*	index)	/*!< in: index tree */
{
	ib::error()
		<< "Flag mismatch in page " << block->page.id
		<< " index " << index->name
		<< " of table " << index->table->name;
}

/*
Latching strategy of the InnoDB B-tree
--------------------------------------

Node pointer page latch acquisition is protected by the index->lock latch.

Before MariaDB 10.2.2, all node pointer pages were protected by index->lock
either in S (shared) or X (exclusive) mode and block->lock was not acquired on
node pointer pages.

After MariaDB 10.2.2, a block->lock S-latch or X-latch is used to protect
node pointer pages and the acquisition of node pointer page latches is
protected by index->lock.

(0) Definition: B-tree level.

(0.1) The leaf pages of the B-tree are at level 0.

(0.2) The parent of a page at level L has level L+1. (The level of the
root page is equal to the tree height.)

(0.3) The B-tree lock (index->lock) is the parent of the root page and
has a level = tree height + 1.

Index->lock has 3 possible locking modes:

(1) S-latch:

(1.1) All latches for pages must be obtained in descending order of tree level.

(1.2) Before obtaining the first node pointer page latch at a given B-tree
level, the parent latch must be held (at level + 1).

(1.3) If a node pointer page is already latched at the same level
we can only obtain a latch on its right sibling page at the same level.

(1.4) Release of the node pointer page latches must be done in
child-to-parent order. (This prevents deadlocks when index->lock is
obtained in SX mode.)

(1.4.1) A level L node pointer page latch can be released only when
no latches at the children level, i.e. level < L, are held.

(1.4.2) All latches from node pointer pages must be released so
that no latches are obtained in between.

(1.5) [implied by (1.1), (1.2)] The root page latch must be the first node
pointer latch obtained.

(2) SX-latch:

In this case rules (1.2) and (1.3) from the S-latch case are relaxed and
merged into (2.2) and rule (1.4) is removed. Thus, latch acquisition
can be skipped at some tree levels and latches can be obtained in
a less restricted order.

(2.1) [identical to (1.1)]: All latches for pages must be obtained in
descending order of tree level.

(2.2) When a node pointer latch at level L is obtained,
the left sibling page latch at the same level or some ancestor
page latch (at level > L) must be held.

(2.3) [implied by (2.1), (2.2)] The first node pointer page latch obtained can
be any node pointer page.

(3) X-latch:

Node pointer latches can be obtained in any order.

NOTE: The new rules after MariaDB 10.2.2 do not affect the latching rules of
leaf pages:

The index->lock S-latch is needed in reads for the node pointer traversal.
When the leaf level is reached, index->lock can be released (and with the
MariaDB 10.2.2 changes, all node pointer latches). Left-to-right index
traversal at the leaf page level can be safely done by obtaining the right
sibling leaf page latch and then releasing the old page latch.

Single leaf page modifications (BTR_MODIFY_LEAF) are protected by index->lock
S-latch.

B-tree operations involving page splits or merges (BTR_MODIFY_TREE) and page
allocations are protected by index->lock X-latch.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we can store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
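
/* Illustrative sketch (not part of the build): a two-level tree over the
keys 10, 15, 18, 20, 25 might look like

	level 1 (root):   [min-rec] -> page A         [20] -> page B
	level 0 (leaves): page A: 10, 15, 18          page B: 20, 25

The minimum-record bit on the leftmost node pointer makes it compare lower
than any key, so a search for 18 descends into page A and a search for 25
descends into page B. If the record 20 is later deleted from page B, the
node pointer [20] may legitimately remain unchanged, because a leaf child
need not contain a record equal to its node pointer prefix. */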

#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
@return TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= srv_page_size - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */
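
/* For reference, a sketch of the layout being validated (see fsp0fsp.h
for the authoritative definitions): a file segment header is a 10-byte
pointer to the segment inode, read above at these relative offsets:

	FSEG_HDR_SPACE   (4 bytes)  tablespace id of the inode page
	FSEG_HDR_PAGE_NO (4 bytes)  page number of the inode page
	FSEG_HDR_OFFSET  (2 bytes)  byte offset of the inode within the page

The offset must fall within the usable payload area of a page, that is
within [FIL_PAGE_DATA, srv_page_size - FIL_PAGE_DATA_END]. */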

/**************************************************************//**
Gets the root node of a tree and x- or s-latches it.
@return root page, x- or s-latched */
buf_block_t*
btr_root_block_get(
/*===============*/
	const dict_index_t*	index,	/*!< in: index tree */
	ulint			mode,	/*!< in: either RW_S_LATCH
					or RW_X_LATCH */
	mtr_t*			mtr)	/*!< in: mtr */
{
	if (!index->table || !index->table->space) {
		return NULL;
	}

	buf_block_t*	block = btr_block_get(
		page_id_t(index->table->space_id, index->page),
		page_size_t(index->table->space->flags), mode,
		index, mtr);

	if (!block) {
		index->table->file_unreadable = true;

		ib_push_warning(
			static_cast<THD*>(NULL), DB_DECRYPTION_FAILED,
			"Table %s in file %s is encrypted but encryption service or"
			" used key_id is not available. "
			" Can't continue reading table.",
			index->table->name.m_name,
			UT_LIST_GET_FIRST(index->table->space->chain)->name);

		return NULL;
	}

	btr_assert_not_corrupted(block, index);

#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, index->table->space_id));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, index->table->space_id));
	}
#endif /* UNIV_BTR_DEBUG */

	return(block);
}

/**************************************************************//**
Gets the root node of a tree and sx-latches it for segment access.
@return root page, sx-latched */
page_t*
btr_root_get(
/*=========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*			mtr)	/*!< in: mtr */
{
	/* Intended to be used for segment list access.
	An SX latch does not block other threads from reading user data,
	but it does block concurrent access to the segment lists. */
	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH,
					       mtr);
	return(root ? buf_block_get_frame(root) : NULL);
}

/**************************************************************//**
Gets the height of the B-tree (the level of the root, when the leaf
level is assumed to be 0). The caller must hold an S or X latch on
the index.
@return tree height (level of the root) */
ulint
btr_height_get(
/*===========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	ulint		height=0;
	buf_block_t*	root_block;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_S_LOCK
					   | MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK));

	/* S latches the page */
	root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

	if (root_block) {
		height = btr_page_get_level(buf_block_get_frame(root_block));

		/* Release the S latch on the root page. */
		mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

		ut_d(sync_check_unlock(&root_block->lock));
	}

	return(height);
}
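
/* Usage sketch (illustrative; assumes the caller latches the index as the
assertion above requires):

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);
	ulint	height = btr_height_get(index, &mtr);
	mtr.commit();

A height of 0 means that the root page is itself a leaf page. */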

/**************************************************************//**
Checks a file segment header within a B-tree root page and updates
the segment header space id.
@return true if valid */
static
bool
btr_root_fseg_adjust_on_import(
/*===========================*/
	fseg_header_t*	seg_header,	/*!< in/out: segment header */
	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
					or NULL */
	ulint		space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	if (offset < FIL_PAGE_DATA
	    || offset > srv_page_size - FIL_PAGE_DATA_END) {
		return false;
	}

	seg_header += FSEG_HDR_SPACE;

	mach_write_to_4(seg_header, space);
	if (UNIV_LIKELY_NULL(page_zip)) {
		memcpy(page_zip->data + page_offset(seg_header), seg_header,
		       4);
	}

	return true;
}

/**************************************************************//**
Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
@return error code, or DB_SUCCESS */
dberr_t
btr_root_adjust_on_import(
/*======================*/
	const dict_index_t*	index)	/*!< in: index tree */
{
	dberr_t			err;
	mtr_t			mtr;
	page_t*			page;
	buf_block_t*		block;
	page_zip_des_t*		page_zip;
	dict_table_t*		table = index->table;
	const page_id_t		page_id(table->space_id, index->page);
	const page_size_t	page_size(table->space->flags);

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
			return(DB_CORRUPTION););

	mtr_start(&mtr);

	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

	block = btr_block_get(page_id, page_size, RW_X_LATCH, index, &mtr);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	if (!fil_page_index_page_check(page) || page_has_siblings(page)) {
		err = DB_CORRUPTION;

	} else if (dict_index_is_clust(index)) {
		bool	page_is_compact_format;

		page_is_compact_format = page_is_comp(page) > 0;

		/* Check if the page format and table format agree. */
		if (page_is_compact_format != dict_table_is_comp(table)) {
			err = DB_CORRUPTION;
		} else {
			/* Check that the table flags and the tablespace
			flags match. */
			err = (dict_tf_to_fsp_flags(table->flags)
			       == table->space->flags)
				? DB_SUCCESS : DB_CORRUPTION;
		}
	} else {
		err = DB_SUCCESS;
	}

	/* Check and adjust the file segment headers, if all OK so far. */
	if (err == DB_SUCCESS
	    && (!btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
			+ page, page_zip, table->space_id)
		|| !btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
			+ page, page_zip, table->space_id))) {

		err = DB_CORRUPTION;
	}

	mtr_commit(&mtr);

	return(err);
}

/**************************************************************//**
Creates a new index page (not the root, and also not
used in page reorganization).  @see btr_page_empty(). */
void
btr_page_create(
/*============*/
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		page = buf_block_get_frame(block);

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	if (page_zip) {
		page_create_zip(block, index, level, 0, NULL, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table),
			    dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

	/* For Spatial Index, initialize the Split Sequence Number */
	if (dict_index_is_spatial(index)) {
		page_set_ssn_id(block, page_zip, 0, mtr);
	}

	btr_page_set_index_id(page, page_zip, index->id, mtr);
}

/**************************************************************//**
Allocates a new file page to be used in an ibuf tree. Takes the page from
the free list of the tree, which must contain pages!
@return new allocated block, x-latched */
static
buf_block_t*
btr_page_alloc_for_ibuf(
/*====================*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
	buf_block_t*	new_block;

	root = btr_root_get(index, mtr);

	node_addr = flst_get_first(root + PAGE_HEADER
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
	ut_a(node_addr.page != FIL_NULL);

	new_block = buf_page_get(
		page_id_t(index->table->space_id, node_addr.page),
		page_size_t(index->table->space->flags),
		RW_X_LATCH, mtr);

	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));

	return(new_block);
}
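
/* Sketch of the free-list structure used above (grounded in the flst_*
calls): the ibuf tree root holds the list base node at
PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, and each free page carries its list
node at PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE. Allocation pops the
first page off this list; btr_page_free_for_ibuf() below pushes a freed
page back onto it. */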

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
buf_block_t*
btr_page_alloc_low(
/*===============*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mtr or another
					mini-transaction in which the
					page should be initialized.
					If init_mtr!=mtr, but the page
					is already X-latched in mtr, do
					not initialize the page. */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	root = btr_root_get(index, mtr);

	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */

	buf_block_t* block = fseg_alloc_free_page_general(
		seg_header, hint_page_no, file_direction,
		TRUE, mtr, init_mtr);

#ifdef UNIV_DEBUG_SCRUBBING
	if (block != NULL) {
		fprintf(stderr,
			"alloc %lu:%lu to index: %lu root: %lu\n",
			buf_block_get_page_no(block),
			buf_block_get_space(block),
			index->id,
			dict_index_get_page(index));
	} else {
		fprintf(stderr,
			"failed alloc index: %lu root: %lu\n",
			index->id,
			dict_index_get_page(index));
	}
#endif /* UNIV_DEBUG_SCRUBBING */

	return block;
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
buf_block_t*
btr_page_alloc(
/*===========*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
					for x-latching and initializing
					the page */
{
	buf_block_t*	new_block;

	if (dict_index_is_ibuf(index)) {

		return(btr_page_alloc_for_ibuf(index, mtr));
	}

	new_block = btr_page_alloc_low(
		index, hint_page_no, file_direction, level, mtr, init_mtr);

	if (new_block) {
		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
	}

	return(new_block);
}
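
/* Usage sketch (illustrative; the extent-reservation API lives in fsp0fsp
and is only indicated here): the caller must reserve free extents before
calling btr_page_alloc(), roughly

	mtr_t	mtr;
	mtr.start();
	mtr_x_lock(dict_index_get_lock(index), &mtr);
	... reserve free extents for index->table->space ...
	buf_block_t*	new_block = btr_page_alloc(
		index, hint_page_no, FSP_UP, level, &mtr, &mtr);
	mtr.commit();

Passing init_mtr == mtr, as above, makes the page X-latched and
initialized within the same mini-transaction; see btr_page_alloc_low()
for the behavior when init_mtr differs from mtr. */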

/**************************************************************//**
Gets the number of pages in a B-tree.
@return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint
btr_get_size(
/*=========*/
	const dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n=0;
	ulint		dummy;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
				   MTR_MEMO_S_LOCK));

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);

	if (root) {
		if (flag == BTR_N_LEAF_PAGES) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

			fseg_n_reserved_pages(seg_header, &n, mtr);

		} else if (flag == BTR_TOTAL_SIZE) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

			n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

			n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
		} else {
			ut_error;
		}
	} else {
		n = ULINT_UNDEFINED;
	}

	return(n);
}
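
/* Usage sketch (illustrative): the caller S-latches the index in the same
mini-transaction it passes in, e.g.

	mtr_t	mtr;
	mtr.start();
	mtr_s_lock(dict_index_get_lock(index), &mtr);
	ulint	n_leaf = btr_get_size(index, BTR_N_LEAF_PAGES, &mtr);
	mtr.commit();

ULINT_UNDEFINED is returned while the index is being created or dropped
(online DDL, uncommitted index) and means "unavailable", not a count. */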

/**************************************************************//**
Gets the number of reserved and used pages in a B-tree.
@return	number of pages reserved, or ULINT_UNDEFINED if the index
is unavailable */
UNIV_INTERN
ulint
btr_get_size_and_reserved(
/*======================*/
	dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	ulint*		used,	/*!< out: number of pages used (<= reserved) */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n=ULINT_UNDEFINED;
	ulint		dummy;

	ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
				MTR_MEMO_S_LOCK));

	ut_a(flag == BTR_N_LEAF_PAGES || flag == BTR_TOTAL_SIZE);

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);
	*used = 0;

	if (root) {

		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		n = fseg_n_reserved_pages(seg_header, used, mtr);

		if (flag == BTR_TOTAL_SIZE) {
			seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

			n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
			*used += dummy;

		}
	}

	return(n);
}

/**************************************************************//**
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		root;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	root = btr_root_get(index, mtr);

	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
}

/** Free an index page.
@param[in,out]	index	index tree
@param[in,out]	block	block to be freed
@param[in,out]	mtr	mini-transaction
@param[in]	blob	whether this is freeing a BLOB page */
void btr_page_free(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
		   bool blob)
{
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
#ifdef BTR_CUR_HASH_ADAPT
	if (block->index && !block->index->freed()) {
		ut_ad(!blob);
		ut_ad(page_is_leaf(block->frame));
	}
#endif
	ut_ad(index->table->space_id == block->page.id.space());
	/* The root page is freed by btr_free_root(). */
	ut_ad(block->page.id.page_no() != index->page);
	ut_ad(mtr->is_named_space(index->table->space));

	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

	buf_block_modify_clock_inc(block);

	if (dict_index_is_ibuf(index)) {
		btr_page_free_for_ibuf(index, block, mtr);
		return;
	}

	/* TODO: Discard any operations for block from mtr->log.
	The page will be freed, so previous changes to it by this
	mini-transaction should not matter. */
	page_t* root = btr_root_get(index, mtr);
	fseg_header_t* seg_header = &root[blob || page_is_leaf(block->frame)
					  ? PAGE_HEADER + PAGE_BTR_SEG_LEAF
					  : PAGE_HEADER + PAGE_BTR_SEG_TOP];
	fseg_free_page(seg_header,
		       index->table->space, block->page.id.page_no(), mtr);

	/* The page was marked free in the allocation bitmap, but it
	should remain exclusively latched until mtr_t::commit() or until it
	is explicitly freed from the mini-transaction. */
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	if (srv_immediate_scrub_data_uncompressed) {
		/* In MDEV-15528 this code must be removed and the
		check in buf_flush_init_for_writing() re-enabled.  We
		should zero out the page after the redo log for this
		mini-transaction has been durably written. The log
		would include the 10.4 MLOG_INIT_FREE_PAGE record. */
		fsp_init_file_page(index->table->space, block, mtr);
	}
}
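
/* Usage sketch (illustrative): the block must be X-latched in mtr and the
tablespace must be associated with the mini-transaction, e.g.

	mtr_t	mtr;
	mtr.start();
	mtr.set_named_space(index->table->space);
	... fetch block with an X-latch in mtr ...
	btr_page_free(index, block, &mtr, false);
	mtr.commit();

As noted above, the block stays latched until the commit, so no other
thread can reallocate and overwrite the page while this mini-transaction
is still in progress. */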

/**************************************************************//**
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
				part will be updated, or NULL */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(!page_rec_is_leaf(rec));
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

	/* The child address is in the last field */
	field = rec_get_nth_field(rec, offsets,
				  rec_offs_n_fields(offsets) - 1, &len);

	ut_ad(len == REC_NODE_PTR_SIZE);

	if (page_zip) {
		page_zip_write_node_ptr(page_zip, rec,
					rec_offs_data_size(offsets),
					page_no, mtr);
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
	}
}

/************************************************************//**
Returns the child page of a node pointer and sx-latches it.
@return child page, sx-latched */
static
buf_block_t*
btr_node_ptr_get_child(
/*===================*/
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(rec_offs_validate(node_ptr, index, offsets));
	ut_ad(index->table->space_id
	      == page_get_space_id(page_align(node_ptr)));

	return btr_block_get(
		page_id_t(index->table->space_id,
			  btr_node_ptr_get_child_page_no(node_ptr, offsets)),
		page_size_t(index->table->space->flags),
		RW_SX_LATCH, index, mtr);
}

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an sx-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
rec_offs*
btr_page_get_father_node_ptr_func(
/*==============================*/
	rec_offs*	offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
				out: cursor on node pointer record,
				its page x-latched */
	ulint		latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
				or BTR_CONT_SEARCH_TREE */
	const char*	file,	/*!< in: file name */
	unsigned	line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dtuple_t*	tuple;
	rec_t*		user_rec;
	rec_t*		node_ptr;
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
	      || latch_mode == BTR_CONT_SEARCH_TREE);

	page_no = btr_cur_get_block(cursor)->page.id.page_no();
	index = btr_cur_get_index(cursor);
	ut_ad(!dict_index_is_spatial(index));

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK));

	ut_ad(dict_index_get_page(index) != page_no);

	level = btr_page_get_level(btr_cur_get_page(cursor));

	user_rec = btr_cur_get_rec(cursor);
	ut_a(page_rec_is_user_rec(user_rec));

	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
	dberr_t err = DB_SUCCESS;

	err = btr_cur_search_to_nth_level(
		index, level + 1, tuple,
		PAGE_CUR_LE, latch_mode, cursor, 0,
		file, line, mtr);

	if (err != DB_SUCCESS) {
		ib::warn() << " Error code: " << err
			<< " btr_page_get_father_node_ptr_func "
			<< " level: " << level + 1
			<< " called from file: "
			<< file << " line: " << line
			<< " table: " << index->table->name
			<< " index: " << index->name();
	}

	node_ptr = btr_cur_get_rec(cursor);

	offsets = rec_get_offsets(node_ptr, index, offsets, 0,
				  ULINT_UNDEFINED, &heap);

	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
		rec_t*	print_rec;

		ib::error()
			<< "Corruption of an index tree: table "
			<< index->table->name
			<< " index " << index->name
			<< ", father ptr page no "
			<< btr_node_ptr_get_child_page_no(node_ptr, offsets)
			<< ", child page no " << page_no;

		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
		offsets = rec_get_offsets(print_rec, index, offsets,
					  page_rec_is_leaf(user_rec)
					  ? index->n_core_fields : 0,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(node_ptr, offsets);

		ib::fatal()
			<< "You should dump + drop + reimport the table to"
			<< " fix the corruption. If the crash happens at"
			<< " database startup, " << FORCE_RECOVERY_MSG
			<< " Then dump + drop + reimport.";
	}

	return(offsets);
}

#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)

#define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an x-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
rec_offs*
btr_page_get_father_block(
/*======================*/
	rec_offs*	offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/** Seek to the parent page of a B-tree page.
@param[in,out]	index	b-tree
@param[in]	block	child page
@param[in,out]	mtr	mini-transaction
@param[out]	cursor	cursor pointing to the x-latched parent page */
void btr_page_get_father(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
			 btr_cur_t* cursor)
{
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
}
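
/* Usage sketch (illustrative; assumes mtr already holds the index latch
and the child block latch, as btr_page_get_father_node_ptr_func()
asserts):

	btr_cur_t	cursor;
	btr_page_get_father(index, block, &mtr, &cursor);
	rec_t*	node_ptr = btr_cur_get_rec(&cursor);

Afterwards the cursor is positioned on the node pointer record in the
parent page, and that page is latched in mtr. */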

/** PAGE_INDEX_ID value for freed index B-trees */
static const index_id_t	BTR_FREED_INDEX_ID = 0;

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out]	block		index root page
@param[in,out]	mtr		mini-transaction
@param[in]	invalidate	whether to invalidate PAGE_INDEX_ID */
static void btr_free_root(buf_block_t* block, mtr_t* mtr, bool invalidate)
{
	fseg_header_t*	header;

	ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX
					| MTR_MEMO_PAGE_SX_FIX));
	ut_ad(mtr->is_named_space(block->page.id.space()));

	btr_search_drop_page_hash_index(block);

	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */
	if (invalidate) {
		btr_page_set_index_id(
			buf_block_get_frame(block),
			buf_block_get_page_zip(block),
			BTR_FREED_INDEX_ID, mtr);
	}

	while (!fseg_free_step(header, mtr)) {
		/* Free the entire segment in small steps. */
	}
}

/** Prepare to free a B-tree.
@param[in]	page_id		page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result))
buf_block_t*
btr_free_root_check(
	const page_id_t		page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, mtr);

	if (block) {
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);

		if (fil_page_index_page_check(block->frame)
		    && index_id == btr_page_get_index_id(block->frame)) {
			/* This should be a root page.
			It should not be possible to reassign the same
			index_id for some other index in the tablespace. */
			ut_ad(!page_has_siblings(block->frame));
		} else {
			block = NULL;
		}
	}

	return(block);
}

/** Create the root node for a new index tree.
@param[in]	type			type of the index
@param[in,out]	space			tablespace where created
@param[in]	index_id		index id
@param[in]	index			index, or NULL when applying TRUNCATE
log record during recovery
@param[in]	btr_redo_create_info	used for applying TRUNCATE log
record during recovery
@param[in]	mtr			mini-transaction handle
@return page number of the created root, FIL_NULL if did not succeed */
ulint
btr_create(
	ulint			type,
	fil_space_t*		space,
	index_id_t		index_id,
	dict_index_t*		index,
	const btr_create_t*	btr_redo_create_info,
	mtr_t*			mtr)
{
	buf_block_t*		block;
	page_t*			page;
	page_zip_des_t*		page_zip;

	ut_ad(mtr->is_named_space(space));
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		if (ibuf_hdr_block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(ibuf_hdr_block->page.id.page_no()
		      == IBUF_HEADER_PAGE_NO);
		/* Allocate then the next page to the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		flst_init(block->frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			  mtr);
	} else {
		block = fseg_create(space,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr,
				 false, block)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(block, mtr,
				      !index->table->is_temporary());
			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (page_zip) {
		if (index != NULL) {
			page = page_create_zip(block, index, 0, 0, NULL, mtr);
		} else {
			/* Create a compressed index page when applying
			TRUNCATE log record during recovery */
			ut_ad(btr_redo_create_info != NULL);

			redo_page_compress_t	page_comp_info;

			page_comp_info.type = type;

			page_comp_info.index_id = index_id;

			page_comp_info.n_fields =
				btr_redo_create_info->n_fields;

			page_comp_info.field_len =
				btr_redo_create_info->field_len;

			page_comp_info.fields = btr_redo_create_info->fields;

			page_comp_info.trx_id_pos =
				btr_redo_create_info->trx_id_pos;

			page = page_create_zip(block, NULL, 0, 0,
					       &page_comp_info, mtr);
		}
	} else {
		if (index != NULL) {
			page = page_create(block, mtr,
					   dict_table_is_comp(index->table),
					   dict_index_is_spatial(index));
		} else {
			ut_ad(btr_redo_create_info != NULL);
			page = page_create(
				block, mtr, btr_redo_create_info->format_flags,
				type == DICT_SPATIAL);
		}
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order.

	index will be NULL if we are recreating the table during recovery
	on behalf of TRUNCATE.

	Note: Insert Buffering is disabled for temporary tables given that
	most temporary tables are smaller in size and short-lived. */
	if (!(type & DICT_CLUSTERED)
	    && (index == NULL || !index->table->is_temporary())) {

		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(block->page.id.page_no());
}
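
/* Usage sketch (illustrative): the returned page number is what the
caller records in the data dictionary as the root page of the index:

	ulint	root_page_no = btr_create(
		index->type, index->table->space,
		index->id, index, NULL, mtr);
	if (root_page_no == FIL_NULL) {
		... the tablespace is full; report out-of-space ...
	}

Passing btr_redo_create_info == NULL is only valid when index != NULL;
the converse (index == NULL) occurs when applying a TRUNCATE log record
during recovery. */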

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out]	block		root page
@param[in]	log_mode	mtr logging mode */
static
void
btr_free_but_not_root(
	buf_block_t*	block,
	mtr_log_t	log_mode)
{
	ibool	finished;
	mtr_t	mtr;

	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(!page_has_siblings(block->frame));
leaf_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space_id(block->page.id.space());

	page_t*	root = block->frame;

	if (!root) {
		mtr_commit(&mtr);
		return;
	}

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, block->page.id.space()));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space_id(block->page.id.space());

	root = block->frame;

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
	mtr_commit(&mtr);

	if (!finished) {
		goto top_loop;
	}
}

/** Free a persistent index tree if it exists.
@param[in]	page_id		root page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction */
void
btr_free_if_exists(
	const page_id_t		page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	buf_block_t* root = btr_free_root_check(
		page_id, page_size, index_id, mtr);

	if (root == NULL) {
		return;
	}

	btr_free_but_not_root(root, mtr->get_log_mode());
	mtr->set_named_space_id(page_id.space());
	btr_free_root(root, mtr, true);
}

/** Free an index tree in a temporary tablespace or during TRUNCATE TABLE.
@param[in]	page_id		root page id
@param[in]	page_size	page size */
void
btr_free(
	const page_id_t		page_id,
	const page_size_t&	page_size)
{
	mtr_t		mtr;
	mtr.start();
	mtr.set_log_mode(MTR_LOG_NO_REDO);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, &mtr);

	if (block) {
		btr_free_but_not_root(block, MTR_LOG_NO_REDO);
		btr_free_root(block, &mtr, false);
	}
	mtr.commit();
}

/** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC.
@param[in,out]	index	clustered index
@return	the last used AUTO_INCREMENT value
@retval	0 on error or if no AUTO_INCREMENT value was used yet */
ib_uint64_t
btr_read_autoinc(dict_index_t* index)
{
	ut_ad(index->is_primary());
	ut_ad(index->table->persistent_autoinc);
	ut_ad(!index->table->is_temporary());
	mtr_t		mtr;
	mtr.start();
	ib_uint64_t	autoinc;
	if (buf_block_t* block = buf_page_get(
		    page_id_t(index->table->space_id, index->page),
		    page_size_t(index->table->space->flags),
		    RW_S_LATCH, &mtr)) {
		autoinc = page_get_autoinc(block->frame);
	} else {
		autoinc = 0;
	}
	mtr.commit();
	return autoinc;
}

/** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC,
or fall back to MAX(auto_increment_column).
@param[in]	table	table containing an AUTO_INCREMENT column
@param[in]	col_no	index of the AUTO_INCREMENT column
@return	the AUTO_INCREMENT value
@retval	0 on error or if no AUTO_INCREMENT value was used yet */
ib_uint64_t
btr_read_autoinc_with_fallback(const dict_table_t* table, unsigned col_no)
{
	ut_ad(table->persistent_autoinc);
	ut_ad(!table->is_temporary());

	dict_index_t*	index = dict_table_get_first_index(table);

	if (index == NULL) {
		return 0;
	}

	mtr_t		mtr;
	mtr.start();
	buf_block_t*	block = buf_page_get(
		page_id_t(index->table->space_id, index->page),
		page_size_t(index->table->space->flags),
		RW_S_LATCH, &mtr);

	ib_uint64_t	autoinc	= block ? page_get_autoinc(block->frame) : 0;
	const bool	retry	= block && autoinc == 0
		&& !page_is_empty(block->frame);
	mtr.commit();

	if (retry) {
		/* This should be an old data file where
		PAGE_ROOT_AUTO_INC was initialized to 0.
		Fall back to reading MAX(autoinc_col).
		There should be an index on it. */
		const dict_col_t*	autoinc_col
			= dict_table_get_nth_col(table, col_no);
		while (index && index->fields[0].col != autoinc_col) {
			index = dict_table_get_next_index(index);
		}

		if (index) {
			autoinc = row_search_max_autoinc(index);
		}
	}

	return autoinc;
}

/** Write the next available AUTO_INCREMENT value to PAGE_ROOT_AUTO_INC.
@param[in,out]	index	clustered index
@param[in]	autoinc	the AUTO_INCREMENT value
@param[in]	reset	whether to reset the AUTO_INCREMENT
			to a possibly smaller value than currently
			exists in the page */
void
btr_write_autoinc(dict_index_t* index, ib_uint64_t autoinc, bool reset)
{
	ut_ad(index->is_primary());
	ut_ad(index->table->persistent_autoinc);
	ut_ad(!index->table->is_temporary());

	mtr_t		mtr;
	mtr.start();
	fil_space_t* space = index->table->space;
	mtr.set_named_space(space);
	page_set_autoinc(buf_page_get(page_id_t(space->id, index->page),
				      page_size_t(space->flags),
				      RW_SX_LATCH, &mtr),
			 index, autoinc, &mtr, reset);
	mtr.commit();
}
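
/* Usage sketch (illustrative): btr_read_autoinc() and btr_write_autoinc()
form a pair around the PAGE_ROOT_AUTO_INC field of the clustered index
root page, e.g.

	btr_write_autoinc(dict_table_get_first_index(table), value, false);
	...
	ib_uint64_t	last = btr_read_autoinc(
		dict_table_get_first_index(table));

With reset == false the stored value is only moved upwards; reset == true
allows writing a smaller value, e.g. for ALTER TABLE ... AUTO_INCREMENT. */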

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize_low(
/*====================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block		= page_cur_get_block(cursor);
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	bool		success		= false;
	ulint		pos;
	bool		log_compressed;
	bool		is_spatial;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	btr_assert_not_corrupted(block, index);
	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(index->is_dummy
	      || block->page.id.space() == index->table->space->id);
	ut_ad(index->is_dummy
	      || block->page.id.page_no() != index->page
	      || !page_has_siblings(page));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);
	/* Turn logging off */
	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

	temp_block = buf_block_alloc(buf_pool);
	temp_page = temp_block->frame;

	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);

	/* This function can be called by redo log apply with a "dummy"
	index, so we trust the original page's type more than the index. */
	is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
		      || dict_index_is_spatial(index));

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

	if (!recovery) {
		btr_search_drop_page_hash_index(block);
	}

	/* Save the cursor position. */
	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	/* Copy the PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC. */
	memcpy(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
	       temp_page + (PAGE_HEADER + PAGE_MAX_TRX_ID), 8);
	/* PAGE_MAX_TRX_ID is unused in clustered index pages
	(other than the root where it is repurposed as PAGE_ROOT_AUTO_INC),
	non-leaf pages, and in temporary tables. It was always
	zero-initialized in page_create() in all InnoDB versions.
	PAGE_MAX_TRX_ID must be nonzero on dict_index_is_sec_or_ibuf()
	leaf pages.

	During redo log apply, dict_index_is_sec_or_ibuf() always
	holds, even for clustered indexes. */
	ut_ad(recovery || index->table->is_temporary()
	      || !page_is_leaf(temp_page)
	      || !dict_index_is_sec_or_ibuf(index)
	      || page_get_max_trx_id(page) != 0);
	/* PAGE_MAX_TRX_ID must be zero on non-leaf pages other than
	clustered index root pages. */
	ut_ad(recovery
	      || page_get_max_trx_id(page) == 0
	      || (dict_index_is_sec_or_ibuf(index)
		  ? page_is_leaf(temp_page)
		  : block->page.id.page_no() == index->page));

	/* If innodb_log_compressed_pages is ON, page reorganize should log the
	compressed page image.*/
	log_compressed = page_zip && page_zip_log_pages;

	if (log_compressed) {
		mtr_set_log_mode(mtr, log_mode);
	}

	if (page_zip
	    && !page_zip_compress(page_zip, page, index, z_level, NULL, mtr)) {

		/* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(srv_page_size - FIL_PAGE_DATA_END + page,
			     srv_page_size - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       srv_page_size - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, srv_page_size));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

	if (!recovery && !dict_table_is_locking_disabled(index->table)) {
		/* Update the record lock bitmaps */
		lock_move_reorganize_page(block, temp_block);
	}

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
		ib::error()
			<< "Page old data size " << data_size1
			<< " new data size " << data_size2
			<< ", page old max ins size " << max_ins_size1
			<< " new max ins size " << max_ins_size2;

		ib::error() << BUG_REPORT_MSG;
		ut_ad(0);
	} else {
		success = true;
	}

	/* Restore the cursor position. */
	if (pos > 0) {
		cursor->rec = page_rec_get_nth(page, pos);
	} else {
		ut_ad(cursor->rec == page_get_infimum_rec(page));
	}

func_exit:
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	if (!recovery && block->page.id.page_no() == index->page
	    && fil_page_get_type(temp_page) == FIL_PAGE_TYPE_INSTANT) {
		/* Preserve the PAGE_INSTANT information. */
		ut_ad(!page_zip);
		ut_ad(index->is_instant());
		memcpy(FIL_PAGE_TYPE + page, FIL_PAGE_TYPE + temp_page, 2);
		memcpy(PAGE_HEADER + PAGE_INSTANT + page,
		       PAGE_HEADER + PAGE_INSTANT + temp_page, 2);
	}

	buf_block_free(temp_block);

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

	if (success) {
		mlog_id_t	type;
		byte*		log_ptr;

		/* Write the log record */
		if (page_zip) {
			ut_ad(page_is_comp(page));
			type = MLOG_ZIP_PAGE_REORGANIZE;
		} else if (page_is_comp(page)) {
			type = MLOG_COMP_PAGE_REORGANIZE;
		} else {
			type = MLOG_PAGE_REORGANIZE;
		}

		log_ptr = log_compressed
			? NULL
			: mlog_open_and_write_index(
				mtr, page, index, type,
				page_zip ? 1 : 0);

		/* For compressed pages write the compression level. */
		if (log_ptr && page_zip) {
			mach_write_to_1(log_ptr, z_level);
			mlog_close(mtr, log_ptr + 1);
		}

		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
	}

	if (UNIV_UNLIKELY(fil_page_get_type(page) == FIL_PAGE_TYPE_INSTANT)) {
		/* Log the PAGE_INSTANT information. */
		ut_ad(!page_zip);
		ut_ad(index->is_instant());
		ut_ad(!recovery);
		mlog_write_ulint(FIL_PAGE_TYPE + page, FIL_PAGE_TYPE_INSTANT,
				 MLOG_2BYTES, mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_INSTANT + page,
				 mach_read_from_2(PAGE_HEADER + PAGE_INSTANT
						  + page),
				 MLOG_2BYTES, mtr);
	}

	return(success);
}
1667 
1668 /*************************************************************//**
1669 Reorganizes an index page.
1670 
1671 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1672 if this is a compressed leaf page in a secondary index. This has to
1673 be done either within the same mini-transaction, or by invoking
1674 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1675 IBUF_BITMAP_FREE is unaffected by reorganization.
1676 
1677 @retval true if the operation was successful
1678 @retval false if it is a compressed page, and recompression failed */
1679 bool
1680 btr_page_reorganize_block(
1681 /*======================*/
1682 	bool		recovery,/*!< in: true if called in recovery:
1683 				locks should not be updated, i.e.,
1684 				there cannot exist locks on the
1685 				page, and a hash index should not be
1686 				dropped: it cannot exist */
1687 	ulint		z_level,/*!< in: compression level to be used
1688 				if dealing with compressed page */
1689 	buf_block_t*	block,	/*!< in/out: B-tree page */
1690 	dict_index_t*	index,	/*!< in: the index tree of the page */
1691 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1692 {
1693 	page_cur_t	cur;
1694 	page_cur_set_before_first(block, &cur);
1695 
1696 	return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
1697 }
1698 
1699 /*************************************************************//**
1700 Reorganizes an index page.
1701 
1702 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1703 if this is a compressed leaf page in a secondary index. This has to
1704 be done either within the same mini-transaction, or by invoking
1705 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1706 IBUF_BITMAP_FREE is unaffected by reorganization.
1707 
1708 @retval true if the operation was successful
1709 @retval false if it is a compressed page, and recompression failed */
1710 bool
1711 btr_page_reorganize(
1712 /*================*/
1713 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1714 	dict_index_t*	index,	/*!< in: the index tree of the page */
1715 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1716 {
1717 	return(btr_page_reorganize_low(false, page_zip_level,
1718 				       cursor, index, mtr));
1719 }
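
/* Usage sketch (illustrative only, not compiled): how a caller would
honour the IMPORTANT note above after reorganizing a compressed leaf
page of a secondary index. The block, page_cursor and mtr are assumed
to have been set up by the caller.

	if (!btr_page_reorganize(page_cursor, index, mtr)) {
		// recompression failed; the old page was restored
	} else if (!dict_index_is_clust(index)
		   && page_is_leaf(buf_block_get_frame(block))) {
		// compressed secondary index leaf page: the free bits
		// must be reset before mtr_commit()
		ibuf_reset_free_bits(block);
	}
*/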
1720 
1721 /***********************************************************//**
1722 Parses a redo log record of reorganizing a page.
1723 @return end of log record or NULL */
1724 byte*
1725 btr_parse_page_reorganize(
1726 /*======================*/
1727 	byte*		ptr,	/*!< in: buffer */
1728 	byte*		end_ptr,/*!< in: buffer end */
1729 	dict_index_t*	index,	/*!< in: record descriptor */
1730 	bool		compressed,/*!< in: true if compressed page */
1731 	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
1732 	mtr_t*		mtr)	/*!< in: mtr or NULL */
1733 {
1734 	ulint	level = page_zip_level;
1735 
1736 	ut_ad(ptr != NULL);
1737 	ut_ad(end_ptr != NULL);
1738 	ut_ad(index != NULL);
1739 
1740 	/* If dealing with a compressed page, the record stores in one
1741 	byte the compression level that was used when the page was
1742 	originally compressed. Otherwise the record body is empty. */
1743 	if (compressed) {
1744 		if (ptr == end_ptr) {
1745 			return(NULL);
1746 		}
1747 
1748 		level = mach_read_from_1(ptr);
1749 
1750 		ut_a(level <= 9);
1751 		++ptr;
1752 	} else {
1753 		level = page_zip_level;
1754 	}
1755 
1756 	if (block != NULL) {
1757 		btr_page_reorganize_block(true, level, block, index, mtr);
1758 	}
1759 
1760 	return(ptr);
1761 }
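
/* Sketch of the redo record body parsed above (illustrative). For
MLOG_ZIP_PAGE_REORGANIZE the body is a single byte holding the zlib
compression level that the writer side, btr_page_reorganize_low(),
logged with mach_write_to_1():

	| 1 byte: compression level (0..9) |

For MLOG_PAGE_REORGANIZE and MLOG_COMP_PAGE_REORGANIZE the body is
empty and the server-wide page_zip_level is used instead. */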
1762 
1763 /** Empty an index page (possibly the root page). @see btr_page_create().
1764 @param[in,out]	block		page to be emptied
1765 @param[in,out]	page_zip	compressed page frame, or NULL
1766 @param[in]	index		index of the page
1767 @param[in]	level		B-tree level of the page (0=leaf)
1768 @param[in,out]	mtr		mini-transaction */
1769 void
1770 btr_page_empty(
1771 	buf_block_t*	block,
1772 	page_zip_des_t*	page_zip,
1773 	dict_index_t*	index,
1774 	ulint		level,
1775 	mtr_t*		mtr)
1776 {
1777 	page_t*	page = buf_block_get_frame(block);
1778 
1779 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1780 	ut_ad(page_zip == buf_block_get_page_zip(block));
1781 	ut_ad(!index->is_dummy);
1782 	ut_ad(index->table->space->id == block->page.id.space());
1783 #ifdef UNIV_ZIP_DEBUG
1784 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1785 #endif /* UNIV_ZIP_DEBUG */
1786 
1787 	btr_search_drop_page_hash_index(block);
1788 
1789 	/* Recreate the page: note that global data on page (possible
1790 	segment headers, next page-field, etc.) is preserved intact */
1791 
1792 	/* Preserve PAGE_ROOT_AUTO_INC when creating a clustered index
1793 	root page. */
1794 	const ib_uint64_t	autoinc
1795 		= dict_index_is_clust(index)
1796 		&& index->page == block->page.id.page_no()
1797 		? page_get_autoinc(page)
1798 		: 0;
1799 
1800 	if (page_zip) {
1801 		page_create_zip(block, index, level, autoinc, NULL, mtr);
1802 	} else {
1803 		page_create(block, mtr, dict_table_is_comp(index->table),
1804 			    dict_index_is_spatial(index));
1805 		btr_page_set_level(page, NULL, level, mtr);
1806 		if (autoinc) {
1807 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID + page,
1808 				       autoinc, mtr);
1809 		}
1810 	}
1811 }
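
/* Usage sketch (illustrative): when the root page of a clustered
index is emptied, the AUTO_INCREMENT counter persisted in
PAGE_ROOT_AUTO_INC survives the call, e.g.

	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);

as done in btr_root_raise_and_insert() below; on all other pages the
field is simply reset to zero. */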
1812 
1813 /*************************************************************//**
1814 Makes the tree one level higher by splitting the root, and inserts
1815 the tuple. It is assumed that mtr holds an x-latch on the tree.
1816 NOTE that the operation of this function must always succeed;
1817 we cannot reverse it, therefore enough free disk space must be
1818 guaranteed to be available before this function is called.
1819 @return inserted record */
1820 rec_t*
1821 btr_root_raise_and_insert(
1822 /*======================*/
1823 	ulint		flags,	/*!< in: undo logging and locking flags */
1824 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
1825 				on the root page; when the function returns,
1826 				the cursor is positioned on the predecessor
1827 				of the inserted record */
1828 	rec_offs**	offsets,/*!< out: offsets on inserted record */
1829 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1830 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1831 	ulint		n_ext,	/*!< in: number of externally stored columns */
1832 	mtr_t*		mtr)	/*!< in: mtr */
1833 {
1834 	dict_index_t*	index;
1835 	page_t*		root;
1836 	page_t*		new_page;
1837 	ulint		new_page_no;
1838 	rec_t*		rec;
1839 	dtuple_t*	node_ptr;
1840 	ulint		level;
1841 	rec_t*		node_ptr_rec;
1842 	page_cur_t*	page_cursor;
1843 	page_zip_des_t*	root_page_zip;
1844 	page_zip_des_t*	new_page_zip;
1845 	buf_block_t*	root_block;
1846 	buf_block_t*	new_block;
1847 
1848 	root = btr_cur_get_page(cursor);
1849 	root_block = btr_cur_get_block(cursor);
1850 	root_page_zip = buf_block_get_page_zip(root_block);
1851 	ut_ad(!page_is_empty(root));
1852 	index = btr_cur_get_index(cursor);
1853 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
1854 #ifdef UNIV_ZIP_DEBUG
1855 	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
1856 #endif /* UNIV_ZIP_DEBUG */
1857 #ifdef UNIV_BTR_DEBUG
1858 	if (!dict_index_is_ibuf(index)) {
1859 		ulint	space = index->table->space_id;
1860 
1861 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1862 					    + root, space));
1863 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1864 					    + root, space));
1865 	}
1866 
1867 	ut_a(dict_index_get_page(index) == page_get_page_no(root));
1868 #endif /* UNIV_BTR_DEBUG */
1869 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
1870 					MTR_MEMO_X_LOCK
1871 					| MTR_MEMO_SX_LOCK));
1872 	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));
1873 
1874 	/* Allocate a new page to the tree. Root splitting is done by first
1875 	moving the root records to the new page, emptying the root, putting
1876 	a node pointer to the new page, and then splitting the new page. */
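
	/* Illustrative shape of the root raise (comment only):

	before:	root [r1 r2 ... rn]			(level L)
	step 1:	new_page [r1 r2 ... rn]			(level L)
	step 2:	root [node pointer -> new_page]		(level L + 1)
	step 3:	btr_page_split_and_insert() splits new_page. */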
1877 
1878 	level = btr_page_get_level(root);
1879 
1880 	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
1881 
1882 	if (new_block == NULL && os_has_said_disk_full) {
1883 		return(NULL);
1884 	}
1885 
1886 	new_page = buf_block_get_frame(new_block);
1887 	new_page_zip = buf_block_get_page_zip(new_block);
1888 	ut_a(!new_page_zip == !root_page_zip);
1889 	ut_a(!new_page_zip
1890 	     || page_zip_get_size(new_page_zip)
1891 	     == page_zip_get_size(root_page_zip));
1892 
1893 	btr_page_create(new_block, new_page_zip, index, level, mtr);
1894 
1895 	/* Set the next node and previous node fields of new page */
1896 	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
1897 	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
1898 
1899 	/* Copy the records from root to the new page one by one. */
1900 
1901 	if (0
1902 #ifdef UNIV_ZIP_COPY
1903 	    || new_page_zip
1904 #endif /* UNIV_ZIP_COPY */
1905 	    || !page_copy_rec_list_end(new_block, root_block,
1906 				       page_get_infimum_rec(root),
1907 				       index, mtr)) {
1908 		ut_a(new_page_zip);
1909 
1910 		/* Copy the page byte for byte. */
1911 		page_zip_copy_recs(new_page_zip, new_page,
1912 				   root_page_zip, root, index, mtr);
1913 
1914 		/* Update the lock table and possible hash index. */
1915 		lock_move_rec_list_end(new_block, root_block,
1916 				       page_get_infimum_rec(root));
1917 
1918 		/* Move any existing predicate locks */
1919 		if (dict_index_is_spatial(index)) {
1920 			lock_prdt_rec_move(new_block, root_block);
1921 		} else {
1922 			btr_search_move_or_delete_hash_entries(
1923 				new_block, root_block);
1924 		}
1925 	}
1926 
1927 	if (dict_index_is_sec_or_ibuf(index)) {
1928 		/* In secondary indexes and the change buffer,
1929 		PAGE_MAX_TRX_ID can be reset on the root page, because
1930 		the field only matters on leaf pages, and the root no
1931 		longer is a leaf page. (Older versions of InnoDB did
1932 		set PAGE_MAX_TRX_ID on all secondary index pages.) */
1933 		if (root_page_zip) {
1934 			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + root;
1935 			memset(p, 0, 8);
1936 			page_zip_write_header(root_page_zip, p, 8, mtr);
1937 		} else {
1938 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
1939 				       + root, 0, mtr);
1940 		}
1941 	} else {
1942 		/* PAGE_ROOT_AUTO_INC is only present in the clustered index
1943 		root page; on other clustered index pages, we want to reserve
1944 		the field PAGE_MAX_TRX_ID for future use. */
1945 		if (new_page_zip) {
1946 			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + new_page;
1947 			memset(p, 0, 8);
1948 			page_zip_write_header(new_page_zip, p, 8, mtr);
1949 		} else {
1950 			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
1951 				       + new_page, 0, mtr);
1952 		}
1953 	}
1954 
1955 	/* If this is a pessimistic insert which is actually done to
1956 	perform a pessimistic update then we have stored the lock
1957 	information of the record to be inserted on the infimum of the
1958 	root page: we cannot discard the lock structs on the root page */
1959 
1960 	if (!dict_table_is_locking_disabled(index->table)) {
1961 		lock_update_root_raise(new_block, root_block);
1962 	}
1963 
1964 	/* Create a memory heap where the node pointer is stored */
1965 	if (!*heap) {
1966 		*heap = mem_heap_create(1000);
1967 	}
1968 
1969 	rec = page_rec_get_next(page_get_infimum_rec(new_page));
1970 	new_page_no = new_block->page.id.page_no();
1971 
1972 	/* Build the node pointer (= node key and page address) for the
1973 	child */
1974 	if (dict_index_is_spatial(index)) {
1975 		rtr_mbr_t		new_mbr;
1976 
1977 		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
1978 		node_ptr = rtr_index_build_node_ptr(
1979 			index, &new_mbr, rec, new_page_no, *heap);
1980 	} else {
1981 		node_ptr = dict_index_build_node_ptr(
1982 			index, rec, new_page_no, *heap, level);
1983 	}
1984 	/* The node pointer must be marked as the predefined minimum record,
1985 	as there is no lower alphabetical limit to records in the leftmost
1986 	node of a level: */
1987 	dtuple_set_info_bits(node_ptr,
1988 			     dtuple_get_info_bits(node_ptr)
1989 			     | REC_INFO_MIN_REC_FLAG);
1990 
1991 	/* Rebuild the root page to get free space */
1992 	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
1993 	/* btr_page_empty() is supposed to zero-initialize the field. */
1994 	ut_ad(!page_get_instant(root_block->frame));
1995 
1996 	if (index->is_instant()) {
1997 		ut_ad(!root_page_zip);
1998 		byte* page_type = root_block->frame + FIL_PAGE_TYPE;
1999 		ut_ad(mach_read_from_2(page_type) == FIL_PAGE_INDEX);
2000 		mlog_write_ulint(page_type, FIL_PAGE_TYPE_INSTANT,
2001 				 MLOG_2BYTES, mtr);
2002 		page_set_instant(root_block->frame, index->n_core_fields, mtr);
2003 	}
2004 
2005 	ut_ad(!page_has_siblings(root));
2006 
2007 	page_cursor = btr_cur_get_page_cur(cursor);
2008 
2009 	/* Insert node pointer to the root */
2010 
2011 	page_cur_set_before_first(root_block, page_cursor);
2012 
2013 	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
2014 					     index, offsets, heap, 0, mtr);
2015 
2016 	/* The root page should only contain the node pointer
2017 	to new_page at this point.  Thus, the data should fit. */
2018 	ut_a(node_ptr_rec);
2019 
2020 	/* We play safe and reset the free bits for the new page */
2021 
2022 	if (!dict_index_is_clust(index)
2023 	    && !index->table->is_temporary()) {
2024 		ibuf_reset_free_bits(new_block);
2025 	}
2026 
2027 	if (tuple != NULL) {
2028 		/* Reposition the cursor to the child node */
2029 		page_cur_search(new_block, index, tuple, page_cursor);
2030 	} else {
2031 		/* Set cursor to first record on child node */
2032 		page_cur_set_before_first(new_block, page_cursor);
2033 	}
2034 
2035 	/* Split the child and insert tuple */
2036 	if (dict_index_is_spatial(index)) {
2037 		/* Split rtree page and insert tuple */
2038 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2039 						 tuple, n_ext, mtr));
2040 	} else {
2041 		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
2042 						 tuple, n_ext, mtr));
2043 	}
2044 }
2045 
2046 /** Decide if the page should be split at the convergence point of inserts
2047 converging to the left.
2048 @param[in]	cursor	insert position
2049 @return the first record to be moved to the right half page
2050 @retval	NULL if no split is recommended */
2051 rec_t* btr_page_get_split_rec_to_left(const btr_cur_t* cursor)
2052 {
2053 	rec_t* split_rec = btr_cur_get_rec(cursor);
2054 	const page_t* page = page_align(split_rec);
2055 
2056 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2057 	    != page_rec_get_next(split_rec)) {
2058 		return NULL;
2059 	}
2060 
2061 	/* The metadata record must be present in the leftmost leaf page
2062 	of the clustered index, if and only if index->is_instant().
2063 	However, during innobase_instant_try(), index->is_instant()
2064 	would already hold when row_ins_clust_index_entry_low()
2065 	is being invoked to insert the metadata record.
2066 	So, we can only assert that when the metadata record exists,
2067 	index->is_instant() must hold. */
2068 	ut_ad(!page_is_leaf(page) || page_has_prev(page)
2069 	      || cursor->index->is_instant()
2070 	      || !(rec_get_info_bits(page_rec_get_next_const(
2071 					     page_get_infimum_rec(page)),
2072 				     dict_table_is_comp(cursor->index->table))
2073 		   & REC_INFO_MIN_REC_FLAG));
2074 
2075 	const rec_t* infimum = page_get_infimum_rec(page);
2076 
2077 	/* If the convergence is in the middle of a page, also move
2078 	the record immediately before the new insert to the upper
2079 	page. Otherwise, we could repeatedly move lots of records
2080 	smaller than the convergence point from page to page. */
2081 
2082 	if (split_rec == infimum
2083 	    || split_rec == page_rec_get_next_const(infimum)) {
2084 		split_rec = page_rec_get_next(split_rec);
2085 	}
2086 
2087 	return split_rec;
2088 }
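
/* Illustrative example (hypothetical keys): with inserts converging
to the left, e.g. 90, 80, 70 already inserted and 65 coming next,
PAGE_LAST_INSERT equals the record that follows the cursor, so the
page is split at the convergence point: the records 70, 80, 90, ...
stay together on the upper page instead of being shuffled from page
to page on every subsequent split. */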
2089 
2090 /** Decide if the page should be split at the convergence point of inserts
2091 converging to the right.
2092 @param[in]	cursor		insert position
2093 @param[out]	split_rec	if split recommended, the first record
2094 				on the right half page, or
2095 				NULL if the to-be-inserted record
2096 				should be first
2097 @return whether split is recommended */
2098 bool
2099 btr_page_get_split_rec_to_right(const btr_cur_t* cursor, rec_t** split_rec)
2100 {
2101 	rec_t* insert_point = btr_cur_get_rec(cursor);
2102 	const page_t* page = page_align(insert_point);
2103 
2104 	/* We use eager heuristics: if the new insert would be right after
2105 	the previous insert on the same page, we assume that there is a
2106 	pattern of sequential inserts here. */
2107 
2108 	if (page_header_get_ptr(page, PAGE_LAST_INSERT) != insert_point) {
2109 		return false;
2110 	}
2111 
2112 	insert_point = page_rec_get_next(insert_point);
2113 
2114 	if (page_rec_is_supremum(insert_point)) {
2115 		insert_point = NULL;
2116 	} else {
2117 		insert_point = page_rec_get_next(insert_point);
2118 		if (page_rec_is_supremum(insert_point)) {
2119 			insert_point = NULL;
2120 		}
2121 
2122 		/* If there are >= 2 user records up from the insert
2123 		point, split all but 1 off. We want to keep one because
2124 		then sequential inserts can use the adaptive hash
2125 		index, as they can do the necessary checks of the right
2126 		search position just by looking at the records on this
2127 		page. */
2128 	}
2129 
2130 	*split_rec = insert_point;
2131 	return true;
2132 }
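
/* Illustrative example (hypothetical records): let the insert point
be followed by user records r1, r2, r3 before the supremum. The code
above sets *split_rec = r2, so r1 stays on the left page while r2 and
r3 are split off; keeping r1 lets further sequential inserts verify
their search position via the adaptive hash index without visiting
the right sibling. If at most one user record follows the insert
point, *split_rec = NULL and the tuple itself will be the first
record on the right half-page. */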
2133 
2134 /*************************************************************//**
2135 Calculates a split record such that the tuple will certainly fit on
2136 its half-page when the split is performed. We assume in this function
2137 only that the cursor page has at least one user record.
2138 @return split record, or NULL if tuple will be the first record on
2139 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
2140 static
2141 rec_t*
2142 btr_page_get_split_rec(
2143 /*===================*/
2144 	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
2145 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2146 	ulint		n_ext)	/*!< in: number of externally stored columns */
2147 {
2148 	page_t*		page;
2149 	page_zip_des_t*	page_zip;
2150 	ulint		insert_size;
2151 	ulint		free_space;
2152 	ulint		total_data;
2153 	ulint		total_n_recs;
2154 	ulint		total_space;
2155 	ulint		incl_data;
2156 	rec_t*		ins_rec;
2157 	rec_t*		rec;
2158 	rec_t*		next_rec;
2159 	ulint		n;
2160 	mem_heap_t*	heap;
2161 	rec_offs*	offsets;
2162 
2163 	page = btr_cur_get_page(cursor);
2164 
2165 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2166 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2167 
2168 	page_zip = btr_cur_get_page_zip(cursor);
2169 	if (page_zip) {
2170 		/* Estimate the free space of an empty compressed page. */
2171 		ulint	free_space_zip = page_zip_empty_size(
2172 			cursor->index->n_fields,
2173 			page_zip_get_size(page_zip));
2174 
2175 		if (free_space > (ulint) free_space_zip) {
2176 			free_space = (ulint) free_space_zip;
2177 		}
2178 	}
2179 
2180 	/* free_space is now the free space of a created new page */
2181 
2182 	total_data   = page_get_data_size(page) + insert_size;
2183 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2184 	ut_ad(total_n_recs >= 2);
2185 	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);
2186 
2187 	n = 0;
2188 	incl_data = 0;
2189 	ins_rec = btr_cur_get_rec(cursor);
2190 	rec = page_get_infimum_rec(page);
2191 
2192 	heap = NULL;
2193 	offsets = NULL;
2194 
2195 	/* We include records in the left half until the space they
2196 	reserve exceeds half of total_space. If the records included
2197 	so far fit on the left page and something is left over for
2198 	the right page, they are put on the left page; otherwise the
2199 	last included record will be the first record on the right
2200 	half-page. */
2201 
2202 	do {
2203 		/* Decide the next record to include */
2204 		if (rec == ins_rec) {
2205 			rec = NULL;	/* NULL denotes that tuple is
2206 					now included */
2207 		} else if (rec == NULL) {
2208 			rec = page_rec_get_next(ins_rec);
2209 		} else {
2210 			rec = page_rec_get_next(rec);
2211 		}
2212 
2213 		if (rec == NULL) {
2214 			/* Include tuple */
2215 			incl_data += insert_size;
2216 		} else {
2217 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2218 						  page_is_leaf(page)
2219 						  ? cursor->index->n_core_fields
2220 						  : 0,
2221 						  ULINT_UNDEFINED, &heap);
2222 			incl_data += rec_offs_size(offsets);
2223 		}
2224 
2225 		n++;
2226 	} while (incl_data + page_dir_calc_reserved_space(n)
2227 		 < total_space / 2);
2228 
2229 	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
2230 		/* The next record will be the first on
2231 		the right half page if it is not the
2232 		supremum record of page */
2233 
2234 		if (rec == ins_rec) {
2235 			rec = NULL;
2236 
2237 			goto func_exit;
2238 		} else if (rec == NULL) {
2239 			next_rec = page_rec_get_next(ins_rec);
2240 		} else {
2241 			next_rec = page_rec_get_next(rec);
2242 		}
2243 		ut_ad(next_rec);
2244 		if (!page_rec_is_supremum(next_rec)) {
2245 			rec = next_rec;
2246 		}
2247 	}
2248 
2249 func_exit:
2250 	if (heap) {
2251 		mem_heap_free(heap);
2252 	}
2253 	return(rec);
2254 }
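
/* Worked example (hypothetical sizes): suppose total_data = 8000
bytes for total_n_recs = 5 records including the tuple, so that
total_space is a little over 8000. The do-while loop above adds
record sizes to incl_data until incl_data +
page_dir_calc_reserved_space(n) reaches total_space / 2, i.e. roughly
4000 bytes; the record it stops at is the candidate split record,
subject to the free-space check that follows. */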
2255 
2256 /*************************************************************//**
2257 Returns TRUE if the insert fits on the appropriate half-page with the
2258 chosen split_rec.
2259 @return true if fits */
2260 static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
2261 bool
2262 btr_page_insert_fits(
2263 /*=================*/
2264 	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2265 				should be made */
2266 	const rec_t*	split_rec,/*!< in: suggestion for first record
2267 				on upper half-page, or NULL if
2268 				tuple to be inserted should be first */
2269 	rec_offs**	offsets,/*!< in: rec_get_offsets(
2270 				split_rec, cursor->index); out: garbage */
2271 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2272 	ulint		n_ext,	/*!< in: number of externally stored columns */
2273 	mem_heap_t**	heap)	/*!< in: temporary memory heap */
2274 {
2275 	page_t*		page;
2276 	ulint		insert_size;
2277 	ulint		free_space;
2278 	ulint		total_data;
2279 	ulint		total_n_recs;
2280 	const rec_t*	rec;
2281 	const rec_t*	end_rec;
2282 
2283 	page = btr_cur_get_page(cursor);
2284 
2285 	ut_ad(!split_rec
2286 	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
2287 	ut_ad(!split_rec
2288 	      || rec_offs_validate(split_rec, cursor->index, *offsets));
2289 
2290 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2291 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2292 
2293 	/* free_space is now the free space of a created new page */
2294 
2295 	total_data   = page_get_data_size(page) + insert_size;
2296 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2297 
2298 	/* We determine which records (from rec to end_rec, not including
2299 	end_rec) will end up on the other half page from tuple when it is
2300 	inserted. */
2301 
2302 	if (split_rec == NULL) {
2303 		rec = page_rec_get_next(page_get_infimum_rec(page));
2304 		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
2305 
2306 	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
2307 
2308 		rec = page_rec_get_next(page_get_infimum_rec(page));
2309 		end_rec = split_rec;
2310 	} else {
2311 		rec = split_rec;
2312 		end_rec = page_get_supremum_rec(page);
2313 	}
2314 
2315 	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2316 	    <= free_space) {
2317 
2318 		/* Ok, there will be enough available space on the
2319 		half page where the tuple is inserted */
2320 
2321 		return(true);
2322 	}
2323 
2324 	while (rec != end_rec) {
2325 		/* In this loop we calculate the amount of reserved
2326 		space after rec is removed from page. */
2327 
2328 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2329 					   page_is_leaf(page)
2330 					   ? cursor->index->n_core_fields
2331 					   : 0,
2332 					   ULINT_UNDEFINED, heap);
2333 
2334 		total_data -= rec_offs_size(*offsets);
2335 		total_n_recs--;
2336 
2337 		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2338 		    <= free_space) {
2339 
2340 			/* Ok, there will be enough available space on the
2341 			half page where the tuple is inserted */
2342 
2343 			return(true);
2344 		}
2345 
2346 		rec = page_rec_get_next_const(rec);
2347 	}
2348 
2349 	return(false);
2350 }
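
/* Sketch of the fit test above (illustrative): the insert fits once

	total_data + page_dir_calc_reserved_space(total_n_recs)
		<= free_space

where the loop keeps subtracting the sizes of the records between rec
and end_rec, i.e. the records that will end up on the other
half-page. The loop returns as soon as the inequality holds, so it
usually inspects only a few records. */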
2351 
2352 /*******************************************************//**
2353 Inserts a data tuple into a tree at a non-leaf level. It is assumed
2354 that mtr holds an x-latch on the tree. */
2355 void
2356 btr_insert_on_non_leaf_level_func(
2357 /*==============================*/
2358 	ulint		flags,	/*!< in: undo logging and locking flags */
2359 	dict_index_t*	index,	/*!< in: index */
2360 	ulint		level,	/*!< in: level, must be > 0 */
2361 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2362 	const char*	file,	/*!< in: file name */
2363 	unsigned	line,	/*!< in: line where called */
2364 	mtr_t*		mtr)	/*!< in: mtr */
2365 {
2366 	big_rec_t*	dummy_big_rec;
2367 	btr_cur_t	cursor;
2368 	dberr_t		err;
2369 	rec_t*		rec;
2370 	mem_heap_t*	heap = NULL;
2371 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
2372 	rec_offs*	offsets         = offsets_;
2373 	rec_offs_init(offsets_);
2374 	rtr_info_t	rtr_info;
2375 
2376 	ut_ad(level > 0);
2377 
2378 	if (!dict_index_is_spatial(index)) {
2379 		dberr_t err = btr_cur_search_to_nth_level(
2380 			index, level, tuple, PAGE_CUR_LE,
2381 			BTR_CONT_MODIFY_TREE,
2382 			&cursor, 0, file, line, mtr);
2383 
2384 		if (err != DB_SUCCESS) {
2385 			ib::warn() << "btr_cur_search_to_nth_level() failed"
2386 				   << " with error code: " << err
2387 				   << " level: " << level
2388 				   << " called from file: "
2389 				   << file << " line: " << line
2390 				   << " table: " << index->table->name
2391 				   << " index: " << index->name;
2392 		}
2393 	} else {
2394 		/* For spatial index, initialize structures to track
2395 		its parents etc. */
2396 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2397 
2398 		rtr_info_update_btr(&cursor, &rtr_info);
2399 
2400 		btr_cur_search_to_nth_level(index, level, tuple,
2401 					    PAGE_CUR_RTREE_INSERT,
2402 					    BTR_CONT_MODIFY_TREE,
2403 					    &cursor, 0, file, line, mtr);
2404 	}
2405 
2406 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2407 
2408 	err = btr_cur_optimistic_insert(
2409 		flags
2410 		| BTR_NO_LOCKING_FLAG
2411 		| BTR_KEEP_SYS_FLAG
2412 		| BTR_NO_UNDO_LOG_FLAG,
2413 		&cursor, &offsets, &heap,
2414 		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2415 
2416 	if (err == DB_FAIL) {
2417 		err = btr_cur_pessimistic_insert(flags
2418 						 | BTR_NO_LOCKING_FLAG
2419 						 | BTR_KEEP_SYS_FLAG
2420 						 | BTR_NO_UNDO_LOG_FLAG,
2421 						 &cursor, &offsets, &heap,
2422 						 tuple, &rec,
2423 						 &dummy_big_rec, 0, NULL, mtr);
2424 		ut_a(err == DB_SUCCESS);
2425 	}
2426 
2427 	if (heap != NULL) {
2428 		mem_heap_free(heap);
2429 	}
2430 
2431 	if (dict_index_is_spatial(index)) {
2432 		ut_ad(cursor.rtr_info);
2433 
2434 		rtr_clean_rtr_info(&rtr_info, true);
2435 	}
2436 }
2437 
2438 /**************************************************************//**
2439 Attaches the halves of an index page on the appropriate level in an
2440 index tree. */
2441 static MY_ATTRIBUTE((nonnull))
2442 void
2443 btr_attach_half_pages(
2444 /*==================*/
2445 	ulint		flags,		/*!< in: undo logging and
2446 					locking flags */
2447 	dict_index_t*	index,		/*!< in: the index tree */
2448 	buf_block_t*	block,		/*!< in/out: page to be split */
2449 	const rec_t*	split_rec,	/*!< in: first record on upper
2450 					half page */
2451 	buf_block_t*	new_block,	/*!< in/out: the new half page */
2452 	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
2453 	mtr_t*		mtr)		/*!< in: mtr */
2454 {
2455 	ulint		prev_page_no;
2456 	ulint		next_page_no;
2457 	ulint		level;
2458 	page_t*		page		= buf_block_get_frame(block);
2459 	page_t*		lower_page;
2460 	page_t*		upper_page;
2461 	ulint		lower_page_no;
2462 	ulint		upper_page_no;
2463 	page_zip_des_t*	lower_page_zip;
2464 	page_zip_des_t*	upper_page_zip;
2465 	dtuple_t*	node_ptr_upper;
2466 	mem_heap_t*	heap;
2467 	buf_block_t*	prev_block = NULL;
2468 	buf_block_t*	next_block = NULL;
2469 
2470 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2471 	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));
2472 
2473 	/* Create a memory heap where the data tuple is stored */
2474 	heap = mem_heap_create(1024);
2475 
2476 	/* Based on split direction, decide upper and lower pages */
2477 	if (direction == FSP_DOWN) {
2478 
2479 		btr_cur_t	cursor;
2480 		rec_offs*	offsets;
2481 
2482 		lower_page = buf_block_get_frame(new_block);
2483 		lower_page_no = new_block->page.id.page_no();
2484 		lower_page_zip = buf_block_get_page_zip(new_block);
2485 		upper_page = buf_block_get_frame(block);
2486 		upper_page_no = block->page.id.page_no();
2487 		upper_page_zip = buf_block_get_page_zip(block);
2488 
2489 		/* Look up the index for the node pointer to page */
2490 		offsets = btr_page_get_father_block(NULL, heap, index,
2491 						    block, mtr, &cursor);
2492 
2493 		/* Replace the address of the old child node (= page) with the
2494 		address of the new lower half */
2495 
2496 		btr_node_ptr_set_child_page_no(
2497 			btr_cur_get_rec(&cursor),
2498 			btr_cur_get_page_zip(&cursor),
2499 			offsets, lower_page_no, mtr);
2500 		mem_heap_empty(heap);
2501 	} else {
2502 		lower_page = buf_block_get_frame(block);
2503 		lower_page_no = block->page.id.page_no();
2504 		lower_page_zip = buf_block_get_page_zip(block);
2505 		upper_page = buf_block_get_frame(new_block);
2506 		upper_page_no = new_block->page.id.page_no();
2507 		upper_page_zip = buf_block_get_page_zip(new_block);
2508 	}
2509 
2510 	/* Get the previous and next pages of page */
2511 	prev_page_no = btr_page_get_prev(page);
2512 	next_page_no = btr_page_get_next(page);
2513 
2514 	const ulint	space = block->page.id.space();
2515 
2516 	/* for consistency, both blocks should be locked, before change */
2517 	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
2518 		prev_block = btr_block_get(
2519 			page_id_t(space, prev_page_no), block->page.size,
2520 			RW_X_LATCH, index, mtr);
2521 	}
2522 	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
2523 		next_block = btr_block_get(
2524 			page_id_t(space, next_page_no), block->page.size,
2525 			RW_X_LATCH, index, mtr);
2526 	}
2527 
2528 	/* Get the level of the split pages */
2529 	level = btr_page_get_level(buf_block_get_frame(block));
2530 	ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));
2531 
2532 	/* Build the node pointer (= node key and page address) for the upper
2533 	half */
2534 
2535 	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
2536 						   upper_page_no, heap, level);
2537 
2538 	/* Insert it next to the pointer to the lower half. Note that this
2539 	may generate recursion leading to a split on the higher level. */
2540 
2541 	btr_insert_on_non_leaf_level(flags, index, level + 1,
2542 				     node_ptr_upper, mtr);
2543 
2544 	/* Free the memory heap */
2545 	mem_heap_free(heap);
2546 
2547 	/* Update page links of the level */
2548 
2549 	if (prev_block) {
2550 #ifdef UNIV_BTR_DEBUG
2551 		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
2552 		ut_a(btr_page_get_next(prev_block->frame)
2553 		     == block->page.id.page_no());
2554 #endif /* UNIV_BTR_DEBUG */
2555 
2556 		btr_page_set_next(buf_block_get_frame(prev_block),
2557 				  buf_block_get_page_zip(prev_block),
2558 				  lower_page_no, mtr);
2559 	}
2560 
2561 	if (next_block) {
2562 #ifdef UNIV_BTR_DEBUG
2563 		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
2564 		ut_a(btr_page_get_prev(next_block->frame)
2565 		     == page_get_page_no(page));
2566 #endif /* UNIV_BTR_DEBUG */
2567 
2568 		btr_page_set_prev(buf_block_get_frame(next_block),
2569 				  buf_block_get_page_zip(next_block),
2570 				  upper_page_no, mtr);
2571 	}
2572 
2573 	if (direction == FSP_DOWN) {
2574 		/* lower_page is new */
2575 		btr_page_set_prev(lower_page, lower_page_zip,
2576 				  prev_page_no, mtr);
2577 	} else {
2578 		ut_ad(btr_page_get_prev(lower_page) == prev_page_no);
2579 	}
2580 
2581 	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
2582 	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
2583 
2584 	if (direction != FSP_DOWN) {
2585 		/* upper_page is new */
2586 		btr_page_set_next(upper_page, upper_page_zip,
2587 				  next_page_no, mtr);
2588 	} else {
2589 		ut_ad(btr_page_get_next(upper_page) == next_page_no);
2590 	}
2591 }
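
/* Illustrative shape of the level list after btr_attach_half_pages()
(comment only), where [new] marks the freshly allocated page:

	direction == FSP_UP:   prev <-> lower(block) <-> upper[new] <-> next
	direction == FSP_DOWN: prev <-> lower[new] <-> upper(block) <-> next

In both cases the node pointer for the upper half is inserted at
level + 1, which may recursively split the parent level as well. */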
2592 
2593 /*************************************************************//**
2594 Determine if a tuple is smaller than any record on the page.
2595 @return TRUE if smaller */
2596 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2597 bool
2598 btr_page_tuple_smaller(
2599 /*===================*/
2600 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2601 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2602 	rec_offs**	offsets,/*!< in/out: temporary storage */
2603 	ulint		n_uniq,	/*!< in: number of unique fields
2604 				in the index page records */
2605 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2606 {
2607 	buf_block_t*	block;
2608 	const rec_t*	first_rec;
2609 	page_cur_t	pcur;
2610 
2611 	/* Read the first user record in the page. */
2612 	block = btr_cur_get_block(cursor);
2613 	page_cur_set_before_first(block, &pcur);
2614 	page_cur_move_to_next(&pcur);
2615 	first_rec = page_cur_get_rec(&pcur);
2616 
2617 	*offsets = rec_get_offsets(
2618 		first_rec, cursor->index, *offsets,
2619 		page_is_leaf(block->frame) ? cursor->index->n_core_fields : 0,
2620 		n_uniq, heap);
2621 
2622 	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2623 }
2624 
2625 /** Insert the tuple into the right sibling page, if the cursor is at the end
2626 of a page.
2627 @param[in]	flags	undo logging and locking flags
2628 @param[in,out]	cursor	cursor at which to insert; when the function succeeds,
2629 			the cursor is positioned before the insert point.
2630 @param[out]	offsets	offsets on inserted record
2631 @param[in,out]	heap	memory heap for allocating offsets
2632 @param[in]	tuple	tuple to insert
2633 @param[in]	n_ext	number of externally stored columns
2634 @param[in,out]	mtr	mini-transaction
2635 @return	inserted record (first record on the right sibling page);
2636 	the cursor will be positioned on the page infimum
2637 @retval	NULL if the operation was not performed */
2638 static
2639 rec_t*
2640 btr_insert_into_right_sibling(
2641 	ulint		flags,
2642 	btr_cur_t*	cursor,
2643 	rec_offs**	offsets,
2644 	mem_heap_t*	heap,
2645 	const dtuple_t*	tuple,
2646 	ulint		n_ext,
2647 	mtr_t*		mtr)
2648 {
2649 	buf_block_t*	block = btr_cur_get_block(cursor);
2650 	page_t*		page = buf_block_get_frame(block);
2651 	const uint32_t	next_page_no = btr_page_get_next(page);
2652 
2653 	ut_ad(mtr_memo_contains_flagged(
2654 		      mtr, dict_index_get_lock(cursor->index),
2655 		      MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2656 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2657 	ut_ad(heap);
2658 
2659 	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
2660 			page_rec_get_next(btr_cur_get_rec(cursor)))) {
2661 
2662 		return(NULL);
2663 	}
2664 
2665 	page_cur_t	next_page_cursor;
2666 	buf_block_t*	next_block;
2667 	page_t*		next_page;
2668 	btr_cur_t	next_father_cursor;
2669 	rec_t*		rec = NULL;
2670 	ulint		max_size;
2671 
2672 	const ulint	space = block->page.id.space();
2673 
2674 	next_block = btr_block_get(
2675 		page_id_t(space, next_page_no), block->page.size,
2676 		RW_X_LATCH, cursor->index, mtr);
2677 	next_page = buf_block_get_frame(next_block);
2678 
2679 	bool	is_leaf = page_is_leaf(next_page);
2680 
2681 	btr_page_get_father(
2682 		cursor->index, next_block, mtr, &next_father_cursor);
2683 
2684 	page_cur_search(
2685 		next_block, cursor->index, tuple, PAGE_CUR_LE,
2686 		&next_page_cursor);
2687 
2688 	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2689 
2690 	/* Extend the gap locks to the next page */
2691 	if (!dict_table_is_locking_disabled(cursor->index->table)) {
2692 		lock_update_split_left(next_block, block);
2693 	}
2694 
2695 	rec = page_cur_tuple_insert(
2696 		&next_page_cursor, tuple, cursor->index, offsets, &heap,
2697 		n_ext, mtr);
2698 
2699 	if (rec == NULL) {
2700 		if (is_leaf
2701 		    && next_block->page.size.is_compressed()
2702 		    && !dict_index_is_clust(cursor->index)
2703 		    && !cursor->index->table->is_temporary()) {
2704 			/* Reset the IBUF_BITMAP_FREE bits, because
2705 			page_cur_tuple_insert() will have attempted page
2706 			reorganize before failing. */
2707 			ibuf_reset_free_bits(next_block);
2708 		}
2709 		return(NULL);
2710 	}
2711 
2712 	ibool	compressed;
2713 	dberr_t	err;
2714 	ulint	level = btr_page_get_level(next_page);
2715 
2716 	/* adjust cursor position */
2717 	*btr_cur_get_page_cur(cursor) = next_page_cursor;
2718 
2719 	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2720 	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2721 
2722 	/* We have to change the parent node pointer */
2723 
2724 	compressed = btr_cur_pessimistic_delete(
2725 		&err, TRUE, &next_father_cursor,
2726 		BTR_CREATE_FLAG, false, mtr);
2727 
2728 	ut_a(err == DB_SUCCESS);
2729 
2730 	if (!compressed) {
2731 		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2732 	}
2733 
2734 	dtuple_t*	node_ptr = dict_index_build_node_ptr(
2735 		cursor->index, rec, next_block->page.id.page_no(),
2736 		heap, level);
2737 
2738 	btr_insert_on_non_leaf_level(
2739 		flags, cursor->index, level + 1, node_ptr, mtr);
2740 
2741 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2742 
2743 	if (is_leaf
2744 	    && !dict_index_is_clust(cursor->index)
2745 	    && !cursor->index->table->is_temporary()) {
2746 		/* Update the free bits of the B-tree page in the
2747 		insert buffer bitmap. */
2748 
2749 		if (next_block->page.size.is_compressed()) {
2750 			ibuf_update_free_bits_zip(next_block, mtr);
2751 		} else {
2752 			ibuf_update_free_bits_if_full(
2753 				next_block, max_size,
2754 				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2755 		}
2756 	}
2757 
2758 	return(rec);
2759 }
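
/* Example of the fast path above (hypothetical keys): if the cursor
page ends with ... 40 50 and its right sibling begins with 70, an
insert of 60 with the cursor placed after 50 (just before the page
supremum) is redirected into the right sibling, and a page split is
avoided. The sibling's parent node pointer is deleted and re-inserted
because its key must now reflect the new first record, 60. */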
2760 
2761 /*************************************************************//**
2762 Splits an index page into halves and inserts the tuple. It is assumed
2763 that mtr holds an x-latch on the index tree. NOTE: the tree x-latch
2764 may be released within this function! NOTE that the operation of this
2765 function must always succeed; we cannot reverse it, therefore enough
2766 free disk space (2 pages) must be guaranteed to be available before
2767 this function is called.
2768 NOTE: jonaso added support for calling this function with tuple == NULL,
2769 which causes it to only split a page.
2770 
2771 @return inserted record, or NULL if we run out of space */
2772 rec_t*
2773 btr_page_split_and_insert(
2774 /*======================*/
2775 	ulint		flags,	/*!< in: undo logging and locking flags */
2776 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
2777 				function returns, the cursor is positioned
2778 				on the predecessor of the inserted record */
2779 	rec_offs**	offsets,/*!< out: offsets on inserted record */
2780 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2781 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2782 	ulint		n_ext,	/*!< in: number of externally stored columns */
2783 	mtr_t*		mtr)	/*!< in: mtr */
2784 {
2785 	buf_block_t*	block;
2786 	page_t*		page;
2787 	page_zip_des_t*	page_zip;
2788 	buf_block_t*	new_block;
2789 	page_t*		new_page;
2790 	page_zip_des_t*	new_page_zip;
2791 	rec_t*		split_rec;
2792 	buf_block_t*	left_block;
2793 	buf_block_t*	right_block;
2794 	page_cur_t*	page_cursor;
2795 	rec_t*		first_rec;
2796 	byte*		buf = 0; /* remove warning */
2797 	rec_t*		move_limit;
2798 	ulint		n_iterations = 0;
2799 	ulint		n_uniq;
2800 
2801 	if (cursor->index->is_spatial()) {
2802 		/* Split rtree page and update parent */
2803 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2804 						 tuple, n_ext, mtr));
2805 	}
2806 
2807 	if (!*heap) {
2808 		*heap = mem_heap_create(1024);
2809 	}
2810 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2811 func_start:
2812 	mem_heap_empty(*heap);
2813 	*offsets = NULL;
2814 
2815 	ut_ad(mtr_memo_contains_flagged(mtr,
2816 					dict_index_get_lock(cursor->index),
2817 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2818 	ut_ad(!dict_index_is_online_ddl(cursor->index)
2819 	      || (flags & BTR_CREATE_FLAG)
2820 	      || dict_index_is_clust(cursor->index));
2821 	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
2822 				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
2823 
2824 	block = btr_cur_get_block(cursor);
2825 	page = buf_block_get_frame(block);
2826 	page_zip = buf_block_get_page_zip(block);
2827 
2828 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
2829 	ut_ad(!page_is_empty(page));
2830 
2831 	/* try to insert to the next page if possible before split */
2832 	if (rec_t* rec = btr_insert_into_right_sibling(
2833 		    flags, cursor, offsets, *heap, tuple, n_ext, mtr)) {
2834 		return(rec);
2835 	}
2836 
2837 	/* 1. Decide the split record; split_rec == NULL means that the
2838 	tuple to be inserted should be the first record on the upper
2839 	half-page */
2840 	bool insert_left = false;
2841 	ulint hint_page_no = block->page.id.page_no() + 1;
2842 	byte direction = FSP_UP;
2843 
2844 	if (tuple && n_iterations > 0) {
2845 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2846 
2847 		if (split_rec == NULL) {
2848 			insert_left = btr_page_tuple_smaller(
2849 				cursor, tuple, offsets, n_uniq, heap);
2850 		}
2851 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2852 	} else if ((split_rec = btr_page_get_split_rec_to_left(cursor))) {
2853 		direction = FSP_DOWN;
2854 		hint_page_no -= 2;
2855 	} else {
2856 		/* If there is only one record in the index page, we
2857 		can't split the node in the middle by default. We need
2858 		to determine whether the new record will be inserted
2859 		to the left or right. */
2860 
2861 		if (page_get_n_recs(page) > 1) {
2862 			split_rec = page_get_middle_rec(page);
2863 		} else if (btr_page_tuple_smaller(cursor, tuple,
2864 						  offsets, n_uniq, heap)) {
2865 			split_rec = page_rec_get_next(
2866 				page_get_infimum_rec(page));
2867 		} else {
2868 			split_rec = NULL;
2869 		}
2870 	}
2871 
2872 	DBUG_EXECUTE_IF("disk_is_full",
2873 			os_has_said_disk_full = true;
2874 			return(NULL););
2875 
2876 	/* 2. Allocate a new page to the index */
2877 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2878 				   btr_page_get_level(page), mtr, mtr);
2879 
2880 	if (!new_block) {
2881 		return(NULL);
2882 	}
2883 
2884 	new_page = buf_block_get_frame(new_block);
2885 	new_page_zip = buf_block_get_page_zip(new_block);
2886 	btr_page_create(new_block, new_page_zip, cursor->index,
2887 			btr_page_get_level(page), mtr);
2888 	/* Only record the leaf level page splits. */
2889 	if (page_is_leaf(page)) {
2890 		cursor->index->stat_defrag_n_page_split ++;
2891 		cursor->index->stat_defrag_modified_counter ++;
2892 		btr_defragment_save_defrag_stats_if_needed(cursor->index);
2893 	}
2894 
2895 	/* 3. Calculate the first record on the upper half-page, and the
2896 	first record (move_limit) on original page which ends up on the
2897 	upper half */
2898 
2899 	if (split_rec) {
2900 		first_rec = move_limit = split_rec;
2901 
2902 		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
2903 					   page_is_leaf(page)
2904 					   ? cursor->index->n_core_fields : 0,
2905 					   n_uniq, heap);
2906 
2907 		insert_left = !tuple
2908 			|| cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;
2909 
2910 		if (!insert_left && new_page_zip && n_iterations > 0) {
2911 			/* If a compressed page has already been split,
2912 			avoid further splits by inserting the record
2913 			to an empty page. */
2914 			split_rec = NULL;
2915 			goto insert_empty;
2916 		}
2917 	} else if (insert_left) {
2918 		ut_a(n_iterations > 0);
2919 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
2920 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2921 	} else {
2922 insert_empty:
2923 		ut_ad(!split_rec);
2924 		ut_ad(!insert_left);
2925 		buf = UT_NEW_ARRAY_NOKEY(
2926 			byte,
2927 			rec_get_converted_size(cursor->index, tuple, n_ext));
2928 
2929 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2930 						      tuple, n_ext);
2931 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2932 	}
2933 
2934 	/* 4. Do first the modifications in the tree structure */
2935 
2936 	btr_attach_half_pages(flags, cursor->index, block,
2937 			      first_rec, new_block, direction, mtr);
2938 
2939 	/* If the split is made on the leaf level and the insert will fit
2940 	on the appropriate half-page, we may release the tree x-latch.
2941 	We can then move the records after releasing the tree latch,
2942 	thus reducing the tree latch contention. */
2943 	bool insert_will_fit;
2944 	if (tuple == NULL) {
2945 		insert_will_fit = true;
2946 	} else if (split_rec) {
2947 		insert_will_fit = !new_page_zip
2948 			&& btr_page_insert_fits(cursor, split_rec,
2949 						offsets, tuple, n_ext, heap);
2950 	} else {
2951 		if (!insert_left) {
2952 			UT_DELETE_ARRAY(buf);
2953 			buf = NULL;
2954 		}
2955 
2956 		insert_will_fit = !new_page_zip
2957 			&& btr_page_insert_fits(cursor, NULL,
2958 						offsets, tuple, n_ext, heap);
2959 	}
2960 
2961 	if (!srv_read_only_mode
2962 	    && insert_will_fit
2963 	    && page_is_leaf(page)
2964 	    && !dict_index_is_online_ddl(cursor->index)) {
2965 
2966 		mtr->memo_release(
2967 			dict_index_get_lock(cursor->index),
2968 			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);
2969 
2970 		/* NOTE: We cannot release the root block latch here: it holds
2971 		the segment headers and has usually been modified already. */
2972 	}
2973 
2974 	/* 5. Move then the records to the new page */
2975 	if (direction == FSP_DOWN) {
2976 		/*		fputs("Split left\n", stderr); */
2977 
2978 		if (0
2979 #ifdef UNIV_ZIP_COPY
2980 		    || page_zip
2981 #endif /* UNIV_ZIP_COPY */
2982 		    || !page_move_rec_list_start(new_block, block, move_limit,
2983 						 cursor->index, mtr)) {
2984 			/* For some reason, compressing new_page failed,
2985 			even though it should contain fewer records than
2986 			the original page.  Copy the page byte for byte
2987 			and then delete the records from both pages
2988 			as appropriate.  Deleting will always succeed. */
2989 			ut_a(new_page_zip);
2990 
2991 			page_zip_copy_recs(new_page_zip, new_page,
2992 					   page_zip, page, cursor->index, mtr);
2993 			page_delete_rec_list_end(move_limit - page + new_page,
2994 						 new_block, cursor->index,
2995 						 ULINT_UNDEFINED,
2996 						 ULINT_UNDEFINED, mtr);
2997 
2998 			/* Update the lock table and possible hash index. */
2999 			lock_move_rec_list_start(
3000 				new_block, block, move_limit,
3001 				new_page + PAGE_NEW_INFIMUM);
3002 
3003 			btr_search_move_or_delete_hash_entries(
3004 				new_block, block);
3005 
3006 			/* Delete the records from the source page. */
3007 
3008 			page_delete_rec_list_start(move_limit, block,
3009 						   cursor->index, mtr);
3010 		}
3011 
3012 		left_block = new_block;
3013 		right_block = block;
3014 
3015 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
3016 			lock_update_split_left(right_block, left_block);
3017 		}
3018 	} else {
3019 		/*		fputs("Split right\n", stderr); */
3020 
3021 		if (0
3022 #ifdef UNIV_ZIP_COPY
3023 		    || page_zip
3024 #endif /* UNIV_ZIP_COPY */
3025 		    || !page_move_rec_list_end(new_block, block, move_limit,
3026 					       cursor->index, mtr)) {
3027 			/* For some reason, compressing new_page failed,
3028 			even though it should contain fewer records than
3029 			the original page.  Copy the page byte for byte
3030 			and then delete the records from both pages
3031 			as appropriate.  Deleting will always succeed. */
3032 			ut_a(new_page_zip);
3033 
3034 			page_zip_copy_recs(new_page_zip, new_page,
3035 					   page_zip, page, cursor->index, mtr);
3036 			page_delete_rec_list_start(move_limit - page
3037 						   + new_page, new_block,
3038 						   cursor->index, mtr);
3039 
3040 			/* Update the lock table and possible hash index. */
3041 			lock_move_rec_list_end(new_block, block, move_limit);
3042 
3043 			btr_search_move_or_delete_hash_entries(
3044 				new_block, block);
3045 
3046 			/* Delete the records from the source page. */
3047 
3048 			page_delete_rec_list_end(move_limit, block,
3049 						 cursor->index,
3050 						 ULINT_UNDEFINED,
3051 						 ULINT_UNDEFINED, mtr);
3052 		}
3053 
3054 		left_block = block;
3055 		right_block = new_block;
3056 
3057 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
3058 			lock_update_split_right(right_block, left_block);
3059 		}
3060 	}
3061 
3062 #ifdef UNIV_ZIP_DEBUG
3063 	if (page_zip) {
3064 		ut_a(page_zip_validate(page_zip, page, cursor->index));
3065 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
3066 	}
3067 #endif /* UNIV_ZIP_DEBUG */
3068 
3069 	/* At this point, split_rec, move_limit and first_rec may point
3070 	to garbage on the old page. */
3071 
3072 	/* 6. The split and the tree modification is now completed. Decide the
3073 	page where the tuple should be inserted */
3074 	rec_t* rec;
3075 	buf_block_t* const insert_block = insert_left
3076 		? left_block : right_block;
3077 
3078 	if (UNIV_UNLIKELY(!tuple)) {
3079 		rec = NULL;
3080 		goto func_exit;
3081 	}
3082 
3083 	/* 7. Reposition the cursor for insert and try insertion */
3084 	page_cursor = btr_cur_get_page_cur(cursor);
3085 
3086 	page_cur_search(insert_block, cursor->index, tuple, page_cursor);
3087 
3088 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3089 				    offsets, heap, n_ext, mtr);
3090 
3091 #ifdef UNIV_ZIP_DEBUG
3092 	{
3093 		page_t*		insert_page
3094 			= buf_block_get_frame(insert_block);
3095 
3096 		page_zip_des_t*	insert_page_zip
3097 			= buf_block_get_page_zip(insert_block);
3098 
3099 		ut_a(!insert_page_zip
3100 		     || page_zip_validate(insert_page_zip, insert_page,
3101 					  cursor->index));
3102 	}
3103 #endif /* UNIV_ZIP_DEBUG */
3104 
3105 	if (rec != NULL) {
3106 
3107 		goto func_exit;
3108 	}
3109 
3110 	/* 8. If insert did not fit, try page reorganization.
3111 	For compressed pages, page_cur_tuple_insert() will have
3112 	attempted this already. */
3113 
3114 	if (page_cur_get_page_zip(page_cursor)
3115 	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
3116 
3117 		goto insert_failed;
3118 	}
3119 
3120 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3121 				    offsets, heap, n_ext, mtr);
3122 
3123 	if (rec == NULL) {
3124 		/* The insert did not fit on the page: loop back to the
3125 		start of the function for a new split */
3126 insert_failed:
3127 		/* We play safe and reset the free bits for new_page */
3128 		if (!dict_index_is_clust(cursor->index)
3129 		    && !cursor->index->table->is_temporary()) {
3130 			ibuf_reset_free_bits(new_block);
3131 			ibuf_reset_free_bits(block);
3132 		}
3133 
3134 		n_iterations++;
3135 		ut_ad(n_iterations < 2
3136 		      || buf_block_get_page_zip(insert_block));
3137 		ut_ad(!insert_will_fit);
3138 
3139 		goto func_start;
3140 	}
3141 
3142 func_exit:
3143 	/* Insert fit on the page: update the free bits for the
3144 	left and right pages in the same mtr */
3145 
3146 	if (!dict_index_is_clust(cursor->index)
3147 	    && !cursor->index->table->is_temporary()
3148 	    && page_is_leaf(page)) {
3149 
3150 		ibuf_update_free_bits_for_two_pages_low(
3151 			left_block, right_block, mtr);
3152 	}
3153 
3154 	MONITOR_INC(MONITOR_INDEX_SPLIT);
3155 
3156 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
3157 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
3158 
3159 	ut_ad(tuple || !rec);
3160 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
3161 	return(rec);
3162 }
3163 
3164 /** Removes a page from the level list of pages.
3165 @param[in]	space		space where removed
3166 @param[in]	page_size	page size
3167 @param[in,out]	page		page to remove
3168 @param[in]	index		index tree
3169 @param[in,out]	mtr		mini-transaction
@return DB_SUCCESS, or DB_ERROR if a sibling page could not be read */
3170 dberr_t
3171 btr_level_list_remove_func(
3172 	ulint			space,
3173 	const page_size_t&	page_size,
3174 	page_t*			page,
3175 	dict_index_t*		index,
3176 	mtr_t*			mtr)
3177 {
3178 	ut_ad(page != NULL);
3179 	ut_ad(mtr != NULL);
3180 	ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
3181 	ut_ad(space == page_get_space_id(page));
3182 	/* Get the previous and next page numbers of page */
3183 
3184 	const uint32_t	prev_page_no = btr_page_get_prev(page);
3185 	const uint32_t	next_page_no = btr_page_get_next(page);
3186 
3187 	/* Update page links of the level */
3188 
3189 	if (prev_page_no != FIL_NULL) {
3190 		buf_block_t*	prev_block
3191 			= btr_block_get(page_id_t(space, prev_page_no),
3192 					page_size, RW_X_LATCH, index, mtr);

		if (!prev_block) {
			return DB_ERROR;
		}
3193 
3194 		page_t*		prev_page
3195 			= buf_block_get_frame(prev_block);
3196 #ifdef UNIV_BTR_DEBUG
3197 		ut_a(page_is_comp(prev_page) == page_is_comp(page));
3198 		ut_a(!memcmp(prev_page + FIL_PAGE_NEXT, page + FIL_PAGE_OFFSET,
3199 			     4));
3200 #endif /* UNIV_BTR_DEBUG */
3201 
3202 		btr_page_set_next(prev_page,
3203 				  buf_block_get_page_zip(prev_block),
3204 				  next_page_no, mtr);
3205 	}
3206 
3207 	if (next_page_no != FIL_NULL) {
3208 		buf_block_t*	next_block
3209 			= btr_block_get(
3210 				page_id_t(space, next_page_no), page_size,
3211 				RW_X_LATCH, index, mtr);
3212 
3213 		if (!next_block) {
3214 			return DB_ERROR;
3215 		}
3216 
3217 		page_t*		next_page
3218 			= buf_block_get_frame(next_block);
3219 #ifdef UNIV_BTR_DEBUG
3220 		ut_a(page_is_comp(next_page) == page_is_comp(page));
3221 		ut_a(!memcmp(next_page + FIL_PAGE_PREV, page + FIL_PAGE_OFFSET,
3222 			     4));
3223 #endif /* UNIV_BTR_DEBUG */
3224 
3225 		btr_page_set_prev(next_page,
3226 				  buf_block_get_page_zip(next_block),
3227 				  prev_page_no, mtr);
3228 	}
3229 
3230 	return DB_SUCCESS;
3231 }
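/* Illustration (a simplified sketch, not executable as-is): removing a
page from its level list amounts to a doubly-linked-list unlink on the
FIL_PAGE_PREV/FIL_PAGE_NEXT header fields:

	prev_page->FIL_PAGE_NEXT = next_page_no;
	next_page->FIL_PAGE_PREV = prev_page_no;

In the real function above, each update goes through btr_page_set_next()
or btr_page_set_prev() so that it is redo-logged in the mini-transaction,
and either neighbor may be FIL_NULL at the end of the list. */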
3232 
3233 /****************************************************************//**
3234 Writes the redo log record for setting an index record as the predefined
3235 minimum record. */
3236 UNIV_INLINE
3237 void
3238 btr_set_min_rec_mark_log(
3239 /*=====================*/
3240 	rec_t*		rec,	/*!< in: record */
3241 	mlog_id_t	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or
3242 				MLOG_REC_MIN_MARK */
3243 	mtr_t*		mtr)	/*!< in: mtr */
3244 {
3245 	mlog_write_initial_log_record(rec, type, mtr);
3246 
3247 	/* Write rec offset as a 2-byte ulint */
3248 	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
3249 }
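/* Rough layout of the redo record written above (a sketch inferred from
the calls made; the initial part uses a variable-length encoding):

	[ type: MLOG_REC_MIN_MARK or MLOG_COMP_REC_MIN_MARK ]
	[ space id and page number, by mlog_write_initial_log_record() ]
	[ 2-byte page offset of rec ]

btr_parse_set_min_rec_mark() below consumes the trailing 2 bytes. */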
3250 
3251 /****************************************************************//**
3252 Parses the redo log record for setting an index record as the predefined
3253 minimum record.
3254 @return end of log record or NULL */
3255 byte*
3256 btr_parse_set_min_rec_mark(
3257 /*=======================*/
3258 	byte*	ptr,	/*!< in: buffer */
3259 	byte*	end_ptr,/*!< in: buffer end */
3260 	ulint	comp,	/*!< in: nonzero=compact page format */
3261 	page_t*	page,	/*!< in: page or NULL */
3262 	mtr_t*	mtr)	/*!< in: mtr or NULL */
3263 {
3264 	rec_t*	rec;
3265 
3266 	if (end_ptr < ptr + 2) {
3267 
3268 		return(NULL);
3269 	}
3270 
3271 	if (page) {
3272 		ut_a(!page_is_comp(page) == !comp);
3273 
3274 		rec = page + mach_read_from_2(ptr);
3275 
3276 		btr_set_min_rec_mark(rec, mtr);
3277 	}
3278 
3279 	return(ptr + 2);
3280 }
3281 
3282 /** Sets a record as the predefined minimum record. */
3283 void btr_set_min_rec_mark(rec_t* rec, mtr_t* mtr)
3284 {
3285 	const bool comp = page_rec_is_comp(rec);
3286 
3287 	ut_ad(rec == page_rec_get_next_const(page_get_infimum_rec(
3288 						     page_align(rec))));
3289 	ut_ad(!(rec_get_info_bits(page_rec_get_next(rec), comp)
3290 		& REC_INFO_MIN_REC_FLAG));
3291 
3292 	size_t info_bits = rec_get_info_bits(rec, comp);
3293 	if (comp) {
3294 		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3295 
3296 		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3297 	} else {
3298 		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3299 
3300 		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3301 	}
3302 }
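/* The flag set here marks the leftmost node pointer of a non-leaf level
as the predefined minimum record; btr_discard_page() relies on this when
the leftmost page of a level is discarded, and the debug checks in
btr_compress() assert its presence. */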
3303 
3304 /*************************************************************//**
3305 If the page is the only one on its level, this function moves its records
3306 to the father page, thus reducing the tree height.
3307 @return father block */
3308 UNIV_INTERN
3309 buf_block_t*
3310 btr_lift_page_up(
3311 /*=============*/
3312 	dict_index_t*	index,	/*!< in: index tree */
3313 	buf_block_t*	block,	/*!< in: page which is the only one on its level;
3314 				must not be empty: use
3315 				btr_discard_only_page_on_level if the last
3316 				record from the page should be removed */
3317 	mtr_t*		mtr)	/*!< in: mtr */
3318 {
3319 	buf_block_t*	father_block;
3320 	page_t*		father_page;
3321 	ulint		page_level;
3322 	page_zip_des_t*	father_page_zip;
3323 	page_t*		page		= buf_block_get_frame(block);
3324 	ulint		root_page_no;
3325 	buf_block_t*	blocks[BTR_MAX_LEVELS];
3326 	ulint		n_blocks;	/*!< last used index in blocks[] */
3327 	ulint		i;
3328 	bool		lift_father_up;
3329 	buf_block_t*	block_orig	= block;
3330 
3331 	ut_ad(!page_has_siblings(page));
3332 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3333 
3334 	page_level = btr_page_get_level(page);
3335 	root_page_no = dict_index_get_page(index);
3336 
3337 	{
3338 		btr_cur_t	cursor;
3339 		rec_offs*	offsets	= NULL;
3340 		mem_heap_t*	heap	= mem_heap_create(
3341 			sizeof(*offsets)
3342 			* (REC_OFFS_HEADER_SIZE + 1 + 1
3343 			   + unsigned(index->n_fields)));
3344 		buf_block_t*	b;
3345 
3346 		if (dict_index_is_spatial(index)) {
3347 			offsets = rtr_page_get_father_block(
3348 				NULL, heap, index, block, mtr,
3349 				NULL, &cursor);
3350 		} else {
3351 			offsets = btr_page_get_father_block(offsets, heap,
3352 							    index, block,
3353 							    mtr, &cursor);
3354 		}
3355 		father_block = btr_cur_get_block(&cursor);
3356 		father_page_zip = buf_block_get_page_zip(father_block);
3357 		father_page = buf_block_get_frame(father_block);
3358 
3359 		n_blocks = 0;
3360 
3361 		/* Store all ancestor pages so we can reset their
3362 		levels later on.  We have to do all the searches on
3363 		the tree now because later on, after we've replaced
3364 		the first level, the tree is in an inconsistent state
3365 		and can not be searched. */
3366 		for (b = father_block;
3367 		     b->page.id.page_no() != root_page_no; ) {
3368 			ut_a(n_blocks < BTR_MAX_LEVELS);
3369 
3370 			if (dict_index_is_spatial(index)) {
3371 				offsets = rtr_page_get_father_block(
3372 					NULL, heap, index, b, mtr,
3373 					NULL, &cursor);
3374 			} else {
3375 				offsets = btr_page_get_father_block(offsets,
3376 								    heap,
3377 								    index, b,
3378 								    mtr,
3379 								    &cursor);
3380 			}
3381 
3382 			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3383 		}
3384 
3385 		lift_father_up = (n_blocks && page_level == 0);
3386 		if (lift_father_up) {
3387 			/* The father page should also be the only one on
3388 			its level (and not the root), so lift it up first:
3389 			the leaf page may be lifted directly only to the
3390 			root page. Also, the segment a page is freed from
3391 			is chosen by page_level (==0 or !=0); if page_level
3392 			changed here from !=0 to ==0, the later freeing
3393 			would not find the page allocation to be freed. */
3394 
3395 			block = father_block;
3396 			page = buf_block_get_frame(block);
3397 			page_level = btr_page_get_level(page);
3398 
3399 			ut_ad(!page_has_siblings(page));
3400 			ut_ad(mtr_memo_contains(
3401 				      mtr, block, MTR_MEMO_PAGE_X_FIX));
3402 
3403 			father_block = blocks[0];
3404 			father_page_zip = buf_block_get_page_zip(father_block);
3405 			father_page = buf_block_get_frame(father_block);
3406 		}
3407 
3408 		mem_heap_free(heap);
3409 	}
3410 
3411 	btr_search_drop_page_hash_index(block);
3412 
3413 	/* Make the father empty */
3414 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3415 	/* btr_page_empty() is supposed to zero-initialize the field. */
3416 	ut_ad(!page_get_instant(father_block->frame));
3417 
3418 	if (index->is_instant()
3419 	    && father_block->page.id.page_no() == root_page_no) {
3420 		ut_ad(!father_page_zip);
3421 		byte* page_type = father_block->frame + FIL_PAGE_TYPE;
3422 		ut_ad(mach_read_from_2(page_type) == FIL_PAGE_INDEX);
3423 		mlog_write_ulint(page_type, FIL_PAGE_TYPE_INSTANT,
3424 				 MLOG_2BYTES, mtr);
3425 		page_set_instant(father_block->frame,
3426 				 index->n_core_fields, mtr);
3427 	}
3428 
3429 	page_level++;
3430 
3431 	/* Copy the records to the father page one by one. */
3432 	if (0
3433 #ifdef UNIV_ZIP_COPY
3434 	    || father_page_zip
3435 #endif /* UNIV_ZIP_COPY */
3436 	    || !page_copy_rec_list_end(father_block, block,
3437 				       page_get_infimum_rec(page),
3438 				       index, mtr)) {
3439 		const page_zip_des_t*	page_zip
3440 			= buf_block_get_page_zip(block);
3441 		ut_a(father_page_zip);
3442 		ut_a(page_zip);
3443 
3444 		/* Copy the page byte for byte. */
3445 		page_zip_copy_recs(father_page_zip, father_page,
3446 				   page_zip, page, index, mtr);
3447 
3448 		/* Update the lock table and possible hash index. */
3449 
3450 		lock_move_rec_list_end(father_block, block,
3451 				       page_get_infimum_rec(page));
3452 
3453 		/* Also update the predicate locks */
3454 		if (dict_index_is_spatial(index)) {
3455 			lock_prdt_rec_move(father_block, block);
3456 		} else {
3457 			btr_search_move_or_delete_hash_entries(
3458 				father_block, block);
3459 		}
3460 	}
3461 
3462 	if (!dict_table_is_locking_disabled(index->table)) {
3463 		/* Free predicate page locks on the block */
3464 		if (dict_index_is_spatial(index)) {
3465 			lock_mutex_enter();
3466 			lock_prdt_page_free_from_discard(
3467 				block, lock_sys.prdt_page_hash);
3468 			lock_mutex_exit();
3469 		}
3470 		lock_update_copy_and_discard(father_block, block);
3471 	}
3472 
3473 	/* Go upward to root page, decrementing levels by one. */
3474 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3475 		page_t*		page	= buf_block_get_frame(blocks[i]);
3476 		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3477 
3478 		ut_ad(btr_page_get_level(page) == page_level + 1);
3479 
3480 		btr_page_set_level(page, page_zip, page_level, mtr);
3481 #ifdef UNIV_ZIP_DEBUG
3482 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3483 #endif /* UNIV_ZIP_DEBUG */
3484 	}
3485 
3486 	if (dict_index_is_spatial(index)) {
3487 		rtr_check_discard_page(index, NULL, block);
3488 	}
3489 
3490 	/* Free the file page */
3491 	btr_page_free(index, block, mtr);
3492 
3493 	/* We play it safe and reset the free bits for the father */
3494 	if (!dict_index_is_clust(index)
3495 	    && !index->table->is_temporary()) {
3496 		ibuf_reset_free_bits(father_block);
3497 	}
3498 	ut_ad(page_validate(father_page, index));
3499 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3500 
3501 	return(lift_father_up ? block_orig : father_block);
3502 }
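/* Conceptual before/after of btr_lift_page_up() (sketch):

	     [ father ]                    [ father ]
	         |             ==>       (records of A)
	     [    A   ]
	(only page on level)

The records of A are copied into the father, A is freed, and the levels
of all remaining ancestor pages are decremented by one. */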
3503 
3504 /*************************************************************//**
3505 Tries to merge the page first to the left immediate brother if such a
3506 brother exists, and the node pointers to the current page and to the brother
3507 reside on the same page. If the left brother does not satisfy these
3508 conditions, looks at the right brother. If the page is the only one on that
3509 level, lifts the records of the page to the father page, thus reducing the
3510 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3511 page. If cursor is on the leaf level, mtr must also hold x-latches to the
3512 brothers, if they exist.
3513 @return TRUE on success */
3514 ibool
3515 btr_compress(
3516 /*=========*/
3517 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3518 				or lift; the page must not be empty:
3519 				when deleting records, use btr_discard_page()
3520 				if the page would become empty */
3521 	ibool		adjust,	/*!< in: TRUE if should adjust the
3522 				cursor position even if compression occurs */
3523 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3524 {
3525 	dict_index_t*	index;
3526 	ulint		left_page_no;
3527 	ulint		right_page_no;
3528 	buf_block_t*	merge_block;
3529 	page_t*		merge_page = NULL;
3530 	page_zip_des_t*	merge_page_zip;
3531 	ibool		is_left;
3532 	buf_block_t*	block;
3533 	page_t*		page;
3534 	btr_cur_t	father_cursor;
3535 	mem_heap_t*	heap;
3536 	rec_offs*	offsets;
3537 	ulint		nth_rec = 0; /* remove bogus warning */
3538 	bool		mbr_changed = false;
3539 #ifdef UNIV_DEBUG
3540 	bool		leftmost_child;
3541 #endif
3542 	DBUG_ENTER("btr_compress");
3543 
3544 	block = btr_cur_get_block(cursor);
3545 	page = btr_cur_get_page(cursor);
3546 	index = btr_cur_get_index(cursor);
3547 
3548 	btr_assert_not_corrupted(block, index);
3549 
3550 #ifdef UNIV_DEBUG
3551 	if (dict_index_is_spatial(index)) {
3552 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3553 						MTR_MEMO_X_LOCK));
3554 	} else {
3555 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3556 						MTR_MEMO_X_LOCK
3557 						| MTR_MEMO_SX_LOCK));
3558 	}
3559 #endif /* UNIV_DEBUG */
3560 
3561 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3562 
3563 	const page_size_t	page_size(index->table->space->flags);
3564 
3565 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3566 
3567 	left_page_no = btr_page_get_prev(page);
3568 	right_page_no = btr_page_get_next(page);
3569 
3570 #ifdef UNIV_DEBUG
3571 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3572 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3573 			page_rec_get_next(page_get_infimum_rec(page)),
3574 			page_is_comp(page)));
3575 	}
3576 #endif /* UNIV_DEBUG */
3577 
3578 	heap = mem_heap_create(100);
3579 
3580 	if (dict_index_is_spatial(index)) {
3581 		offsets = rtr_page_get_father_block(
3582 			NULL, heap, index, block, mtr, cursor, &father_cursor);
3583 		ut_ad(cursor->page_cur.block->page.id.page_no()
3584 		      == block->page.id.page_no());
3585 		rec_t*  my_rec = father_cursor.page_cur.rec;
3586 
3587 		ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3588 
3589 		if (page_no != block->page.id.page_no()) {
3590 			ib::info() << "father positioned on page "
3591 				<< page_no << " instead of "
3592 				<< block->page.id.page_no();
3593 			offsets = btr_page_get_father_block(
3594 				NULL, heap, index, block, mtr, &father_cursor);
3595 		}
3596 	} else {
3597 		offsets = btr_page_get_father_block(
3598 			NULL, heap, index, block, mtr, &father_cursor);
3599 	}
3600 
3601 	if (adjust) {
3602 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3603 		ut_ad(nth_rec > 0);
3604 	}
3605 
3606 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3607 		/* The page is the only one on the level, lift the records
3608 		to the father */
3609 
3610 		merge_block = btr_lift_page_up(index, block, mtr);
3611 		goto func_exit;
3612 	}
3613 
3614 	ut_d(leftmost_child =
3615 		left_page_no != FIL_NULL
3616 		&& (page_rec_get_next(
3617 			page_get_infimum_rec(
3618 				btr_cur_get_page(&father_cursor)))
3619 		    == btr_cur_get_rec(&father_cursor)));
3620 
3621 	/* Decide the page to which we try to merge and which will inherit
3622 	the locks */
3623 
3624 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3625 					  &merge_block, mtr);
3626 
3627 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3628 retry:
3629 	if (!is_left
3630 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3631 				       mtr)) {
3632 		if (!merge_block) {
3633 			merge_page = NULL;
3634 		}
3635 		goto err_exit;
3636 	}
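	/* At this point the merge direction has been decided: the left
	sibling is preferred, and the right sibling is tried only when the
	left one does not exist or cannot absorb this page's records. */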
3637 
3638 	merge_page = buf_block_get_frame(merge_block);
3639 
3640 #ifdef UNIV_BTR_DEBUG
3641 	if (is_left) {
3642 		ut_a(btr_page_get_next(merge_page)
3643 		     == block->page.id.page_no());
3644 	} else {
3645 		ut_a(btr_page_get_prev(merge_page)
3646 		     == block->page.id.page_no());
3647 	}
3648 #endif /* UNIV_BTR_DEBUG */
3649 
3650 #ifdef UNIV_GIS_DEBUG
3651 	if (dict_index_is_spatial(index)) {
3652 		if (is_left) {
3653 			fprintf(stderr, "GIS_DIAG: merge left  %ld to %ld \n",
3654 				(long) block->page.id.page_no(), left_page_no);
3655 		} else {
3656 			fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3657 				(long) block->page.id.page_no(), right_page_no);
3658 		}
3659 	}
3660 #endif /* UNIV_GIS_DEBUG */
3661 
3662 	ut_ad(page_validate(merge_page, index));
3663 
3664 	merge_page_zip = buf_block_get_page_zip(merge_block);
3665 #ifdef UNIV_ZIP_DEBUG
3666 	if (merge_page_zip) {
3667 		const page_zip_des_t*	page_zip
3668 			= buf_block_get_page_zip(block);
3669 		ut_a(page_zip);
3670 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3671 		ut_a(page_zip_validate(page_zip, page, index));
3672 	}
3673 #endif /* UNIV_ZIP_DEBUG */
3674 
3675 	/* Move records to the merge page */
3676 	if (is_left) {
3677 		btr_cur_t	cursor2;
3678 		rtr_mbr_t	new_mbr;
3679 		rec_offs*	offsets2 = NULL;
3680 
3681 		/* For rtree, we need to update father's mbr. */
3682 		if (index->is_spatial()) {
3683 			/* We only support merging pages that have the
3684 			same parent page */
3685 			if (!rtr_check_same_block(
3686 				index, &cursor2,
3687 				btr_cur_get_block(&father_cursor),
3688 				merge_block, heap)) {
3689 				is_left = false;
3690 				goto retry;
3691 			}
3692 
3693 			/* Set rtr_info for cursor2, since it is
3694 			necessary in recursive page merge. */
3695 			cursor2.rtr_info = cursor->rtr_info;
3696 			cursor2.tree_height = cursor->tree_height;
3697 
3698 			offsets2 = rec_get_offsets(
3699 				btr_cur_get_rec(&cursor2), index, NULL,
3700 				page_is_leaf(cursor2.page_cur.block->frame)
3701 				? index->n_fields : 0,
3702 				ULINT_UNDEFINED, &heap);
3703 
3704 			/* Check if parent entry needs to be updated */
3705 			mbr_changed = rtr_merge_mbr_changed(
3706 				&cursor2, &father_cursor,
3707 				offsets2, offsets, &new_mbr);
3708 		}
3709 
3710 		rec_t*	orig_pred = page_copy_rec_list_start(
3711 			merge_block, block, page_get_supremum_rec(page),
3712 			index, mtr);
3713 
3714 		if (!orig_pred) {
3715 			goto err_exit;
3716 		}
3717 
3718 		btr_search_drop_page_hash_index(block);
3719 
3720 		/* Remove the page from the level list */
3721 		if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3722 							page_size, page, index,
3723 							mtr)) {
3724 			goto err_exit;
3725 		}
3726 
3727 		if (dict_index_is_spatial(index)) {
3728 			rec_t*  my_rec = father_cursor.page_cur.rec;
3729 
3730 			ulint page_no = btr_node_ptr_get_child_page_no(
3731 						my_rec, offsets);
3732 
3733 			if (page_no != block->page.id.page_no()) {
3734 
3735 				ib::fatal() << "father positioned on "
3736 					<< page_no << " instead of "
3737 					<< block->page.id.page_no();
3738 
3739 				ut_ad(0);
3740 			}
3741 
3742 			if (mbr_changed) {
3743 #ifdef UNIV_DEBUG
3744 				bool	success = rtr_update_mbr_field(
3745 					&cursor2, offsets2, &father_cursor,
3746 					merge_page, &new_mbr, NULL, mtr);
3747 
3748 				ut_ad(success);
3749 #else
3750 				rtr_update_mbr_field(
3751 					&cursor2, offsets2, &father_cursor,
3752 					merge_page, &new_mbr, NULL, mtr);
3753 #endif
3754 			} else {
3755 				rtr_node_ptr_delete(&father_cursor, mtr);
3756 			}
3757 
3758 			/* There is no GAP lock to worry about */
3759 			lock_mutex_enter();
3760 			lock_prdt_page_free_from_discard(
3761 				block, lock_sys.prdt_page_hash);
3762 			lock_rec_free_all_from_discard_page(block);
3763 			lock_mutex_exit();
3764 		} else {
3765 			btr_cur_node_ptr_delete(&father_cursor, mtr);
3766 			if (!dict_table_is_locking_disabled(index->table)) {
3767 				lock_update_merge_left(
3768 					merge_block, orig_pred, block);
3769 			}
3770 		}
3771 
3772 		if (adjust) {
3773 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3774 		}
3775 	} else {
3776 		rec_t*		orig_succ;
3777 		ibool		compressed;
3778 		dberr_t		err;
3779 		btr_cur_t	cursor2;
3780 					/* father cursor pointing to node ptr
3781 					of the right sibling */
3782 #ifdef UNIV_BTR_DEBUG
3783 		byte		fil_page_prev[4];
3784 #endif /* UNIV_BTR_DEBUG */
3785 
3786 		if (dict_index_is_spatial(index)) {
3787 			cursor2.rtr_info = NULL;
3788 
3789 			/* For spatial index, we disallow merge of blocks
3790 			with different parents, since the merge would need
3791 			to update the entry (MBR and primary key) in the
3792 			parent of the block being merged */
3793 			if (!rtr_check_same_block(
3794 				index, &cursor2,
3795 				btr_cur_get_block(&father_cursor),
3796 				merge_block, heap)) {
3797 				goto err_exit;
3798 			}
3799 
3800 			/* Set rtr_info for cursor2, since it is
3801 			necessary in recursive page merge. */
3802 			cursor2.rtr_info = cursor->rtr_info;
3803 			cursor2.tree_height = cursor->tree_height;
3804 		} else {
3805 			btr_page_get_father(index, merge_block, mtr, &cursor2);
3806 		}
3807 
3808 		if (merge_page_zip && left_page_no == FIL_NULL) {
3809 
3810 			/* The function page_zip_compress(), which will be
3811 			invoked by page_copy_rec_list_end() below,
3812 			requires that FIL_PAGE_PREV be FIL_NULL.
3813 			Clear the field, but prepare to restore it. */
3814 #ifdef UNIV_BTR_DEBUG
3815 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3816 #endif /* UNIV_BTR_DEBUG */
3817 			compile_time_assert(FIL_NULL == 0xffffffffU);
3818 			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3819 		}
3820 
3821 		orig_succ = page_copy_rec_list_end(merge_block, block,
3822 						   page_get_infimum_rec(page),
3823 						   cursor->index, mtr);
3824 
3825 		if (!orig_succ) {
3826 			ut_a(merge_page_zip);
3827 #ifdef UNIV_BTR_DEBUG
3828 			if (left_page_no == FIL_NULL) {
3829 				/* FIL_PAGE_PREV was restored from
3830 				merge_page_zip. */
3831 				ut_a(!memcmp(fil_page_prev,
3832 					     merge_page + FIL_PAGE_PREV, 4));
3833 			}
3834 #endif /* UNIV_BTR_DEBUG */
3835 			goto err_exit;
3836 		}
3837 
3838 		btr_search_drop_page_hash_index(block);
3839 
3840 #ifdef UNIV_BTR_DEBUG
3841 		if (merge_page_zip && left_page_no == FIL_NULL) {
3842 
3843 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3844 			failure in btr_level_list_remove(), which will set
3845 			the field again to FIL_NULL.  Even though this makes
3846 			merge_page and merge_page_zip inconsistent for a
3847 			split second, it is harmless, because the pages
3848 			are X-latched. */
3849 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3850 		}
3851 #endif /* UNIV_BTR_DEBUG */
3852 
3853 		/* Remove the page from the level list */
3854 		if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3855 						page_size, page, index, mtr)) {
3856 			goto err_exit;
3857 		}
3858 
3859 		ut_ad(btr_node_ptr_get_child_page_no(
3860 			btr_cur_get_rec(&father_cursor), offsets)
3861 			== block->page.id.page_no());
3862 
3863 		/* Replace the address of the old child node (= page) with the
3864 		address of the merge page to the right */
3865 		btr_node_ptr_set_child_page_no(
3866 			btr_cur_get_rec(&father_cursor),
3867 			btr_cur_get_page_zip(&father_cursor),
3868 			offsets, right_page_no, mtr);
3869 
3870 #ifdef UNIV_DEBUG
3871 		if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3872 			ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3873 				page_rec_get_next(page_get_infimum_rec(
3874 					buf_block_get_frame(merge_block))),
3875 				page_is_comp(page)));
3876 		}
3877 #endif /* UNIV_DEBUG */
3878 
3879 		/* For rtree, we need to update father's mbr. */
3880 		if (index->is_spatial()) {
3881 			rec_offs* offsets2;
3882 			ulint	rec_info;
3883 
3884 			offsets2 = rec_get_offsets(
3885 				btr_cur_get_rec(&cursor2), index, NULL,
3886 				page_is_leaf(cursor2.page_cur.block->frame)
3887 				? index->n_fields : 0,
3888 				ULINT_UNDEFINED, &heap);
3889 
3890 			ut_ad(btr_node_ptr_get_child_page_no(
3891 				btr_cur_get_rec(&cursor2), offsets2)
3892 				== right_page_no);
3893 
3894 			rec_info = rec_get_info_bits(
3895 				btr_cur_get_rec(&father_cursor),
3896 				rec_offs_comp(offsets));
3897 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
3898 				/* When the father node ptr is the minimum
3899 				rec, we keep it and delete the node ptr of
3900 				the merge page. */
3901 				rtr_merge_and_update_mbr(&father_cursor,
3902 							 &cursor2,
3903 							 offsets, offsets2,
3904 							 merge_page, mtr);
3905 			} else {
3906 				/* Otherwise, we keep the node ptr of the
3907 				merge page and delete the father node ptr,
3908 				so that the rec order in the upper level
3909 				is preserved. */
3910 				rtr_merge_and_update_mbr(&cursor2,
3911 							 &father_cursor,
3912 							 offsets2, offsets,
3913 							 merge_page, mtr);
3914 			}
3915 			lock_mutex_enter();
3916 			lock_prdt_page_free_from_discard(
3917 				block, lock_sys.prdt_page_hash);
3918 			lock_rec_free_all_from_discard_page(block);
3919 			lock_mutex_exit();
3920 		} else {
3921 
3922 			compressed = btr_cur_pessimistic_delete(&err, TRUE,
3923 								&cursor2,
3924 								BTR_CREATE_FLAG,
3925 								false, mtr);
3926 			ut_a(err == DB_SUCCESS);
3927 
3928 			if (!compressed) {
3929 				btr_cur_compress_if_useful(&cursor2,
3930 							   FALSE,
3931 							   mtr);
3932 			}
3933 
3934 			if (!dict_table_is_locking_disabled(index->table)) {
3935 				lock_update_merge_right(
3936 					merge_block, orig_succ, block);
3937 			}
3938 		}
3939 	}
3940 
3941 	if (!dict_index_is_clust(index)
3942 	    && !index->table->is_temporary()
3943 	    && page_is_leaf(merge_page)) {
3944 		/* Update the free bits of the B-tree page in the
3945 		insert buffer bitmap.  This has to be done in a
3946 		separate mini-transaction that is committed before the
3947 		main mini-transaction.  We cannot update the insert
3948 		buffer bitmap in this mini-transaction, because
3949 		btr_compress() can be invoked recursively without
3950 		committing the mini-transaction in between.  Since
3951 		insert buffer bitmap pages have a lower rank than
3952 		B-tree pages, we must not access other pages in the
3953 		same mini-transaction after accessing an insert buffer
3954 		bitmap page. */
3955 
3956 		/* The free bits in the insert buffer bitmap must
3957 		never exceed the free space on a page.  It is safe to
3958 		decrement or reset the bits in the bitmap in a
3959 		mini-transaction that is committed before the
3960 		mini-transaction that affects the free space. */
3961 
3962 		/* It is unsafe to increment the bits in a separately
3963 		committed mini-transaction, because in crash recovery,
3964 		the free bits could momentarily be set too high. */
3965 
3966 		if (page_size.is_compressed()) {
3967 			/* Because the free bits may be incremented
3968 			and we cannot update the insert buffer bitmap
3969 			in the same mini-transaction, the only safe
3970 			thing we can do here is the pessimistic
3971 			approach: reset the free bits. */
3972 			ibuf_reset_free_bits(merge_block);
3973 		} else {
3974 			/* On uncompressed pages, the free bits will
3975 			never increase here.  Thus, it is safe to
3976 			write the bits accurately in a separate
3977 			mini-transaction. */
3978 			ibuf_update_free_bits_if_full(merge_block,
3979 						      srv_page_size,
3980 						      ULINT_UNDEFINED);
3981 		}
3982 	}
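	/* In short: decrementing or resetting the free bits in a
	separately committed mini-transaction is always safe, whereas
	incrementing them is safe only when, as on the uncompressed-page
	path above, the bits cannot actually increase. */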
3983 
3984 	ut_ad(page_validate(merge_page, index));
3985 #ifdef UNIV_ZIP_DEBUG
3986 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
3987 						  index));
3988 #endif /* UNIV_ZIP_DEBUG */
3989 
3990 	if (dict_index_is_spatial(index)) {
3991 #ifdef UNIV_GIS_DEBUG
3992 		fprintf(stderr, "GIS_DIAG: compressed away  %ld\n",
3993 			(long) block->page.id.page_no());
3994 		fprintf(stderr, "GIS_DIAG: merged to %ld\n",
3995 			(long) merge_block->page.id.page_no());
3996 #endif
3997 
3998 		rtr_check_discard_page(index, NULL, block);
3999 	}
4000 
4001 	/* Free the file page */
4002 	btr_page_free(index, block, mtr);
4003 
4004 	/* btr_check_node_ptr() needs the parent block latched.
4005 	If merge_block's parent block is not the same,
4006 	we cannot use btr_check_node_ptr() */
4007 	ut_ad(leftmost_child
4008 	      || btr_check_node_ptr(index, merge_block, mtr));
4009 func_exit:
4010 	mem_heap_free(heap);
4011 
4012 	if (adjust) {
4013 		ut_ad(nth_rec > 0);
4014 		btr_cur_position(
4015 			index,
4016 			page_rec_get_nth(merge_block->frame, nth_rec),
4017 			merge_block, cursor);
4018 	}
4019 
4020 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
4021 
4022 	DBUG_RETURN(TRUE);
4023 
4024 err_exit:
4025 	/* We play it safe and reset the free bits. */
4026 	if (page_size.is_compressed()
4027 	    && merge_page
4028 	    && page_is_leaf(merge_page)
4029 	    && !dict_index_is_clust(index)) {
4030 
4031 		ibuf_reset_free_bits(merge_block);
4032 	}
4033 
4034 	mem_heap_free(heap);
4035 	DBUG_RETURN(FALSE);
4036 }
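/* Note: btr_compress() is effectively recursive. After the node pointer
of the discarded page has been deleted from the parent, the call to
btr_cur_compress_if_useful() above may merge the parent level as well,
all within the same mini-transaction; this is why the insert buffer
bitmap must not be updated in this mtr (see the comments above). */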
4037 
4038 /*************************************************************//**
4039 Discards a page that is the only page on its level.  This will empty
4040 the whole B-tree, leaving just an empty root page.  This function
4041 should almost never be reached, because btr_compress(), which is invoked in
4042 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
4043 ATTRIBUTE_COLD
4044 static
4045 void
4046 btr_discard_only_page_on_level(
4047 /*===========================*/
4048 	dict_index_t*	index,	/*!< in: index tree */
4049 	buf_block_t*	block,	/*!< in: page which is the only one on its level */
4050 	mtr_t*		mtr)	/*!< in: mtr */
4051 {
4052 	ulint		page_level = 0;
4053 	trx_id_t	max_trx_id;
4054 
4055 	ut_ad(!index->is_dummy);
4056 
4057 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
4058 	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
4059 
4060 	while (block->page.id.page_no() != dict_index_get_page(index)) {
4061 		btr_cur_t	cursor;
4062 		buf_block_t*	father;
4063 		const page_t*	page	= buf_block_get_frame(block);
4064 
4065 		ut_a(page_get_n_recs(page) == 1);
4066 		ut_a(page_level == btr_page_get_level(page));
4067 		ut_a(!page_has_siblings(page));
4068 		ut_ad(fil_page_index_page_check(page));
4069 		ut_ad(block->page.id.space() == index->table->space->id);
4070 		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4071 		btr_search_drop_page_hash_index(block);
4072 
4073 		if (dict_index_is_spatial(index)) {
4074 			/* Check any concurrent search having this page */
4075 			rtr_check_discard_page(index, NULL, block);
4076 			rtr_page_get_father(index, block, mtr, NULL, &cursor);
4077 		} else {
4078 			btr_page_get_father(index, block, mtr, &cursor);
4079 		}
4080 		father = btr_cur_get_block(&cursor);
4081 
4082 		if (!dict_table_is_locking_disabled(index->table)) {
4083 			lock_update_discard(
4084 				father, PAGE_HEAP_NO_SUPREMUM, block);
4085 		}
4086 
4087 		/* Free the file page */
4088 		btr_page_free(index, block, mtr);
4089 
4090 		block = father;
4091 		page_level++;
4092 	}
4093 
4094 	/* block is the root page, which must be empty, except
4095 	for the node pointer to the (now discarded) block(s). */
4096 	ut_ad(!page_has_siblings(block->frame));
4097 
4098 #ifdef UNIV_BTR_DEBUG
4099 	if (!dict_index_is_ibuf(index)) {
4100 		const page_t*	root	= buf_block_get_frame(block);
4101 		const ulint	space	= index->table->space_id;
4102 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
4103 					    + root, space));
4104 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
4105 					    + root, space));
4106 	}
4107 #endif /* UNIV_BTR_DEBUG */
4108 
4109 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
4110 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
4111 	/* btr_page_empty() is supposed to zero-initialize the field. */
4112 	ut_ad(!page_get_instant(block->frame));
4113 
4114 	if (index->is_primary()) {
4115 		/* Concurrent access is prevented by the root_block->lock
4116 		X-latch, so this should be safe. */
4117 		index->remove_instant();
4118 	} else if (!index->table->is_temporary()) {
4119 		/* We play it safe and reset the free bits for the root */
4120 		ibuf_reset_free_bits(block);
4121 
4122 		ut_a(max_trx_id);
4123 		page_set_max_trx_id(block,
4124 				    buf_block_get_page_zip(block),
4125 				    max_trx_id, mtr);
4126 	}
4127 }
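/* Net effect (sketch): the chain of single-record pages below the root
is freed bottom-up, and the root itself is finally emptied and turned
back into a leaf page, leaving a B-tree that is just an empty root. */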
4128 
4129 /*************************************************************//**
4130 Discards a page from a B-tree. This is used to remove the last record from
4131 a B-tree page: the whole page must be removed at the same time. This cannot
4132 be used for the root page, which is allowed to be empty. */
4133 void
4134 btr_discard_page(
4135 /*=============*/
4136 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
4137 				the root page */
4138 	mtr_t*		mtr)	/*!< in: mtr */
4139 {
4140 	dict_index_t*	index;
4141 	ulint		left_page_no;
4142 	ulint		right_page_no;
4143 	buf_block_t*	merge_block;
4144 	page_t*		merge_page;
4145 	buf_block_t*	block;
4146 	page_t*		page;
4147 	rec_t*		node_ptr;
4148 	btr_cur_t	parent_cursor;
4149 
4150 	block = btr_cur_get_block(cursor);
4151 	index = btr_cur_get_index(cursor);
4152 
4153 	ut_ad(dict_index_get_page(index) != block->page.id.page_no());
4154 
4155 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
4156 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
4157 
4158 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4159 
4160 	MONITOR_INC(MONITOR_INDEX_DISCARD);
4161 
4162 	if (dict_index_is_spatial(index)) {
4163 		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
4164 	} else {
4165 		btr_page_get_father(index, block, mtr, &parent_cursor);
4166 	}
4167 
4168 	/* Decide the page which will inherit the locks */
4169 
4170 	left_page_no = btr_page_get_prev(buf_block_get_frame(block));
4171 	right_page_no = btr_page_get_next(buf_block_get_frame(block));
4172 
4173 	const page_size_t	page_size(index->table->space->flags);
4174 	ut_d(bool parent_is_different = false);
4175 	if (left_page_no != FIL_NULL) {
4176 		merge_block = btr_block_get(
4177 			page_id_t(index->table->space_id, left_page_no),
4178 			page_size, RW_X_LATCH, index, mtr);
4179 
4180 		merge_page = buf_block_get_frame(merge_block);
4181 #ifdef UNIV_BTR_DEBUG
4182 		ut_a(btr_page_get_next(merge_page)
4183 		     == block->page.id.page_no());
4184 #endif /* UNIV_BTR_DEBUG */
4185 		ut_d(parent_is_different =
4186 			(page_rec_get_next(
4187 				page_get_infimum_rec(
4188 					btr_cur_get_page(
4189 						&parent_cursor)))
4190 			 == btr_cur_get_rec(&parent_cursor)));
4191 	} else if (right_page_no != FIL_NULL) {
4192 		merge_block = btr_block_get(
4193 			page_id_t(index->table->space_id, right_page_no),
4194 			page_size, RW_X_LATCH, index, mtr);
4195 
4196 		merge_page = buf_block_get_frame(merge_block);
4197 #ifdef UNIV_BTR_DEBUG
4198 		ut_a(btr_page_get_prev(merge_page)
4199 		     == block->page.id.page_no());
4200 #endif /* UNIV_BTR_DEBUG */
4201 		ut_d(parent_is_different = page_rec_is_supremum(
4202 			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
4203 	} else {
4204 		btr_discard_only_page_on_level(index, block, mtr);
4205 
4206 		return;
4207 	}
4208 
4209 	page = buf_block_get_frame(block);
4210 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
4211 	btr_search_drop_page_hash_index(block);
4212 
4213 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
4214 
4215 		/* We have to mark the leftmost node pointer on the right
4216 		side page as the predefined minimum record */
4217 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
4218 
4219 		ut_ad(page_rec_is_user_rec(node_ptr));
4220 
4221 		/* This will make page_zip_validate() fail on merge_page
4222 		until btr_level_list_remove() completes.  This is harmless,
4223 		because everything will take place within a single
4224 		mini-transaction and because writing to the redo log
4225 		is an atomic operation (performed by mtr_commit()). */
4226 		btr_set_min_rec_mark(node_ptr, mtr);
4227 	}
4228 
4229 	if (dict_index_is_spatial(index)) {
4230 		rtr_node_ptr_delete(&parent_cursor, mtr);
4231 	} else {
4232 		btr_cur_node_ptr_delete(&parent_cursor, mtr);
4233 	}
4234 
4235 	/* Remove the page from the level list */
4236 	ut_a(DB_SUCCESS == btr_level_list_remove(index->table->space_id,
4237 						 page_size, page, index, mtr));
4238 
4239 #ifdef UNIV_ZIP_DEBUG
4240 	{
4241 		page_zip_des_t*	merge_page_zip
4242 			= buf_block_get_page_zip(merge_block);
4243 		ut_a(!merge_page_zip
4244 		     || page_zip_validate(merge_page_zip, merge_page, index));
4245 	}
4246 #endif /* UNIV_ZIP_DEBUG */
4247 
4248 	if (!dict_table_is_locking_disabled(index->table)) {
4249 		if (left_page_no != FIL_NULL) {
4250 			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4251 					    block);
4252 		} else {
4253 			lock_update_discard(merge_block,
4254 					    lock_get_min_heap_no(merge_block),
4255 					    block);
4256 		}
4257 	}
4258 
4259 	if (dict_index_is_spatial(index)) {
4260 		rtr_check_discard_page(index, cursor, block);
4261 	}
4262 
4263 	/* Free the file page */
4264 	btr_page_free(index, block, mtr);
4265 
4266 	/* btr_check_node_ptr() needs the parent block latched.
4267 	If merge_block's parent block is not the same,
4268 	we cannot use btr_check_node_ptr() */
4269 	ut_ad(parent_is_different
4270 	      || btr_check_node_ptr(index, merge_block, mtr));
4271 
4272 	if (btr_cur_get_block(&parent_cursor)->page.id.page_no() == index->page
4273 	    && !page_has_siblings(btr_cur_get_page(&parent_cursor))
4274 	    && page_get_n_recs(btr_cur_get_page(&parent_cursor)) == 1) {
4275 		btr_lift_page_up(index, merge_block, mtr);
4276 	}
4277 }
4278 
4279 #ifdef UNIV_BTR_PRINT
4280 /*************************************************************//**
4281 Prints size info of a B-tree. */
4282 void
4283 btr_print_size(
4284 /*===========*/
4285 	dict_index_t*	index)	/*!< in: index tree */
4286 {
4287 	page_t*		root;
4288 	fseg_header_t*	seg;
4289 	mtr_t		mtr;
4290 
4291 	if (dict_index_is_ibuf(index)) {
4292 		fputs("Sorry, cannot print info of an ibuf tree:"
4293 		      " use ibuf functions\n", stderr);
4294 
4295 		return;
4296 	}
4297 
4298 	mtr_start(&mtr);
4299 
4300 	root = btr_root_get(index, &mtr);
4301 
4302 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4303 
4304 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4305 	fseg_print(seg, &mtr);
4306 
4307 	if (!dict_index_is_ibuf(index)) {
4308 
4309 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4310 
4311 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4312 		fseg_print(seg, &mtr);
4313 	}
4314 
4315 	mtr_commit(&mtr);
4316 }
4317 
4318 /************************************************************//**
4319 Prints recursively index tree pages. */
4320 static
4321 void
4322 btr_print_recursive(
4323 /*================*/
4324 	dict_index_t*	index,	/*!< in: index tree */
4325 	buf_block_t*	block,	/*!< in: index page */
4326 	ulint		width,	/*!< in: print this many entries from start
4327 				and end */
4328 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4329 	rec_offs**	offsets,/*!< in/out: buffer for rec_get_offsets() */
4330 	mtr_t*		mtr)	/*!< in: mtr */
4331 {
4332 	const page_t*	page	= buf_block_get_frame(block);
4333 	page_cur_t	cursor;
4334 	ulint		n_recs;
4335 	ulint		i	= 0;
4336 	mtr_t		mtr2;
4337 
4338 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_SX_FIX));
4339 
4340 	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page)
4341 		<< " page " << block->page.id;
4342 
4343 	page_print(block, index, width, width);
4344 
4345 	n_recs = page_get_n_recs(page);
4346 
4347 	page_cur_set_before_first(block, &cursor);
4348 	page_cur_move_to_next(&cursor);
4349 
4350 	while (!page_cur_is_after_last(&cursor)) {
4351 
4352 		if (page_is_leaf(page)) {
4353 
4354 			/* If this is the leaf level, do nothing */
4355 
4356 		} else if ((i <= width) || (i >= n_recs - width)) {
4357 
4358 			const rec_t*	node_ptr;
4359 
4360 			mtr_start(&mtr2);
4361 
4362 			node_ptr = page_cur_get_rec(&cursor);
4363 
4364 			*offsets = rec_get_offsets(
4365 				node_ptr, index, *offsets, 0,
4366 				ULINT_UNDEFINED, heap);
4367 			btr_print_recursive(index,
4368 					    btr_node_ptr_get_child(node_ptr,
4369 								   index,
4370 								   *offsets,
4371 								   &mtr2),
4372 					    width, heap, offsets, &mtr2);
4373 			mtr_commit(&mtr2);
4374 		}
4375 
4376 		page_cur_move_to_next(&cursor);
4377 		i++;
4378 	}
4379 }
4380 
4381 /**************************************************************//**
4382 Prints directories and other info of all nodes in the tree. */
4383 void
4384 btr_print_index(
4385 /*============*/
4386 	dict_index_t*	index,	/*!< in: index */
4387 	ulint		width)	/*!< in: print this many entries from start
4388 				and end */
4389 {
4390 	mtr_t		mtr;
4391 	buf_block_t*	root;
4392 	mem_heap_t*	heap	= NULL;
4393 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4394 	rec_offs*	offsets	= offsets_;
4395 	rec_offs_init(offsets_);
4396 
4397 	fputs("--------------------------\n"
4398 	      "INDEX TREE PRINT\n", stderr);
4399 
4400 	mtr_start(&mtr);
4401 
4402 	root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4403 
4404 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4405 	if (heap) {
4406 		mem_heap_free(heap);
4407 	}
4408 
4409 	mtr_commit(&mtr);
4410 
4411 	ut_ad(btr_validate_index(index, 0, false));
4412 }
4413 #endif /* UNIV_BTR_PRINT */
4414 
4415 #ifdef UNIV_DEBUG
4416 /************************************************************//**
4417 Checks that the node pointer to a page is appropriate.
4418 @return TRUE */
4419 ibool
4420 btr_check_node_ptr(
4421 /*===============*/
4422 	dict_index_t*	index,	/*!< in: index tree */
4423 	buf_block_t*	block,	/*!< in: index page */
4424 	mtr_t*		mtr)	/*!< in: mtr */
4425 {
4426 	mem_heap_t*	heap;
4427 	dtuple_t*	tuple;
4428 	rec_offs*	offsets;
4429 	btr_cur_t	cursor;
4430 	page_t*		page = buf_block_get_frame(block);
4431 
4432 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4433 
4434 	if (dict_index_get_page(index) == block->page.id.page_no()) {
4435 
4436 		return(TRUE);
4437 	}
4438 
4439 	heap = mem_heap_create(256);
4440 
4441 	if (dict_index_is_spatial(index)) {
4442 		offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4443 						    NULL, &cursor);
4444 	} else {
4445 		offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4446 						    &cursor);
4447 	}
4448 
4449 	if (page_is_leaf(page)) {
4450 
4451 		goto func_exit;
4452 	}
4453 
4454 	tuple = dict_index_build_node_ptr(
4455 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4456 		btr_page_get_level(page));
4457 
4458 	/* For a spatial index, the MBR in the parent rec could differ
4459 	from that of the first rec of the child; their relationship
4460 	should be a "WITHIN" relationship */
4461 	if (dict_index_is_spatial(index)) {
4462 		ut_a(!cmp_dtuple_rec_with_gis(
4463 			tuple, btr_cur_get_rec(&cursor),
4464 			offsets, PAGE_CUR_WITHIN));
4465 	} else {
4466 		ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4467 	}
4468 func_exit:
4469 	mem_heap_free(heap);
4470 
4471 	return(TRUE);
4472 }
4473 #endif /* UNIV_DEBUG */
4474 
4475 /************************************************************//**
4476 Display identification information for a record. */
4477 static
4478 void
4479 btr_index_rec_validate_report(
4480 /*==========================*/
4481 	const page_t*		page,	/*!< in: index page */
4482 	const rec_t*		rec,	/*!< in: index record */
4483 	const dict_index_t*	index)	/*!< in: index */
4484 {
4485 	ib::info() << "Record in index " << index->name
4486 		<< " of table " << index->table->name
4487 		<< ", page " << page_id_t(page_get_space_id(page),
4488 					  page_get_page_no(page))
4489 		<< ", at offset " << page_offset(rec);
4490 }
4491 
4492 /************************************************************//**
4493 Checks the size and number of fields in a record based on the definition of
4494 the index.
4495 @return TRUE if ok */
4496 ibool
4497 btr_index_rec_validate(
4498 /*===================*/
4499 	const rec_t*		rec,		/*!< in: index record */
4500 	const dict_index_t*	index,		/*!< in: index */
4501 	ibool			dump_on_error)	/*!< in: TRUE if the function
4502 						should print hex dump of record
4503 						and page on error */
4504 {
4505 	ulint		len;
4506 	const page_t*	page;
4507 	mem_heap_t*	heap	= NULL;
4508 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4509 	rec_offs*	offsets	= offsets_;
4510 	rec_offs_init(offsets_);
4511 
4512 	page = page_align(rec);
4513 
4514 	ut_ad(index->n_core_fields);
4515 
4516 	if (index->is_ibuf()) {
4517 		/* The insert buffer index tree can contain records from any
4518 		other index: we cannot check the number of fields or
4519 		their length */
4520 
4521 		return(TRUE);
4522 	}
4523 
4524 #ifdef VIRTUAL_INDEX_DEBUG
4525 	if (dict_index_has_virtual(index)) {
4526 		fprintf(stderr, "index name is %s\n", index->name());
4527 	}
4528 #endif
4529 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4530 		btr_index_rec_validate_report(page, rec, index);
4531 
4532 		ib::error() << "Compact flag=" << !!page_is_comp(page)
4533 			<< ", should be " << dict_table_is_comp(index->table);
4534 
4535 		return(FALSE);
4536 	}
4537 
4538 	if (!page_is_comp(page)) {
4539 		const ulint n_rec_fields = rec_get_n_fields_old(rec);
4540 		if (n_rec_fields == DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD
4541 		    && index->id == DICT_INDEXES_ID) {
4542 			/* A record for older SYS_INDEXES table
4543 			(missing merge_threshold column) is acceptable. */
4544 		} else if (n_rec_fields < index->n_core_fields
4545 			   || n_rec_fields > index->n_fields) {
4546 			btr_index_rec_validate_report(page, rec, index);
4547 
4548 			ib::error() << "Has " << rec_get_n_fields_old(rec)
4549 				    << " fields, should have "
4550 				    << index->n_core_fields << ".."
4551 				    << index->n_fields;
4552 
4553 			if (dump_on_error) {
4554 				fputs("InnoDB: corrupt record ", stderr);
4555 				rec_print_old(stderr, rec);
4556 				putc('\n', stderr);
4557 			}
4558 			return(FALSE);
4559 		}
4560 	}
4561 
4562 	offsets = rec_get_offsets(rec, index, offsets, page_is_leaf(page)
4563 				  ? index->n_core_fields : 0,
4564 				  ULINT_UNDEFINED, &heap);
4565 
4566 	for (unsigned i = 0; i < index->n_fields; i++) {
4567 		dict_field_t*	field = dict_index_get_nth_field(index, i);
4568 		ulint		fixed_size = dict_col_get_fixed_size(
4569 						dict_field_get_col(field),
4570 						page_is_comp(page));
4571 
4572 		rec_get_nth_field_offs(offsets, i, &len);
4573 
4574 		if (rec_offs_nth_extern(offsets, i)) {
4575 
4576 			const byte* data = rec_get_nth_field(
4577 				rec, offsets, i, &len);
4578 			len -= BTR_EXTERN_FIELD_REF_SIZE;
4579 			ulint extern_len = mach_read_from_4(
4580 				data + len + BTR_EXTERN_LEN + 4);
4581 			if (fixed_size == extern_len) {
4582 				continue;
4583 			}
4584 		}
4585 
4586 		/* Note that if fixed_size != 0, it equals the
4587 		length of a fixed-size column in the clustered index,
4588 		and we should adjust it here.
4589 		A prefix index of the column is of fixed, but different,
4590 		length.  When fixed_size == 0, prefix_len is the maximum
4591 		length of the prefix index column. */
4592 
4593 		if (len_is_stored(len)
4594 		    && (field->prefix_len
4595 			? len > field->prefix_len
4596 			: (fixed_size && len != fixed_size))) {
4597 			btr_index_rec_validate_report(page, rec, index);
4598 
4599 			ib::error	error;
4600 
4601 			error << "Field " << i << " len is " << len
4602 				<< ", should be " << fixed_size;
4603 
4604 			if (dump_on_error) {
4605 				error << "; ";
4606 				rec_print(error.m_oss, rec,
4607 					  rec_get_info_bits(
4608 						  rec, rec_offs_comp(offsets)),
4609 					  offsets);
4610 			}
4611 			if (heap) {
4612 				mem_heap_free(heap);
4613 			}
4614 			return(FALSE);
4615 		}
4616 	}
4617 
4618 #ifdef VIRTUAL_INDEX_DEBUG
4619 	if (dict_index_has_virtual(index)) {
4620 		rec_print_new(stderr, rec, offsets);
4621 	}
4622 #endif
4623 
4624 	if (heap) {
4625 		mem_heap_free(heap);
4626 	}
4627 	return(TRUE);
4628 }
4629 
4630 /************************************************************//**
4631 Checks the size and number of fields in records based on the definition of
4632 the index.
4633 @return TRUE if ok */
4634 static
4635 ibool
4636 btr_index_page_validate(
4637 /*====================*/
4638 	buf_block_t*	block,	/*!< in: index page */
4639 	dict_index_t*	index)	/*!< in: index */
4640 {
4641 	page_cur_t	cur;
4642 	ibool		ret	= TRUE;
4643 #ifndef DBUG_OFF
4644 	ulint		nth	= 1;
4645 #endif /* !DBUG_OFF */
4646 
4647 	page_cur_set_before_first(block, &cur);
4648 
4649 	/* Directory slot 0 should only contain the infimum record. */
4650 	DBUG_EXECUTE_IF("check_table_rec_next",
4651 			ut_a(page_rec_get_nth_const(
4652 				     page_cur_get_page(&cur), 0)
4653 			     == cur.rec);
4654 			ut_a(page_dir_slot_get_n_owned(
4655 				     page_dir_get_nth_slot(
4656 					     page_cur_get_page(&cur), 0))
4657 			     == 1););
4658 
4659 	page_cur_move_to_next(&cur);
4660 
4661 	for (;;) {
4662 		if (page_cur_is_after_last(&cur)) {
4663 
4664 			break;
4665 		}
4666 
4667 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4668 
4669 			return(FALSE);
4670 		}
4671 
4672 		/* Verify that page_rec_get_nth_const() is correctly
4673 		retrieving each record. */
4674 		DBUG_EXECUTE_IF("check_table_rec_next",
4675 				ut_a(cur.rec == page_rec_get_nth_const(
4676 					     page_cur_get_page(&cur),
4677 					     page_rec_get_n_recs_before(
4678 						     cur.rec)));
4679 				ut_a(nth++ == page_rec_get_n_recs_before(
4680 					     cur.rec)););
4681 
4682 		page_cur_move_to_next(&cur);
4683 	}
4684 
4685 	return(ret);
4686 }
4687 
4688 /************************************************************//**
4689 Report an error on one page of an index tree. */
4690 static
4691 void
4692 btr_validate_report1(
4693 /*=================*/
4694 	dict_index_t*		index,	/*!< in: index */
4695 	ulint			level,	/*!< in: B-tree level */
4696 	const buf_block_t*	block)	/*!< in: index page */
4697 {
4698 	ib::error	error;
4699 	error << "In page " << block->page.id.page_no()
4700 		<< " of index " << index->name
4701 		<< " of table " << index->table->name;
4702 
4703 	if (level > 0) {
4704 		error << ", index tree level " << level;
4705 	}
4706 }
4707 
4708 /************************************************************//**
4709 Report an error on two pages of an index tree. */
4710 static
4711 void
4712 btr_validate_report2(
4713 /*=================*/
4714 	const dict_index_t*	index,	/*!< in: index */
4715 	ulint			level,	/*!< in: B-tree level */
4716 	const buf_block_t*	block1,	/*!< in: first index page */
4717 	const buf_block_t*	block2)	/*!< in: second index page */
4718 {
4719 	ib::error	error;
4720 	error << "In pages " << block1->page.id
4721 		<< " and " << block2->page.id << " of index " << index->name
4722 		<< " of table " << index->table->name;
4723 
4724 	if (level > 0) {
4725 		error << ", index tree level " << level;
4726 	}
4727 }
4728 
4729 /************************************************************//**
4730 Validates index tree level.
4731 @return TRUE if ok */
4732 static
4733 bool
4734 btr_validate_level(
4735 /*===============*/
4736 	dict_index_t*	index,	/*!< in: index tree */
4737 	const trx_t*	trx,	/*!< in: transaction or NULL */
4738 	ulint		level,	/*!< in: level number */
4739 	bool		lockout)/*!< in: true if X-latch index is intended */
4740 {
4741 	buf_block_t*	block;
4742 	page_t*		page;
4743 	buf_block_t*	right_block = 0; /* remove warning */
4744 	page_t*		right_page = 0; /* remove warning */
4745 	page_t*		father_page;
4746 	btr_cur_t	node_cur;
4747 	btr_cur_t	right_node_cur;
4748 	rec_t*		rec;
4749 	ulint		right_page_no;
4750 	ulint		left_page_no;
4751 	page_cur_t	cursor;
4752 	dtuple_t*	node_ptr_tuple;
4753 	bool		ret	= true;
4754 	mtr_t		mtr;
4755 	mem_heap_t*	heap	= mem_heap_create(256);
4756 	rec_offs*	offsets	= NULL;
4757 	rec_offs*	offsets2= NULL;
4758 #ifdef UNIV_ZIP_DEBUG
4759 	page_zip_des_t*	page_zip;
4760 #endif /* UNIV_ZIP_DEBUG */
4761 	ulint		savepoint = 0;
4762 	ulint		savepoint2 = 0;
4763 	ulint		parent_page_no = FIL_NULL;
4764 	ulint		parent_right_page_no = FIL_NULL;
4765 	bool		rightmost_child = false;
4766 
4767 	mtr.start();
4768 
4769 	if (!srv_read_only_mode) {
4770 		if (lockout) {
4771 			mtr_x_lock_index(index, &mtr);
4772 		} else {
4773 			mtr_sx_lock_index(index, &mtr);
4774 		}
4775 	}
4776 
4777 	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4778 	page = buf_block_get_frame(block);
4779 
4780 	fil_space_t*		space	= index->table->space;
4781 	const page_size_t	table_page_size(
4782 		dict_table_page_size(index->table));
4783 	const page_size_t	space_page_size(space->flags);
4784 
4785 	if (!table_page_size.equals_to(space_page_size)) {
4786 
4787 		ib::warn() << "Flags mismatch: table=" << index->table->flags
4788 			<< ", tablespace=" << space->flags;
4789 
4790 		mtr_commit(&mtr);
4791 
4792 		return(false);
4793 	}
4794 
4795 	while (level != btr_page_get_level(page)) {
4796 		const rec_t*	node_ptr;
4797 
4798 		if (fseg_page_is_free(space, block->page.id.page_no())) {
4799 
4800 			btr_validate_report1(index, level, block);
4801 
4802 			ib::warn() << "Page is free";
4803 
4804 			ret = false;
4805 		}
4806 
4807 		ut_a(index->table->space_id == block->page.id.space());
4808 		ut_a(block->page.id.space() == page_get_space_id(page));
4809 #ifdef UNIV_ZIP_DEBUG
4810 		page_zip = buf_block_get_page_zip(block);
4811 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4812 #endif /* UNIV_ZIP_DEBUG */
4813 		ut_a(!page_is_leaf(page));
4814 
4815 		page_cur_set_before_first(block, &cursor);
4816 		page_cur_move_to_next(&cursor);
4817 
4818 		node_ptr = page_cur_get_rec(&cursor);
4819 		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
4820 					  ULINT_UNDEFINED, &heap);
4821 
4822 		savepoint2 = mtr_set_savepoint(&mtr);
4823 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4824 		page = buf_block_get_frame(block);
4825 
4826 		/* For an R-Tree, since the record order might not match
4827 		that of the linked index pages in the lower level, we
4828 		need to traverse backwards to get the first page rec in
4829 		this level. This is only used for index validation; a
4830 		spatial index does not use such a scan for any of its
4831 		DML or query operations. */
4832 		if (dict_index_is_spatial(index)) {
4833 			left_page_no = btr_page_get_prev(page);
4834 
4835 			while (left_page_no != FIL_NULL) {
4836 				/* To obey the latch order of tree blocks,
4837 				we must release the block once in order to
4838 				obtain the latch on the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint2, block);

				savepoint2 = mtr_set_savepoint(&mtr);
				block = btr_block_get(
					page_id_t(index->table->space_id,
						  left_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
				page = buf_block_get_frame(block);
				left_page_no = btr_page_get_prev(page);
			}
		}
	}

	/* Now we are on the desired level. Loop through the pages on that
	level. */

loop:
	mem_heap_empty(heap);
	offsets = offsets2 = NULL;
	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	page_zip = buf_block_get_page_zip(block);
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_a(block->page.id.space() == index->table->space_id);

	if (fseg_page_is_free(space, block->page.id.page_no())) {

		btr_validate_report1(index, level, block);

		ib::warn() << "Page is marked as free";
		ret = false;

	} else if (btr_page_get_index_id(page) != index->id) {

		ib::error() << "Page index id " << btr_page_get_index_id(page)
			<< " != data dictionary index id " << index->id;

		ret = false;

	} else if (!page_validate(page, index)) {

		btr_validate_report1(index, level, block);
		ret = false;

	} else if (level == 0 && !btr_index_page_validate(block, index)) {

		/* We are on level 0. Check that the records have the right
		number of fields, and that the field lengths are right. */

		ret = false;
	}

	ut_a(btr_page_get_level(page) == level);

	right_page_no = btr_page_get_next(page);
	left_page_no = btr_page_get_prev(page);

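	/* An empty page is allowed only if it is the root leaf page
	(an empty index); any other page on the level must contain
	user records. */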
	ut_a(!page_is_empty(page)
	     || (level == 0
		 && page_get_page_no(page) == dict_index_get_page(index)));

	if (right_page_no != FIL_NULL) {
		const rec_t*	right_rec;
		savepoint = mtr_set_savepoint(&mtr);

		right_block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			table_page_size,
			RW_SX_LATCH, index, &mtr);

		right_page = buf_block_get_frame(right_block);

		if (btr_page_get_prev(right_page) != page_get_page_no(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: broken FIL_PAGE_NEXT"
			      " or FIL_PAGE_PREV links\n", stderr);

			ret = false;
		}

		if (page_is_comp(right_page) != page_is_comp(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: 'compact' flag mismatch\n", stderr);

			ret = false;

			goto node_ptr_fails;
		}

		rec = page_rec_get_prev(page_get_supremum_rec(page));
		right_rec = page_rec_get_next(page_get_infimum_rec(
						      right_page));
		offsets = rec_get_offsets(rec, index, offsets,
					  page_is_leaf(page)
					  ? index->n_core_fields : 0,
					  ULINT_UNDEFINED, &heap);
		offsets2 = rec_get_offsets(right_rec, index, offsets2,
					   page_is_leaf(right_page)
					   ? index->n_core_fields : 0,
					   ULINT_UNDEFINED, &heap);

		/* For a spatial index, we cannot guarantee the key
		ordering across pages, so skip the record comparison for
		now. This will be enhanced in a special R-tree index
		validation scheme. */
		if (!dict_index_is_spatial(index)
		    && cmp_rec_rec(rec, right_rec,
				   offsets, offsets2, index) >= 0) {

			btr_validate_report2(index, level, block, right_block);

			fputs("InnoDB: records in wrong order"
			      " on adjacent pages\n", stderr);

			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_prev(page_get_supremum_rec(page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);
			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_next(
				page_get_infimum_rec(right_page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);

			ret = false;
		}
	}

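	/* On the leftmost page of each non-leaf level, the first node
	pointer record must carry the REC_INFO_MIN_REC_FLAG. */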
	if (level > 0 && left_page_no == FIL_NULL) {
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			     page_rec_get_next(page_get_infimum_rec(page)),
			     page_is_comp(page)));
	}

	/* Similarly, skip the father node check for a spatial index for
	now, for a couple of reasons:
	1) As mentioned, there is no ordering relationship between the
	records on the parent level and the linked pages on the child level.
	2) Searching for the parent from the root is very costly for an
	R-tree.
	We will add a special validation mechanism for R-trees later
	(WL #7520). */
	if (!dict_index_is_spatial(index)
	    && block->page.id.page_no() != dict_index_get_page(index)) {

		/* Check father node pointers */
		rec_t*	node_ptr;

		btr_cur_position(
			index, page_rec_get_next(page_get_infimum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr_for_validate(
			offsets, heap, &node_cur, &mtr);

		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		parent_page_no = page_get_page_no(father_page);
		parent_right_page_no = btr_page_get_next(father_page);
		rightmost_child = page_rec_is_supremum(
					page_rec_get_next(node_ptr));

		btr_cur_position(
			index,
			page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);

		offsets = btr_page_get_father_node_ptr_for_validate(
				offsets, heap, &node_cur, &mtr);

		if (node_ptr != btr_cur_get_rec(&node_cur)
		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
				     != block->page.id.page_no()) {

			btr_validate_report1(index, level, block);

			fputs("InnoDB: node pointer to the page is wrong\n",
			      stderr);

			fputs("InnoDB: node ptr ", stderr);
			rec_print(stderr, node_ptr, index);

			rec = btr_cur_get_rec(&node_cur);
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o "
				ULINTPF "\n",
				btr_node_ptr_get_child_page_no(rec, offsets));

			fputs("InnoDB: record on page ", stderr);
			rec_print_new(stderr, rec, offsets);
			putc('\n', stderr);
			ret = false;

			goto node_ptr_fails;
		}

		if (!page_is_leaf(page)) {
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page));

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
					   offsets)) {
				const rec_t* first_rec = page_rec_get_next(
					page_get_infimum_rec(page));

				btr_validate_report1(index, level, block);

				ib::error() << "Node ptrs differ on levels > 0";

				fputs("InnoDB: node ptr ", stderr);
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
				ret = false;

				goto node_ptr_fails;
			}
		}

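		/* If this is the leftmost (rightmost) page on the
		level, its node pointer must be the first (last) user
		record on the father page, and the father page itself
		must have no left (right) sibling. */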
		if (left_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
			ut_a(!page_has_prev(father_page));
		}

		if (right_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
			ut_a(!page_has_next(father_page));
		} else {
			const rec_t*	right_node_ptr;

			right_node_ptr = page_rec_get_next(node_ptr);

			if (!lockout && rightmost_child) {

				/* To obey latch order of tree blocks,
				we should release the right_block once to
				obtain lock of the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint, right_block);

				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(index->table
							  ->space_id,
							  parent_right_page_no),
						table_page_size,
						RW_SX_LATCH, index, &mtr);
				}

				right_block = btr_block_get(
					page_id_t(index->table->space_id,
						  right_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}

			btr_cur_position(
				index, page_rec_get_next(
					page_get_infimum_rec(
						buf_block_get_frame(
							right_block))),
				right_block, &right_node_cur);

			offsets = btr_page_get_father_node_ptr_for_validate(
					offsets, heap, &right_node_cur, &mtr);

			if (right_node_ptr
			    != page_get_supremum_rec(father_page)) {

				if (btr_cur_get_rec(&right_node_cur)
				    != right_node_ptr) {
					ret = false;
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			} else {
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);

				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
					    page_get_infimum_rec(
						    right_father_page))) {
					ret = false;
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}

				if (page_get_page_no(right_father_page)
				    != btr_page_get_next(father_page)) {

					ret = false;
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			}
		}
	}

node_ptr_fails:
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
	mtr.commit();

	if (trx_is_interrupted(trx)) {
		/* On interrupt, return the current status. */
	} else if (right_page_no != FIL_NULL) {

		mtr.start();

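		/* When we do not hold an index X-latch, latch the
		parent page (or, for a rightmost child, the parent's
		right sibling) before the right sibling itself, so that
		the top-down, left-to-right latch order is preserved. */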
		if (!lockout) {
			if (rightmost_child) {
				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(
							index->table->space_id,
							parent_right_page_no),
						table_page_size,
						RW_SX_LATCH, index, &mtr);
				}
			} else if (parent_page_no != FIL_NULL) {
				btr_block_get(
					page_id_t(index->table->space_id,
						  parent_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}
		}

		block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			table_page_size,
			RW_SX_LATCH, index, &mtr);

		page = buf_block_get_frame(block);

		goto loop;
	}

	mem_heap_free(heap);

	return(ret);
}

/**************************************************************//**
Do an index-level validation of a spatial index tree.
@return	true if no error found */
static
bool
btr_validate_spatial_index(
/*=======================*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx)	/*!< in: transaction or NULL */
{

	mtr_t	mtr;
	bool	ok = true;

	mtr.start();

	mtr_x_lock_index(index, &mtr);

	page_t*	root = btr_root_get(index, &mtr);
	ulint	n = btr_page_get_level(root);

#ifdef UNIV_RTR_DEBUG
	fprintf(stderr, "R-tree level is %lu\n", n);
#endif /* UNIV_RTR_DEBUG */

	for (ulint i = 0; i <= n; ++i) {
#ifdef UNIV_RTR_DEBUG
		fprintf(stderr, "Level %lu:\n", n - i);
#endif /* UNIV_RTR_DEBUG */

		if (!btr_validate_level(index, trx, n - i, true)) {
			ok = false;
			break;
		}
	}

	mtr.commit();

	return(ok);
}

/**************************************************************//**
Checks the consistency of an index tree.
@return	DB_SUCCESS if ok, error code if not */
dberr_t
btr_validate_index(
/*===============*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx,	/*!< in: transaction or NULL */
	bool		lockout)/*!< in: true if X-latch index is intended */
{
	dberr_t err = DB_SUCCESS;

	/* Full-text indexes are implemented by auxiliary tables,
	not by the B-tree. */
	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
		return(err);
	}

	if (dict_index_is_spatial(index)) {
		if (!btr_validate_spatial_index(index, trx)) {
			err = DB_ERROR;
		}
		return(err);
	}

	mtr_t		mtr;

	mtr_start(&mtr);

	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

	page_t*	root = btr_root_get(index, &mtr);

	if (!root) {
		err = DB_CORRUPTION;
		mtr_commit(&mtr);
		return err;
	}

	ulint	n = btr_page_get_level(root);

	for (ulint i = 0; i <= n; ++i) {

		if (!btr_validate_level(index, trx, n - i, lockout)) {
			err = DB_CORRUPTION;
		}
	}

	mtr_commit(&mtr);

	return(err);
}
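
/* A minimal usage sketch for btr_validate_index() (hypothetical
caller; in the server the usual entry point is the handler's
CHECK TABLE code path):

	dberr_t err = btr_validate_index(index, trx, false);
	if (err != DB_SUCCESS) {
		ib::error() << "Corruption in index " << index->name
			<< " of table " << index->table->name;
	}

Passing lockout=true X-latches the whole index instead of SX-latching
it; as seen above, spatial index validation always requires this. */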

/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return	true if possible to merge. */
static
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr)		/*!< in: mini-transaction */
{
	dict_index_t*	index;
	page_t*		page;
	ulint		n_recs;
	ulint		data_size;
	ulint		max_ins_size_reorg;
	ulint		max_ins_size;
	buf_block_t*	mblock;
	page_t*		mpage;
	DBUG_ENTER("btr_can_merge_with_page");

	if (page_no == FIL_NULL) {
		*merge_block = NULL;
		DBUG_RETURN(false);
	}

	index = btr_cur_get_index(cursor);
	page = btr_cur_get_page(cursor);

	const page_id_t		page_id(index->table->space_id, page_no);
	const page_size_t	page_size(index->table->space->flags);

	mblock = btr_block_get(page_id, page_size, RW_X_LATCH, index, mtr);
	mpage = buf_block_get_frame(mblock);

	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);

	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		mpage, n_recs);

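	/* Illustrative numbers only: if the records on 'page' occupy
	data_size = 5000 bytes, while even reorganizing mpage would
	leave at most max_ins_size_reorg = 4000 bytes of free space,
	the merge cannot fit and we must give up. */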
	if (data_size > max_ins_size_reorg) {
		goto error;
	}

	/* If compression padding tells us that the merge would result
	in a page that is packed too densely, i.e. one that is likely
	to fail compression, then do not merge the pages. */
	if (page_size.is_compressed() && page_is_leaf(mpage)
	    && (page_get_data_size(mpage) + data_size
		>= dict_index_zip_pad_optimal_page_size(index))) {

		goto error;
	}

	max_ins_size = page_get_max_insert_size(mpage, n_recs);

	if (data_size > max_ins_size) {

		/* We have to reorganize mpage */

		if (!btr_page_reorganize_block(
			    false, page_zip_level, mblock, index, mtr)) {

			goto error;
		}

		max_ins_size = page_get_max_insert_size(mpage, n_recs);

		ut_ad(page_validate(mpage, index));
		ut_ad(max_ins_size == max_ins_size_reorg);

		if (data_size > max_ins_size) {

			/* Add fault tolerance, though this should
			never happen */

			goto error;
		}
	}

	*merge_block = mblock;
	DBUG_RETURN(true);

error:
	*merge_block = NULL;
	DBUG_RETURN(false);
}