1 /*****************************************************************************
2 
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2014, 2021, MariaDB Corporation.
6 
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10 
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18 
19 *****************************************************************************/
20 
21 /**************************************************//**
22 @file btr/btr0btr.cc
23 The B-tree
24 
25 Created 6/2/1994 Heikki Tuuri
26 *******************************************************/
27 
28 #include "btr0btr.h"
29 
30 #include "page0page.h"
31 #include "page0zip.h"
32 #include "gis0rtree.h"
33 
34 #include "btr0cur.h"
35 #include "btr0sea.h"
36 #include "btr0pcur.h"
37 #include "btr0defragment.h"
38 #include "rem0cmp.h"
39 #include "lock0lock.h"
40 #include "ibuf0ibuf.h"
41 #include "trx0trx.h"
42 #include "srv0mon.h"
43 #include "gis0geo.h"
44 #include "dict0boot.h"
45 #include "row0sel.h" /* row_search_max_autoinc() */
46 
47 Atomic_counter<uint32_t> btr_validate_index_running;
48 
49 /**************************************************************//**
50 Checks if the page in the cursor can be merged with given page.
51 If necessary, re-organize the merge_page.
52 @return	true if possible to merge. */
53 static
54 bool
55 btr_can_merge_with_page(
56 /*====================*/
57 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
58 	uint32_t	page_no,	/*!< in: a sibling page */
59 	buf_block_t**	merge_block,	/*!< out: the merge block */
60 	mtr_t*		mtr);		/*!< in: mini-transaction */
61 
62 /** Report that an index page is corrupted.
63 @param[in]	buffer block
64 @param[in]	index tree */
btr_corruption_report(const buf_block_t * block,const dict_index_t * index)65 void btr_corruption_report(const buf_block_t* block, const dict_index_t* index)
66 {
67 	ib::fatal()
68 		<< "Flag mismatch in page " << block->page.id()
69 		<< " index " << index->name
70 		<< " of table " << index->table->name;
71 }
72 
73 /*
74 Latching strategy of the InnoDB B-tree
75 --------------------------------------
76 
77 Node pointer page latches acquisition is protected by index->lock latch.
78 
79 Before MariaDB 10.2.2, all node pointer pages were protected by index->lock
80 either in S (shared) or X (exclusive) mode and block->lock was not acquired on
81 node pointer pages.
82 
83 After MariaDB 10.2.2, block->lock S-latch or X-latch is used to protect
84 node pointer pages and obtaiment of node pointer page latches is protected by
85 index->lock.
86 
87 (0) Definition: B-tree level.
88 
89 (0.1) The leaf pages of the B-tree are at level 0.
90 
91 (0.2) The parent of a page at level L has level L+1. (The level of the
92 root page is equal to the tree height.)
93 
94 (0.3) The B-tree lock (index->lock) is the parent of the root page and
95 has a level = tree height + 1.
96 
97 Index->lock has 3 possible locking modes:
98 
99 (1) S-latch:
100 
101 (1.1) All latches for pages must be obtained in descending order of tree level.
102 
103 (1.2) Before obtaining the first node pointer page latch at a given B-tree
104 level, parent latch must be held (at level +1 ).
105 
106 (1.3) If a node pointer page is already latched at the same level
107 we can only obtain latch to its right sibling page latch at the same level.
108 
109 (1.4) Release of the node pointer page latches must be done in
110 child-to-parent order. (Prevents deadlocks when obtained index->lock
111 in SX mode).
112 
113 (1.4.1) Level L node pointer page latch can be released only when
114 no latches at children level i.e. level < L are hold.
115 
116 (1.4.2) All latches from node pointer pages must be released so
117 that no latches are obtained between.
118 
119 (1.5) [implied by (1.1), (1.2)] Root page latch must be first node pointer
120 latch obtained.
121 
122 (2) SX-latch:
123 
124 In this case rules (1.2) and (1.3) from S-latch case are relaxed and
125 merged into (2.2) and rule (1.4) is removed. Thus, latch acquisition
126 can be skipped at some tree levels and latches can be obtained in
127 a less restricted order.
128 
129 (2.1) [identical to (1.1)]: All latches for pages must be obtained in descending
130 order of tree level.
131 
132 (2.2) When a node pointer latch at level L is obtained,
133 the left sibling page latch in the same level or some ancestor
134 page latch (at level > L) must be hold.
135 
136 (2.3) [implied by (2.1), (2.2)] The first node pointer page latch obtained can
137 be any node pointer page.
138 
139 (3) X-latch:
140 
141 Node pointer latches can be obtained in any order.
142 
143 NOTE: New rules after MariaDB 10.2.2 does not affect the latching rules of leaf pages:
144 
145 index->lock S-latch is needed in read for the node pointer traversal. When the leaf
146 level is reached, index-lock can be released (and with the MariaDB 10.2.2 changes, all
147 node pointer latches). Left to right index travelsal in leaf page level can be safely done
148 by obtaining right sibling leaf page latch and then releasing the old page latch.
149 
150 Single leaf page modifications (BTR_MODIFY_LEAF) are protected by index->lock
151 S-latch.
152 
153 B-tree operations involving page splits or merges (BTR_MODIFY_TREE) and page
154 allocations are protected by index->lock X-latch.
155 
156 Node pointers
157 -------------
158 Leaf pages of a B-tree contain the index records stored in the
159 tree. On levels n > 0 we store 'node pointers' to pages on level
160 n - 1. For each page there is exactly one node pointer stored:
161 thus the our tree is an ordinary B-tree, not a B-link tree.
162 
163 A node pointer contains a prefix P of an index record. The prefix
164 is long enough so that it determines an index record uniquely.
165 The file page number of the child page is added as the last
166 field. To the child page we can store node pointers or index records
167 which are >= P in the alphabetical order, but < P1 if there is
168 a next node pointer on the level, and P1 is its prefix.
169 
170 If a node pointer with a prefix P points to a non-leaf child,
171 then the leftmost record in the child must have the same
172 prefix P. If it points to a leaf node, the child is not required
173 to contain any record with a prefix equal to P. The leaf case
174 is decided this way to allow arbitrary deletions in a leaf node
175 without touching upper levels of the tree.
176 
177 We have predefined a special minimum record which we
178 define as the smallest record in any alphabetical order.
179 A minimum record is denoted by setting a bit in the record
180 header. A minimum record acts as the prefix of a node pointer
181 which points to a leftmost node on any level of the tree.
182 
183 File page allocation
184 --------------------
185 In the root node of a B-tree there are two file segment headers.
186 The leaf pages of a tree are allocated from one file segment, to
187 make them consecutive on disk if possible. From the other file segment
188 we allocate pages for the non-leaf levels of the tree.
189 */
190 
191 #ifdef UNIV_BTR_DEBUG
192 /**************************************************************//**
193 Checks a file segment header within a B-tree root page.
194 @return TRUE if valid */
195 static
196 ibool
btr_root_fseg_validate(const fseg_header_t * seg_header,ulint space)197 btr_root_fseg_validate(
198 /*===================*/
199 	const fseg_header_t*	seg_header,	/*!< in: segment header */
200 	ulint			space)		/*!< in: tablespace identifier */
201 {
202 	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
203 
204 	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
205 	ut_a(offset >= FIL_PAGE_DATA);
206 	ut_a(offset <= srv_page_size - FIL_PAGE_DATA_END);
207 	return(TRUE);
208 }
209 #endif /* UNIV_BTR_DEBUG */
210 
211 /**************************************************************//**
212 Gets the root node of a tree and x- or s-latches it.
213 @return root page, x- or s-latched */
214 buf_block_t*
btr_root_block_get(const dict_index_t * index,rw_lock_type_t mode,mtr_t * mtr)215 btr_root_block_get(
216 /*===============*/
217 	const dict_index_t*	index,	/*!< in: index tree */
218 	rw_lock_type_t		mode,	/*!< in: either RW_S_LATCH
219 					or RW_X_LATCH */
220 	mtr_t*			mtr)	/*!< in: mtr */
221 {
222 	if (!index->table || !index->table->space) {
223 		return NULL;
224 	}
225 
226 	buf_block_t* block = btr_block_get(*index, index->page, mode, false,
227 					   mtr);
228 
229 	if (!block) {
230 		index->table->file_unreadable = true;
231 
232 		ib_push_warning(
233 			static_cast<THD*>(NULL), DB_DECRYPTION_FAILED,
234 			"Table %s in file %s is encrypted but encryption service or"
235 			" used key_id is not available. "
236 			" Can't continue reading table.",
237 			index->table->name.m_name,
238 			UT_LIST_GET_FIRST(index->table->space->chain)->name);
239 
240 		return NULL;
241 	}
242 
243 	btr_assert_not_corrupted(block, index);
244 
245 #ifdef UNIV_BTR_DEBUG
246 	if (!dict_index_is_ibuf(index)) {
247 		const page_t*	root = buf_block_get_frame(block);
248 
249 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
250 					    + root, index->table->space_id));
251 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
252 					    + root, index->table->space_id));
253 	}
254 #endif /* UNIV_BTR_DEBUG */
255 
256 	return(block);
257 }
258 
259 /**************************************************************//**
260 Gets the root node of a tree and sx-latches it for segment access.
261 @return root page, sx-latched */
262 page_t*
btr_root_get(const dict_index_t * index,mtr_t * mtr)263 btr_root_get(
264 /*=========*/
265 	const dict_index_t*	index,	/*!< in: index tree */
266 	mtr_t*			mtr)	/*!< in: mtr */
267 {
268 	/* Intended to be used for segment list access.
269 	SX lock doesn't block reading user data by other threads.
270 	And block the segment list access by others.*/
271 	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH, mtr);
272 	return(root ? buf_block_get_frame(root) : NULL);
273 }
274 
275 /**************************************************************//**
276 Gets the height of the B-tree (the level of the root, when the leaf
277 level is assumed to be 0). The caller must hold an S or X latch on
278 the index.
279 @return tree height (level of the root) */
280 ulint
btr_height_get(const dict_index_t * index,mtr_t * mtr)281 btr_height_get(
282 /*===========*/
283 	const dict_index_t*	index,	/*!< in: index tree */
284 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
285 {
286 	ulint		height=0;
287 	buf_block_t*	root_block;
288 
289 	ut_ad(srv_read_only_mode
290 	      || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_S_LOCK
291 					    | MTR_MEMO_X_LOCK
292 					    | MTR_MEMO_SX_LOCK));
293 
294 	/* S latches the page */
295 	root_block = btr_root_block_get(index, RW_S_LATCH, mtr);
296 
297 	if (root_block) {
298 		height = btr_page_get_level(buf_block_get_frame(root_block));
299 
300 		/* Release the S latch on the root page. */
301 		mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);
302 
303 		ut_d(sync_check_unlock(&root_block->lock));
304 	}
305 
306 	return(height);
307 }
308 
309 /**************************************************************//**
310 Checks a file segment header within a B-tree root page and updates
311 the segment header space id.
312 @return TRUE if valid */
313 static
314 bool
btr_root_fseg_adjust_on_import(fseg_header_t * seg_header,page_zip_des_t * page_zip,ulint space)315 btr_root_fseg_adjust_on_import(
316 /*===========================*/
317 	fseg_header_t*	seg_header,	/*!< in/out: segment header */
318 	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
319 					or NULL */
320 	ulint		space)		/*!< in: tablespace identifier */
321 {
322 	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
323 
324 	if (offset < FIL_PAGE_DATA
325 	    || offset > srv_page_size - FIL_PAGE_DATA_END) {
326 		return false;
327 	}
328 
329 	seg_header += FSEG_HDR_SPACE;
330 
331 	mach_write_to_4(seg_header, space);
332 	if (UNIV_LIKELY_NULL(page_zip)) {
333 		memcpy(page_zip->data + page_offset(seg_header), seg_header,
334 		       4);
335 	}
336 
337 	return true;
338 }
339 
340 /**************************************************************//**
341 Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
342 @return error code, or DB_SUCCESS */
343 dberr_t
btr_root_adjust_on_import(const dict_index_t * index)344 btr_root_adjust_on_import(
345 /*======================*/
346 	const dict_index_t*	index)	/*!< in: index tree */
347 {
348 	dberr_t			err;
349 	mtr_t			mtr;
350 	page_t*			page;
351 	page_zip_des_t*		page_zip;
352 	dict_table_t*		table = index->table;
353 
354 	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
355 			return(DB_CORRUPTION););
356 
357 	mtr_start(&mtr);
358 
359 	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
360 
361 	buf_block_t* block = buf_page_get_gen(
362 		page_id_t(table->space->id, index->page),
363 		table->space->zip_size(), RW_X_LATCH, NULL, BUF_GET,
364 		__FILE__, __LINE__,
365 		&mtr, &err);
366 	if (!block) {
367 		ut_ad(err != DB_SUCCESS);
368 		goto func_exit;
369 	}
370 
371 	buf_block_dbg_add_level(block, SYNC_TREE_NODE);
372 
373 	page = buf_block_get_frame(block);
374 	page_zip = buf_block_get_page_zip(block);
375 
376 	if (!fil_page_index_page_check(page) || page_has_siblings(page)) {
377 		err = DB_CORRUPTION;
378 
379 	} else if (dict_index_is_clust(index)) {
380 		bool	page_is_compact_format;
381 
382 		page_is_compact_format = page_is_comp(page) > 0;
383 
384 		/* Check if the page format and table format agree. */
385 		if (page_is_compact_format != dict_table_is_comp(table)) {
386 			err = DB_CORRUPTION;
387 		} else {
388 			/* Check that the table flags and the tablespace
389 			flags match. */
390 			ulint tf = dict_tf_to_fsp_flags(table->flags);
391 			ulint sf = table->space->flags;
392 			sf &= ~FSP_FLAGS_MEM_MASK;
393 			tf &= ~FSP_FLAGS_MEM_MASK;
394 			if (fil_space_t::is_flags_equal(tf, sf)
395 			    || fil_space_t::is_flags_equal(sf, tf)) {
396 				mutex_enter(&fil_system.mutex);
397 				table->space->flags = (table->space->flags
398 						       & ~FSP_FLAGS_MEM_MASK)
399 					| (tf & FSP_FLAGS_MEM_MASK);
400 				mutex_exit(&fil_system.mutex);
401 				err = DB_SUCCESS;
402 			} else {
403 				err = DB_CORRUPTION;
404 			}
405 		}
406 	} else {
407 		err = DB_SUCCESS;
408 	}
409 
410 	/* Check and adjust the file segment headers, if all OK so far. */
411 	if (err == DB_SUCCESS
412 	    && (!btr_root_fseg_adjust_on_import(
413 			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
414 			+ page, page_zip, table->space_id)
415 		|| !btr_root_fseg_adjust_on_import(
416 			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
417 			+ page, page_zip, table->space_id))) {
418 
419 		err = DB_CORRUPTION;
420 	}
421 
422 func_exit:
423 	mtr_commit(&mtr);
424 
425 	return(err);
426 }
427 
428 /**************************************************************//**
429 Creates a new index page (not the root, and also not
430 used in page reorganization).  @see btr_page_empty(). */
431 void
btr_page_create(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)432 btr_page_create(
433 /*============*/
434 	buf_block_t*	block,	/*!< in/out: page to be created */
435 	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
436 	dict_index_t*	index,	/*!< in: index */
437 	ulint		level,	/*!< in: the B-tree level of the page */
438 	mtr_t*		mtr)	/*!< in: mtr */
439 {
440   ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
441   byte *index_id= my_assume_aligned<2>(PAGE_HEADER + PAGE_INDEX_ID +
442                                        block->frame);
443 
444   if (UNIV_LIKELY_NULL(page_zip))
445   {
446     mach_write_to_8(index_id, index->id);
447     page_create_zip(block, index, level, 0, mtr);
448   }
449   else
450   {
451     page_create(block, mtr, dict_table_is_comp(index->table));
452     if (index->is_spatial())
453     {
454       static_assert(((FIL_PAGE_INDEX & 0xff00) | byte(FIL_PAGE_RTREE)) ==
455                     FIL_PAGE_RTREE, "compatibility");
456       mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
457                     byte(FIL_PAGE_RTREE));
458       if (mach_read_from_8(block->frame + FIL_RTREE_SPLIT_SEQ_NUM))
459         mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM, 8, 0);
460     }
461     /* Set the level of the new index page */
462     mtr->write<2,mtr_t::MAYBE_NOP>(*block,
463                                    my_assume_aligned<2>(PAGE_HEADER +
464                                                         PAGE_LEVEL +
465                                                         block->frame), level);
466     mtr->write<8,mtr_t::MAYBE_NOP>(*block, index_id, index->id);
467   }
468 }
469 
470 /**************************************************************//**
471 Allocates a new file page to be used in an ibuf tree. Takes the page from
472 the free list of the tree, which must contain pages!
473 @return new allocated block, x-latched */
474 static
475 buf_block_t*
btr_page_alloc_for_ibuf(dict_index_t * index,mtr_t * mtr)476 btr_page_alloc_for_ibuf(
477 /*====================*/
478 	dict_index_t*	index,	/*!< in: index tree */
479 	mtr_t*		mtr)	/*!< in: mtr */
480 {
481 	buf_block_t*	new_block;
482 
483 	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH, mtr);
484 
485 	fil_addr_t node_addr = flst_get_first(PAGE_HEADER
486 					      + PAGE_BTR_IBUF_FREE_LIST
487 					      + root->frame);
488 	ut_a(node_addr.page != FIL_NULL);
489 
490 	new_block = buf_page_get(
491 		page_id_t(index->table->space_id, node_addr.page),
492 		index->table->space->zip_size(),
493 		RW_X_LATCH, mtr);
494 
495 	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);
496 
497 	flst_remove(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
498 		    new_block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
499 		    mtr);
500 	ut_d(flst_validate(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
501 
502 	return(new_block);
503 }
504 
505 /**************************************************************//**
506 Allocates a new file page to be used in an index tree. NOTE: we assume
507 that the caller has made the reservation for free extents!
508 @retval NULL if no page could be allocated */
509 static MY_ATTRIBUTE((nonnull, warn_unused_result))
510 buf_block_t*
btr_page_alloc_low(dict_index_t * index,uint32_t hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)511 btr_page_alloc_low(
512 /*===============*/
513 	dict_index_t*	index,		/*!< in: index */
514 	uint32_t	hint_page_no,	/*!< in: hint of a good page */
515 	byte		file_direction,	/*!< in: direction where a possible
516 					page split is made */
517 	ulint		level,		/*!< in: level where the page is placed
518 					in the tree */
519 	mtr_t*		mtr,		/*!< in/out: mini-transaction
520 					for the allocation */
521 	mtr_t*		init_mtr)	/*!< in/out: mtr or another
522 					mini-transaction in which the
523 					page should be initialized. */
524 {
525 	page_t* root = btr_root_get(index, mtr);
526 
527 	fseg_header_t* seg_header = (level
528 				     ? PAGE_HEADER + PAGE_BTR_SEG_TOP
529 				     : PAGE_HEADER + PAGE_BTR_SEG_LEAF)
530 		+ root;
531 
532 	/* Parameter TRUE below states that the caller has made the
533 	reservation for free extents, and thus we know that a page can
534 	be allocated: */
535 
536 	buf_block_t* block = fseg_alloc_free_page_general(
537 		seg_header, hint_page_no, file_direction,
538 		true, mtr, init_mtr);
539 
540 	return block;
541 }
542 
543 /**************************************************************//**
544 Allocates a new file page to be used in an index tree. NOTE: we assume
545 that the caller has made the reservation for free extents!
546 @retval NULL if no page could be allocated */
547 buf_block_t*
btr_page_alloc(dict_index_t * index,uint32_t hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)548 btr_page_alloc(
549 /*===========*/
550 	dict_index_t*	index,		/*!< in: index */
551 	uint32_t	hint_page_no,	/*!< in: hint of a good page */
552 	byte		file_direction,	/*!< in: direction where a possible
553 					page split is made */
554 	ulint		level,		/*!< in: level where the page is placed
555 					in the tree */
556 	mtr_t*		mtr,		/*!< in/out: mini-transaction
557 					for the allocation */
558 	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
559 					for x-latching and initializing
560 					the page */
561 {
562 	buf_block_t*	new_block;
563 
564 	if (dict_index_is_ibuf(index)) {
565 
566 		return(btr_page_alloc_for_ibuf(index, mtr));
567 	}
568 
569 	new_block = btr_page_alloc_low(
570 		index, hint_page_no, file_direction, level, mtr, init_mtr);
571 
572 	if (new_block) {
573 		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
574 	}
575 
576 	return(new_block);
577 }
578 
579 /**************************************************************//**
580 Gets the number of pages in a B-tree.
581 @return number of pages, or ULINT_UNDEFINED if the index is unavailable */
582 ulint
btr_get_size(const dict_index_t * index,ulint flag,mtr_t * mtr)583 btr_get_size(
584 /*=========*/
585 	const dict_index_t*	index,	/*!< in: index */
586 	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
587 	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
588 				is s-latched */
589 {
590 	ulint		n=0;
591 
592 	ut_ad(srv_read_only_mode
593 	      || mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
594 	ut_ad(flag == BTR_N_LEAF_PAGES || flag == BTR_TOTAL_SIZE);
595 
596 	if (index->page == FIL_NULL
597 	    || dict_index_is_online_ddl(index)
598 	    || !index->is_committed()
599 	    || !index->table->space) {
600 		return(ULINT_UNDEFINED);
601 	}
602 
603 	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH, mtr);
604 	if (!root) {
605 		return ULINT_UNDEFINED;
606 	}
607 	mtr_x_lock_space(index->table->space, mtr);
608 	if (flag == BTR_N_LEAF_PAGES) {
609 		fseg_n_reserved_pages(*root, PAGE_HEADER + PAGE_BTR_SEG_LEAF
610 				      + root->frame, &n, mtr);
611 	} else {
612 		ulint dummy;
613 		n = fseg_n_reserved_pages(*root, PAGE_HEADER + PAGE_BTR_SEG_TOP
614 					  + root->frame, &dummy, mtr);
615 		n += fseg_n_reserved_pages(*root,
616 					   PAGE_HEADER + PAGE_BTR_SEG_LEAF
617 					   + root->frame, &dummy, mtr);
618 	}
619 
620 	return(n);
621 }
622 
623 /**************************************************************//**
624 Gets the number of reserved and used pages in a B-tree.
625 @return	number of pages reserved, or ULINT_UNDEFINED if the index
626 is unavailable */
627 UNIV_INTERN
628 ulint
btr_get_size_and_reserved(dict_index_t * index,ulint flag,ulint * used,mtr_t * mtr)629 btr_get_size_and_reserved(
630 /*======================*/
631 	dict_index_t*	index,	/*!< in: index */
632 	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
633 	ulint*		used,	/*!< out: number of pages used (<= reserved) */
634 	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
635 				is s-latched */
636 {
637 	ulint		dummy;
638 
639 	ut_ad(mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
640 	ut_a(flag == BTR_N_LEAF_PAGES || flag == BTR_TOTAL_SIZE);
641 
642 	if (index->page == FIL_NULL
643 	    || dict_index_is_online_ddl(index)
644 	    || !index->is_committed()
645 	    || !index->table->space) {
646 		return(ULINT_UNDEFINED);
647 	}
648 
649 	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH, mtr);
650 	*used = 0;
651 	if (!root) {
652 		return ULINT_UNDEFINED;
653 	}
654 
655 	mtr_x_lock_space(index->table->space, mtr);
656 
657 	ulint n = fseg_n_reserved_pages(*root, PAGE_HEADER + PAGE_BTR_SEG_LEAF
658 					+ root->frame, used, mtr);
659 	if (flag == BTR_TOTAL_SIZE) {
660 		n += fseg_n_reserved_pages(*root,
661 					   PAGE_HEADER + PAGE_BTR_SEG_TOP
662 					   + root->frame, &dummy, mtr);
663 		*used += dummy;
664 	}
665 
666 	return(n);
667 }
668 
669 /**************************************************************//**
670 Frees a page used in an ibuf tree. Puts the page to the free list of the
671 ibuf tree. */
672 static
673 void
btr_page_free_for_ibuf(dict_index_t * index,buf_block_t * block,mtr_t * mtr)674 btr_page_free_for_ibuf(
675 /*===================*/
676 	dict_index_t*	index,	/*!< in: index tree */
677 	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
678 	mtr_t*		mtr)	/*!< in: mtr */
679 {
680 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
681 
682 	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH, mtr);
683 
684 	flst_add_first(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
685 		       block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
686 
687 	ut_d(flst_validate(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
688 }
689 
690 /** Free an index page.
691 @param[in,out]	index	index tree
692 @param[in,out]	block	block to be freed
693 @param[in,out]	mtr	mini-transaction
694 @param[in]	blob	whether this is freeing a BLOB page */
btr_page_free(dict_index_t * index,buf_block_t * block,mtr_t * mtr,bool blob)695 void btr_page_free(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
696 		   bool blob)
697 {
698 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
699 #ifdef BTR_CUR_HASH_ADAPT
700 	if (block->index && !block->index->freed()) {
701 		ut_ad(!blob);
702 		ut_ad(page_is_leaf(block->frame));
703 	}
704 #endif
705 	const page_id_t id(block->page.id());
706 	ut_ad(index->table->space_id == id.space());
707 	/* The root page is freed by btr_free_root(). */
708 	ut_ad(id.page_no() != index->page);
709 	ut_ad(mtr->is_named_space(index->table->space));
710 
711 	/* The page gets invalid for optimistic searches: increment the frame
712 	modify clock */
713 
714 	buf_block_modify_clock_inc(block);
715 
716 	if (dict_index_is_ibuf(index)) {
717 		btr_page_free_for_ibuf(index, block, mtr);
718 		return;
719 	}
720 
721 	/* TODO: Discard any operations for block from mtr->log.
722 	The page will be freed, so previous changes to it by this
723 	mini-transaction should not matter. */
724 	page_t* root = btr_root_get(index, mtr);
725 	fseg_header_t* seg_header = &root[blob || page_is_leaf(block->frame)
726 					  ? PAGE_HEADER + PAGE_BTR_SEG_LEAF
727 					  : PAGE_HEADER + PAGE_BTR_SEG_TOP];
728 	fil_space_t* space= index->table->space;
729 	const uint32_t page= id.page_no();
730 
731 	fseg_free_page(seg_header, space, page, mtr);
732 	buf_page_free(space, page, mtr, __FILE__, __LINE__);
733 
734 	/* The page was marked free in the allocation bitmap, but it
735 	should remain exclusively latched until mtr_t::commit() or until it
736 	is explicitly freed from the mini-transaction. */
737 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
738 }
739 
740 /** Set the child page number in a node pointer record.
741 @param[in,out]  block   non-leaf index page
742 @param[in,out]  rec     node pointer record in the page
743 @param[in]      offsets rec_get_offsets(rec)
744 @param[in]      page_no child page number
745 @param[in,out]  mtr     mini-transaction
746 Sets the child node file address in a node pointer. */
btr_node_ptr_set_child_page_no(buf_block_t * block,rec_t * rec,const rec_offs * offsets,ulint page_no,mtr_t * mtr)747 inline void btr_node_ptr_set_child_page_no(buf_block_t *block,
748                                            rec_t *rec, const rec_offs *offsets,
749                                            ulint page_no, mtr_t *mtr)
750 {
751   ut_ad(rec_offs_validate(rec, NULL, offsets));
752   ut_ad(!page_rec_is_leaf(rec));
753   ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));
754 
755   const ulint offs= rec_offs_data_size(offsets);
756   ut_ad(rec_offs_nth_size(offsets, rec_offs_n_fields(offsets) - 1) ==
757         REC_NODE_PTR_SIZE);
758 
759   if (UNIV_LIKELY_NULL(block->page.zip.data))
760     page_zip_write_node_ptr(block, rec, offs, page_no, mtr);
761   else
762     mtr->write<4>(*block, rec + offs - REC_NODE_PTR_SIZE, page_no);
763 }
764 
765 /************************************************************//**
766 Returns the child page of a node pointer and sx-latches it.
767 @return child page, sx-latched */
768 static
769 buf_block_t*
btr_node_ptr_get_child(const rec_t * node_ptr,dict_index_t * index,const rec_offs * offsets,mtr_t * mtr)770 btr_node_ptr_get_child(
771 /*===================*/
772 	const rec_t*	node_ptr,/*!< in: node pointer */
773 	dict_index_t*	index,	/*!< in: index */
774 	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
775 	mtr_t*		mtr)	/*!< in: mtr */
776 {
777 	ut_ad(rec_offs_validate(node_ptr, index, offsets));
778 	ut_ad(index->table->space_id
779 	      == page_get_space_id(page_align(node_ptr)));
780 
781 	return btr_block_get(
782 		*index, btr_node_ptr_get_child_page_no(node_ptr, offsets),
783 		RW_SX_LATCH, btr_page_get_level(page_align(node_ptr)) == 1,
784 		mtr);
785 }
786 
787 /************************************************************//**
788 Returns the upper level node pointer to a page. It is assumed that mtr holds
789 an sx-latch on the tree.
790 @return rec_get_offsets() of the node pointer record */
791 static
792 rec_offs*
btr_page_get_father_node_ptr_func(rec_offs * offsets,mem_heap_t * heap,btr_cur_t * cursor,ulint latch_mode,const char * file,unsigned line,mtr_t * mtr)793 btr_page_get_father_node_ptr_func(
794 /*==============================*/
795 	rec_offs*	offsets,/*!< in: work area for the return value */
796 	mem_heap_t*	heap,	/*!< in: memory heap to use */
797 	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
798 				out: cursor on node pointer record,
799 				its page x-latched */
800 	ulint		latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
801 				or BTR_CONT_SEARCH_TREE */
802 	const char*	file,	/*!< in: file name */
803 	unsigned	line,	/*!< in: line where called */
804 	mtr_t*		mtr)	/*!< in: mtr */
805 {
806 	dtuple_t*	tuple;
807 	rec_t*		user_rec;
808 	rec_t*		node_ptr;
809 	ulint		level;
810 	ulint		page_no;
811 	dict_index_t*	index;
812 
813 	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
814 	      || latch_mode == BTR_CONT_SEARCH_TREE);
815 
816 	page_no = btr_cur_get_block(cursor)->page.id().page_no();
817 	index = btr_cur_get_index(cursor);
818 	ut_ad(!dict_index_is_spatial(index));
819 
820 	ut_ad(srv_read_only_mode
821 	      || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
822 					    | MTR_MEMO_SX_LOCK));
823 
824 	ut_ad(dict_index_get_page(index) != page_no);
825 
826 	level = btr_page_get_level(btr_cur_get_page(cursor));
827 
828 	user_rec = btr_cur_get_rec(cursor);
829 	ut_a(page_rec_is_user_rec(user_rec));
830 
831 	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
832 	dberr_t err = DB_SUCCESS;
833 
834 	err = btr_cur_search_to_nth_level(
835 		index, level + 1, tuple,
836 		PAGE_CUR_LE, latch_mode, cursor, 0,
837 		file, line, mtr);
838 
839 	if (err != DB_SUCCESS) {
840 		ib::warn() << " Error code: " << err
841 			<< " btr_page_get_father_node_ptr_func "
842 			<< " level: " << level + 1
843 			<< " called from file: "
844 			<< file << " line: " << line
845 			<< " table: " << index->table->name
846 			<< " index: " << index->name();
847 	}
848 
849 	node_ptr = btr_cur_get_rec(cursor);
850 
851 	offsets = rec_get_offsets(node_ptr, index, offsets, 0,
852 				  ULINT_UNDEFINED, &heap);
853 
854 	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
855 		rec_t*	print_rec;
856 
857 		ib::error()
858 			<< "Corruption of an index tree: table "
859 			<< index->table->name
860 			<< " index " << index->name
861 			<< ", father ptr page no "
862 			<< btr_node_ptr_get_child_page_no(node_ptr, offsets)
863 			<< ", child page no " << page_no;
864 
865 		print_rec = page_rec_get_next(
866 			page_get_infimum_rec(page_align(user_rec)));
867 		offsets = rec_get_offsets(print_rec, index, offsets,
868 					  page_rec_is_leaf(user_rec)
869 					  ? index->n_core_fields : 0,
870 					  ULINT_UNDEFINED, &heap);
871 		page_rec_print(print_rec, offsets);
872 		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
873 					  ULINT_UNDEFINED, &heap);
874 		page_rec_print(node_ptr, offsets);
875 
876 		ib::fatal()
877 			<< "You should dump + drop + reimport the table to"
878 			<< " fix the corruption. If the crash happens at"
879 			<< " database startup. " << FORCE_RECOVERY_MSG
880 			<< " Then dump + drop + reimport.";
881 	}
882 
883 	return(offsets);
884 }
885 
886 #define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
887 	btr_page_get_father_node_ptr_func(				\
888 		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)
889 
890 #define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
891 	btr_page_get_father_node_ptr_func(				\
892 		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)
893 
894 /************************************************************//**
895 Returns the upper level node pointer to a page. It is assumed that mtr holds
896 an x-latch on the tree.
897 @return rec_get_offsets() of the node pointer record */
898 static
899 rec_offs*
btr_page_get_father_block(rec_offs * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)900 btr_page_get_father_block(
901 /*======================*/
902 	rec_offs*	offsets,/*!< in: work area for the return value */
903 	mem_heap_t*	heap,	/*!< in: memory heap to use */
904 	dict_index_t*	index,	/*!< in: b-tree index */
905 	buf_block_t*	block,	/*!< in: child page in the index */
906 	mtr_t*		mtr,	/*!< in: mtr */
907 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
908 				its page x-latched */
909 {
910 	rec_t*	rec
911 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
912 								 block)));
913 	btr_cur_position(index, rec, block, cursor);
914 	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
915 }
916 
917 /** Seek to the parent page of a B-tree page.
918 @param[in,out]	index	b-tree
919 @param[in]	block	child page
920 @param[in,out]	mtr	mini-transaction
921 @param[out]	cursor	cursor pointing to the x-latched parent page */
btr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)922 void btr_page_get_father(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
923 			 btr_cur_t* cursor)
924 {
925 	mem_heap_t*	heap;
926 	rec_t*		rec
927 		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
928 								 block)));
929 	btr_cur_position(index, rec, block, cursor);
930 
931 	heap = mem_heap_create(100);
932 	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
933 	mem_heap_free(heap);
934 }
935 
936 #ifdef UNIV_DEBUG
937 /** PAGE_INDEX_ID value for freed index B-trees */
938 constexpr index_id_t	BTR_FREED_INDEX_ID = 0;
939 #endif
940 
941 /** Free a B-tree root page. btr_free_but_not_root() must already
942 have been called.
943 In a persistent tablespace, the caller must invoke fsp_init_file_page()
944 before mtr.commit().
945 @param[in,out]	block		index root page
946 @param[in,out]	mtr		mini-transaction */
btr_free_root(buf_block_t * block,mtr_t * mtr)947 static void btr_free_root(buf_block_t *block, mtr_t *mtr)
948 {
949   ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX |
950                                    MTR_MEMO_PAGE_SX_FIX));
951   ut_ad(mtr->is_named_space(block->page.id().space()));
952 
953   btr_search_drop_page_hash_index(block);
954 
955 #ifdef UNIV_BTR_DEBUG
956   ut_a(btr_root_fseg_validate(PAGE_HEADER + PAGE_BTR_SEG_TOP + block->frame,
957 			      block->page.id().space()));
958 #endif /* UNIV_BTR_DEBUG */
959 
960   /* Free the entire segment in small steps. */
961   while (!fseg_free_step(PAGE_HEADER + PAGE_BTR_SEG_TOP + block->frame, mtr));
962 }
963 
964 /** Prepare to free a B-tree.
965 @param[in]	page_id		page id
966 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
967 @param[in]	index_id	PAGE_INDEX_ID contents
968 @param[in,out]	mtr		mini-transaction
969 @return root block, to invoke btr_free_but_not_root() and btr_free_root()
970 @retval NULL if the page is no longer a matching B-tree page */
971 static MY_ATTRIBUTE((warn_unused_result))
972 buf_block_t*
btr_free_root_check(const page_id_t page_id,ulint zip_size,index_id_t index_id,mtr_t * mtr)973 btr_free_root_check(
974 	const page_id_t		page_id,
975 	ulint			zip_size,
976 	index_id_t		index_id,
977 	mtr_t*			mtr)
978 {
979 	ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
980 	ut_ad(index_id != BTR_FREED_INDEX_ID);
981 
982 	buf_block_t*	block = buf_page_get(
983 		page_id, zip_size, RW_X_LATCH, mtr);
984 
985 	if (block) {
986 		buf_block_dbg_add_level(block, SYNC_TREE_NODE);
987 
988 		if (fil_page_index_page_check(block->frame)
989 		    && index_id == btr_page_get_index_id(block->frame)) {
990 			/* This should be a root page.
991 			It should not be possible to reassign the same
992 			index_id for some other index in the tablespace. */
993 			ut_ad(!page_has_siblings(block->frame));
994 		} else {
995 			block = NULL;
996 		}
997 	}
998 
999 	return(block);
1000 }
1001 
1002 /** Create the root node for a new index tree.
1003 @param[in]	type			type of the index
1004 @param[in]	index_id		index id
1005 @param[in,out]	space			tablespace where created
1006 @param[in]	index			index, or NULL to create a system table
1007 @param[in,out]	mtr			mini-transaction
1008 @return	page number of the created root
1009 @retval	FIL_NULL	if did not succeed */
1010 uint32_t
btr_create(ulint type,fil_space_t * space,index_id_t index_id,dict_index_t * index,mtr_t * mtr)1011 btr_create(
1012 	ulint			type,
1013 	fil_space_t*		space,
1014 	index_id_t		index_id,
1015 	dict_index_t*		index,
1016 	mtr_t*			mtr)
1017 {
1018 	buf_block_t*		block;
1019 
1020 	ut_ad(mtr->is_named_space(space));
1021 	ut_ad(index_id != BTR_FREED_INDEX_ID);
1022 
1023 	/* Create the two new segments (one, in the case of an ibuf tree) for
1024 	the index tree; the segment headers are put on the allocated root page
1025 	(for an ibuf tree, not in the root, but on a separate ibuf header
1026 	page) */
1027 
1028 	if (UNIV_UNLIKELY(type & DICT_IBUF)) {
1029 		/* Allocate first the ibuf header page */
1030 		buf_block_t*	ibuf_hdr_block = fseg_create(
1031 			space, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);
1032 
1033 		if (ibuf_hdr_block == NULL) {
1034 			return(FIL_NULL);
1035 		}
1036 
1037 		buf_block_dbg_add_level(
1038 			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);
1039 
1040 		ut_ad(ibuf_hdr_block->page.id().page_no()
1041 		      == IBUF_HEADER_PAGE_NO);
1042 		/* Allocate then the next page to the segment: it will be the
1043 		tree root page */
1044 
1045 		block = fseg_alloc_free_page(
1046 			buf_block_get_frame(ibuf_hdr_block)
1047 			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
1048 			IBUF_TREE_ROOT_PAGE_NO,
1049 			FSP_UP, mtr);
1050 
1051 		if (block == NULL) {
1052 			return(FIL_NULL);
1053 		}
1054 
1055 		ut_ad(block->page.id() == page_id_t(0,IBUF_TREE_ROOT_PAGE_NO));
1056 
1057 		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);
1058 
1059 		flst_init(block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
1060 	} else {
1061 		block = fseg_create(space, PAGE_HEADER + PAGE_BTR_SEG_TOP,
1062 				    mtr);
1063 
1064 		if (block == NULL) {
1065 			return(FIL_NULL);
1066 		}
1067 
1068 		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
1069 
1070 		if (!fseg_create(space, PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr,
1071 				 false, block)) {
1072 			/* Not enough space for new segment, free root
1073 			segment before return. */
1074 			btr_free_root(block, mtr);
1075 			return(FIL_NULL);
1076 		}
1077 
1078 		/* The fseg create acquires a second latch on the page,
1079 		therefore we must declare it: */
1080 		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
1081 	}
1082 
1083 	ut_ad(!page_has_siblings(block->frame));
1084 
1085 	constexpr uint16_t field = PAGE_HEADER + PAGE_INDEX_ID;
1086 
1087 	byte* page_index_id = my_assume_aligned<2>(field + block->frame);
1088 
1089 	/* Create a new index page on the allocated segment page */
1090 	if (UNIV_LIKELY_NULL(block->page.zip.data)) {
1091 		mach_write_to_8(page_index_id, index_id);
1092 		ut_ad(!page_has_siblings(block->page.zip.data));
1093 		page_create_zip(block, index, 0, 0, mtr);
1094 	} else {
1095 		page_create(block, mtr,
1096 			    index && index->table->not_redundant());
1097 		if (index && index->is_spatial()) {
1098 			static_assert(((FIL_PAGE_INDEX & 0xff00)
1099 				       | byte(FIL_PAGE_RTREE))
1100 				      == FIL_PAGE_RTREE, "compatibility");
1101 			mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
1102 				      byte(FIL_PAGE_RTREE));
1103 			if (mach_read_from_8(block->frame
1104 					     + FIL_RTREE_SPLIT_SEQ_NUM)) {
1105 				mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
1106 					    8, 0);
1107 			}
1108 		}
1109 		/* Set the level of the new index page */
1110 		mtr->write<2,mtr_t::MAYBE_NOP>(*block, PAGE_HEADER + PAGE_LEVEL
1111 					       + block->frame, 0U);
1112 		mtr->write<8,mtr_t::MAYBE_NOP>(*block, page_index_id,
1113 					       index_id);
1114 	}
1115 
1116 	/* We reset the free bits for the page in a separate
1117 	mini-transaction to allow creation of several trees in the
1118 	same mtr, otherwise the latch on a bitmap page would prevent
1119 	it because of the latching order.
1120 
1121 	Note: Insert Buffering is disabled for temporary tables given that
1122 	most temporary tables are smaller in size and short-lived. */
1123 	if (!(type & DICT_CLUSTERED)
1124 	    && (!index || !index->table->is_temporary())) {
1125 		ibuf_reset_free_bits(block);
1126 	}
1127 
1128 	/* In the following assertion we test that two records of maximum
1129 	allowed size fit on the root page: this fact is needed to ensure
1130 	correctness of split algorithms */
1131 
1132 	ut_ad(page_get_max_insert_size(block->frame, 2)
1133 	      > 2 * BTR_PAGE_MAX_REC_SIZE);
1134 
1135 	return(block->page.id().page_no());
1136 }
1137 
1138 /** Free a B-tree except the root page. The root page MUST be freed after
1139 this by calling btr_free_root.
1140 @param[in,out]	block		root page
1141 @param[in]	log_mode	mtr logging mode */
1142 static
1143 void
btr_free_but_not_root(buf_block_t * block,mtr_log_t log_mode)1144 btr_free_but_not_root(
1145 	buf_block_t*	block,
1146 	mtr_log_t	log_mode)
1147 {
1148 	mtr_t	mtr;
1149 
1150 	ut_ad(fil_page_index_page_check(block->frame));
1151 	ut_ad(!page_has_siblings(block->frame));
1152 leaf_loop:
1153 	mtr_start(&mtr);
1154 	mtr_set_log_mode(&mtr, log_mode);
1155 	mtr.set_named_space_id(block->page.id().space());
1156 
1157 	page_t*	root = block->frame;
1158 
1159 	if (!root) {
1160 		mtr_commit(&mtr);
1161 		return;
1162 	}
1163 
1164 #ifdef UNIV_BTR_DEBUG
1165 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1166 				    + root, block->page.id().space()));
1167 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1168 				    + root, block->page.id().space()));
1169 #endif /* UNIV_BTR_DEBUG */
1170 
1171 	/* NOTE: page hash indexes are dropped when a page is freed inside
1172 	fsp0fsp. */
1173 
1174 	bool finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
1175 				       &mtr);
1176 	mtr_commit(&mtr);
1177 
1178 	if (!finished) {
1179 
1180 		goto leaf_loop;
1181 	}
1182 top_loop:
1183 	mtr_start(&mtr);
1184 	mtr_set_log_mode(&mtr, log_mode);
1185 	mtr.set_named_space_id(block->page.id().space());
1186 
1187 	root = block->frame;
1188 
1189 #ifdef UNIV_BTR_DEBUG
1190 	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1191 				    + root, block->page.id().space()));
1192 #endif /* UNIV_BTR_DEBUG */
1193 
1194 	finished = fseg_free_step_not_header(
1195 		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
1196 	mtr_commit(&mtr);
1197 
1198 	if (!finished) {
1199 		goto top_loop;
1200 	}
1201 }
1202 
1203 /** Free a persistent index tree if it exists.
1204 @param[in]	page_id		root page id
1205 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
1206 @param[in]	index_id	PAGE_INDEX_ID contents
1207 @param[in,out]	mtr		mini-transaction */
1208 void
btr_free_if_exists(const page_id_t page_id,ulint zip_size,index_id_t index_id,mtr_t * mtr)1209 btr_free_if_exists(
1210 	const page_id_t		page_id,
1211 	ulint			zip_size,
1212 	index_id_t		index_id,
1213 	mtr_t*			mtr)
1214 {
1215 	buf_block_t* root = btr_free_root_check(
1216 		page_id, zip_size, index_id, mtr);
1217 
1218 	if (root == NULL) {
1219 		return;
1220 	}
1221 
1222 	btr_free_but_not_root(root, mtr->get_log_mode());
1223 	mtr->set_named_space_id(page_id.space());
1224 	btr_free_root(root, mtr);
1225 }
1226 
1227 /** Free an index tree in a temporary tablespace.
1228 @param[in]	page_id		root page id */
btr_free(const page_id_t page_id)1229 void btr_free(const page_id_t page_id)
1230 {
1231 	mtr_t		mtr;
1232 	mtr.start();
1233 	mtr.set_log_mode(MTR_LOG_NO_REDO);
1234 
1235 	buf_block_t*	block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);
1236 
1237 	if (block) {
1238 		btr_free_but_not_root(block, MTR_LOG_NO_REDO);
1239 		btr_free_root(block, &mtr);
1240 	}
1241 	mtr.commit();
1242 }
1243 
1244 /** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC.
1245 @param[in,out]	index	clustered index
1246 @return	the last used AUTO_INCREMENT value
1247 @retval	0 on error or if no AUTO_INCREMENT value was used yet */
1248 ib_uint64_t
btr_read_autoinc(dict_index_t * index)1249 btr_read_autoinc(dict_index_t* index)
1250 {
1251 	ut_ad(index->is_primary());
1252 	ut_ad(index->table->persistent_autoinc);
1253 	ut_ad(!index->table->is_temporary());
1254 	mtr_t		mtr;
1255 	mtr.start();
1256 	ib_uint64_t	autoinc;
1257 	if (buf_block_t* block = buf_page_get(
1258 		    page_id_t(index->table->space_id, index->page),
1259 		    index->table->space->zip_size(),
1260 		    RW_S_LATCH, &mtr)) {
1261 		autoinc = page_get_autoinc(block->frame);
1262 	} else {
1263 		autoinc = 0;
1264 	}
1265 	mtr.commit();
1266 	return autoinc;
1267 }
1268 
1269 /** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC,
1270 or fall back to MAX(auto_increment_column).
1271 @param[in]	table	table containing an AUTO_INCREMENT column
1272 @param[in]	col_no	index of the AUTO_INCREMENT column
1273 @return	the AUTO_INCREMENT value
1274 @retval	0 on error or if no AUTO_INCREMENT value was used yet */
1275 ib_uint64_t
btr_read_autoinc_with_fallback(const dict_table_t * table,unsigned col_no)1276 btr_read_autoinc_with_fallback(const dict_table_t* table, unsigned col_no)
1277 {
1278 	ut_ad(table->persistent_autoinc);
1279 	ut_ad(!table->is_temporary());
1280 
1281 	dict_index_t*	index = dict_table_get_first_index(table);
1282 
1283 	if (index == NULL) {
1284 		return 0;
1285 	}
1286 
1287 	mtr_t		mtr;
1288 	mtr.start();
1289 	buf_block_t*	block = buf_page_get(
1290 		page_id_t(index->table->space_id, index->page),
1291 		index->table->space->zip_size(),
1292 		RW_S_LATCH, &mtr);
1293 
1294 	ib_uint64_t	autoinc	= block ? page_get_autoinc(block->frame) : 0;
1295 	const bool	retry	= block && autoinc == 0
1296 		&& !page_is_empty(block->frame);
1297 	mtr.commit();
1298 
1299 	if (retry) {
1300 		/* This should be an old data file where
1301 		PAGE_ROOT_AUTO_INC was initialized to 0.
1302 		Fall back to reading MAX(autoinc_col).
1303 		There should be an index on it. */
1304 		const dict_col_t*	autoinc_col
1305 			= dict_table_get_nth_col(table, col_no);
1306 		while (index && index->fields[0].col != autoinc_col) {
1307 			index = dict_table_get_next_index(index);
1308 		}
1309 
1310 		if (index) {
1311 			autoinc = row_search_max_autoinc(index);
1312 		}
1313 	}
1314 
1315 	return autoinc;
1316 }
1317 
1318 /** Write the next available AUTO_INCREMENT value to PAGE_ROOT_AUTO_INC.
1319 @param[in,out]	index	clustered index
1320 @param[in]	autoinc	the AUTO_INCREMENT value
1321 @param[in]	reset	whether to reset the AUTO_INCREMENT
1322 			to a possibly smaller value than currently
1323 			exists in the page */
1324 void
btr_write_autoinc(dict_index_t * index,ib_uint64_t autoinc,bool reset)1325 btr_write_autoinc(dict_index_t* index, ib_uint64_t autoinc, bool reset)
1326 {
1327 	ut_ad(index->is_primary());
1328 	ut_ad(index->table->persistent_autoinc);
1329 	ut_ad(!index->table->is_temporary());
1330 
1331 	mtr_t		mtr;
1332 	mtr.start();
1333 	fil_space_t* space = index->table->space;
1334 	mtr.set_named_space(space);
1335 	page_set_autoinc(buf_page_get(page_id_t(space->id, index->page),
1336 				      space->zip_size(),
1337 				      RW_SX_LATCH, &mtr),
1338 			 autoinc, &mtr, reset);
1339 	mtr.commit();
1340 }
1341 
1342 /** Reorganize an index page.
1343 @param cursor      index page cursor
1344 @param index       the index that the cursor belongs to
1345 @param mtr         mini-transaction */
btr_page_reorganize_low(page_cur_t * cursor,dict_index_t * index,mtr_t * mtr)1346 static void btr_page_reorganize_low(page_cur_t *cursor, dict_index_t *index,
1347                                     mtr_t *mtr)
1348 {
1349   const mtr_log_t log_mode= mtr->set_log_mode(MTR_LOG_NO_REDO);
1350 
1351   buf_block_t *const block= cursor->block;
1352 
1353   ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
1354   ut_ad(!is_buf_block_get_page_zip(block));
1355   btr_assert_not_corrupted(block, index);
1356   ut_ad(fil_page_index_page_check(block->frame));
1357   ut_ad(index->is_dummy ||
1358         block->page.id().space() == index->table->space->id);
1359   ut_ad(index->is_dummy || block->page.id().page_no() != index->page ||
1360         !page_has_siblings(block->frame));
1361 
1362   buf_block_t *old= buf_block_alloc();
1363   /* Copy the old page to temporary space */
1364   memcpy_aligned<UNIV_PAGE_SIZE_MIN>(old->frame, block->frame, srv_page_size);
1365 
1366   btr_search_drop_page_hash_index(block);
1367 
1368   /* Save the cursor position. */
1369   const ulint pos= page_rec_get_n_recs_before(cursor->rec);
1370 
1371   page_create(block, mtr, index->table->not_redundant());
1372   if (index->is_spatial())
1373     block->frame[FIL_PAGE_TYPE + 1]= byte(FIL_PAGE_RTREE);
1374 
1375   static_assert(((FIL_PAGE_INDEX & 0xff00) | byte(FIL_PAGE_RTREE)) ==
1376                 FIL_PAGE_RTREE, "compatibility");
1377 
1378   /* Copy the records from the temporary space to the recreated page;
1379   do not copy the lock bits yet */
1380 
1381   page_copy_rec_list_end_no_locks(block, old, page_get_infimum_rec(old->frame),
1382                                   index, mtr);
1383 
1384   /* Copy the PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC. */
1385   ut_ad(!page_get_max_trx_id(block->frame));
1386   memcpy_aligned<8>(PAGE_MAX_TRX_ID + PAGE_HEADER + block->frame,
1387                     PAGE_MAX_TRX_ID + PAGE_HEADER + old->frame, 8);
1388 #ifdef UNIV_DEBUG
1389   if (page_get_max_trx_id(block->frame))
1390     /* PAGE_MAX_TRX_ID must be zero on non-leaf pages other than
1391     clustered index root pages. */
1392     ut_ad(dict_index_is_sec_or_ibuf(index)
1393           ? page_is_leaf(block->frame)
1394           : block->page.id().page_no() == index->page);
1395   else
1396     /* PAGE_MAX_TRX_ID is unused in clustered index pages (other than
1397     the root where it is repurposed as PAGE_ROOT_AUTO_INC), non-leaf
1398     pages, and in temporary tables.  It was always zero-initialized in
1399     page_create().  PAGE_MAX_TRX_ID must be nonzero on
1400     dict_index_is_sec_or_ibuf() leaf pages. */
1401     ut_ad(index->table->is_temporary() || !page_is_leaf(block->frame) ||
1402           !dict_index_is_sec_or_ibuf(index));
1403 #endif
1404 
1405   const uint16_t data_size1= page_get_data_size(old->frame);
1406   const uint16_t data_size2= page_get_data_size(block->frame);
1407   const ulint max1= page_get_max_insert_size_after_reorganize(old->frame, 1);
1408   const ulint max2= page_get_max_insert_size_after_reorganize(block->frame, 1);
1409 
1410   if (UNIV_UNLIKELY(data_size1 != data_size2 || max1 != max2))
1411     ib::fatal() << "Page old data size " << data_size1
1412                 << " new data size " << data_size2
1413                 << ", page old max ins size " << max1
1414                 << " new max ins size " << max2;
1415 
1416   /* Restore the cursor position. */
1417   if (pos)
1418     cursor->rec = page_rec_get_nth(block->frame, pos);
1419   else
1420     ut_ad(cursor->rec == page_get_infimum_rec(block->frame));
1421 
1422   if (block->page.id().page_no() == index->page &&
1423       fil_page_get_type(old->frame) == FIL_PAGE_TYPE_INSTANT)
1424   {
1425     /* Preserve the PAGE_INSTANT information. */
1426     ut_ad(index->is_instant());
1427     memcpy_aligned<2>(FIL_PAGE_TYPE + block->frame,
1428                       FIL_PAGE_TYPE + old->frame, 2);
1429     memcpy_aligned<2>(PAGE_HEADER + PAGE_INSTANT + block->frame,
1430                       PAGE_HEADER + PAGE_INSTANT + old->frame, 2);
1431     if (!index->table->instant);
1432     else if (page_is_comp(block->frame))
1433     {
1434       memcpy(PAGE_NEW_INFIMUM + block->frame,
1435              PAGE_NEW_INFIMUM + old->frame, 8);
1436       memcpy(PAGE_NEW_SUPREMUM + block->frame,
1437              PAGE_NEW_SUPREMUM + old->frame, 8);
1438     }
1439     else
1440     {
1441       memcpy(PAGE_OLD_INFIMUM + block->frame,
1442              PAGE_OLD_INFIMUM + old->frame, 8);
1443       memcpy(PAGE_OLD_SUPREMUM + block->frame,
1444              PAGE_OLD_SUPREMUM + old->frame, 8);
1445     }
1446   }
1447 
1448   ut_ad(!memcmp(old->frame, block->frame, PAGE_HEADER));
1449   ut_ad(!memcmp(old->frame + PAGE_MAX_TRX_ID + PAGE_HEADER,
1450                 block->frame + PAGE_MAX_TRX_ID + PAGE_HEADER,
1451                 PAGE_DATA - (PAGE_MAX_TRX_ID + PAGE_HEADER)));
1452 
1453   if (!dict_table_is_locking_disabled(index->table))
1454     lock_move_reorganize_page(block, old);
1455 
1456   /* Write log for the changes, if needed. */
1457   mtr->set_log_mode(log_mode);
1458   if (log_mode == MTR_LOG_ALL)
1459   {
1460     /* Check and log the changes in the page header. */
1461     ulint a, e;
1462     for (a= PAGE_HEADER, e= PAGE_MAX_TRX_ID + PAGE_HEADER; a < e; a++)
1463     {
1464       if (old->frame[a] == block->frame[a])
1465         continue;
1466       while (--e, old->frame[e] == block->frame[e]);
1467       e++;
1468       ut_ad(a < e);
1469       /* Write log for the changed page header fields. */
1470       mtr->memcpy(*block, a, e - a);
1471       break;
1472     }
1473 
1474     const uint16_t top= page_header_get_offs(block->frame, PAGE_HEAP_TOP);
1475 
1476     if (page_is_comp(block->frame))
1477     {
1478       /* info_bits=0, n_owned=1, heap_no=0, status */
1479       ut_ad(!memcmp(PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES + block->frame,
1480                     PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES + old->frame, 3));
1481       /* If the 'next' pointer of the infimum record has changed, log it. */
1482       a= PAGE_NEW_INFIMUM - 2;
1483       e= a + 2;
1484       if (block->frame[a] == old->frame[a])
1485         a++;
1486       if (--e, block->frame[e] != old->frame[e])
1487         e++;
1488       if (ulint len= e - a)
1489         mtr->memcpy(*block, a, len);
1490       /* The infimum record itself must not change. */
1491       ut_ad(!memcmp(PAGE_NEW_INFIMUM + block->frame,
1492                     PAGE_NEW_INFIMUM + old->frame, 8));
1493       /* Log any change of the n_owned of the supremum record. */
1494       a= PAGE_NEW_SUPREMUM - REC_N_NEW_EXTRA_BYTES;
1495       if (block->frame[a] != old->frame[a])
1496         mtr->memcpy(*block, a, 1);
1497       /* The rest of the supremum record must not change. */
1498       ut_ad(!memcmp(&block->frame[a + 1], &old->frame[a + 1],
1499                     PAGE_NEW_SUPREMUM_END - PAGE_NEW_SUPREMUM +
1500                     REC_N_NEW_EXTRA_BYTES - 1));
1501 
1502       /* Log the differences in the payload. */
1503       for (a= PAGE_NEW_SUPREMUM_END, e= top; a < e; a++)
1504       {
1505         if (old->frame[a] == block->frame[a])
1506           continue;
1507         while (--e, old->frame[e] == block->frame[e]);
1508         e++;
1509         ut_ad(a < e);
1510 	/* TODO: write MEMMOVE records to minimize this further! */
1511         mtr->memcpy(*block, a, e - a);
1512 	break;
1513       }
1514     }
1515     else
1516     {
1517       /* info_bits=0, n_owned=1, heap_no=0, number of fields, 1-byte format */
1518       ut_ad(!memcmp(PAGE_OLD_INFIMUM - REC_N_OLD_EXTRA_BYTES + block->frame,
1519                     PAGE_OLD_INFIMUM - REC_N_OLD_EXTRA_BYTES + old->frame, 4));
1520       /* If the 'next' pointer of the infimum record has changed, log it. */
1521       a= PAGE_OLD_INFIMUM - 2;
1522       e= a + 2;
1523       if (block->frame[a] == old->frame[a])
1524         a++;
1525       if (--e, block->frame[e] != old->frame[e])
1526         e++;
1527       if (ulint len= e - a)
1528         mtr->memcpy(*block, a, len);
1529       /* The infimum record itself must not change. */
1530       ut_ad(!memcmp(PAGE_OLD_INFIMUM + block->frame,
1531                     PAGE_OLD_INFIMUM + old->frame, 8));
1532       /* Log any change of the n_owned of the supremum record. */
1533       a= PAGE_OLD_SUPREMUM - REC_N_OLD_EXTRA_BYTES;
1534       if (block->frame[a] != old->frame[a])
1535         mtr->memcpy(*block, a, 1);
1536       ut_ad(!memcmp(&block->frame[a + 1], &old->frame[a + 1],
1537                     PAGE_OLD_SUPREMUM_END - PAGE_OLD_SUPREMUM +
1538                     REC_N_OLD_EXTRA_BYTES - 1));
1539 
1540       /* Log the differences in the payload. */
1541       for (a= PAGE_OLD_SUPREMUM_END, e= top; a < e; a++)
1542       {
1543         if (old->frame[a] == block->frame[a])
1544           continue;
1545         while (--e, old->frame[e] == block->frame[e]);
1546         e++;
1547         ut_ad(a < e);
1548 	/* TODO: write MEMMOVE records to minimize this further! */
1549         mtr->memcpy(*block, a, e - a);
1550 	break;
1551       }
1552     }
1553 
1554     e= srv_page_size - PAGE_DIR;
1555     a= e - PAGE_DIR_SLOT_SIZE * page_dir_get_n_slots(block->frame);
1556 
1557     /* Zero out the payload area. */
1558     mtr->memset(*block, top, a - top, 0);
1559 
1560     /* Log changes to the page directory. */
1561     for (; a < e; a++)
1562     {
1563       if (old->frame[a] == block->frame[a])
1564         continue;
1565       while (--e, old->frame[e] == block->frame[e]);
1566       e++;
1567       ut_ad(a < e);
1568       /* Write log for the changed page directory slots. */
1569       mtr->memcpy(*block, a, e - a);
1570       break;
1571     }
1572   }
1573 
1574   buf_block_free(old);
1575 
1576   MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);
1577   MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
1578 }
1579 
1580 /*************************************************************//**
1581 Reorganizes an index page.
1582 
1583 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1584 if this is a compressed leaf page in a secondary index. This has to
1585 be done either within the same mini-transaction, or by invoking
1586 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1587 IBUF_BITMAP_FREE is unaffected by reorganization.
1588 
1589 @retval true if the operation was successful
1590 @retval false if it is a compressed page, and recompression failed */
1591 bool
btr_page_reorganize_block(ulint z_level,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1592 btr_page_reorganize_block(
1593 	ulint		z_level,/*!< in: compression level to be used
1594 				if dealing with compressed page */
1595 	buf_block_t*	block,	/*!< in/out: B-tree page */
1596 	dict_index_t*	index,	/*!< in: the index tree of the page */
1597 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1598 {
1599 	if (buf_block_get_page_zip(block)) {
1600 		return page_zip_reorganize(block, index, z_level, mtr, true);
1601 	}
1602 
1603 	page_cur_t	cur;
1604 	page_cur_set_before_first(block, &cur);
1605 
1606 	btr_page_reorganize_low(&cur, index, mtr);
1607 	return true;
1608 }
1609 
1610 /*************************************************************//**
1611 Reorganizes an index page.
1612 
1613 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1614 if this is a compressed leaf page in a secondary index. This has to
1615 be done either within the same mini-transaction, or by invoking
1616 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1617 IBUF_BITMAP_FREE is unaffected by reorganization.
1618 
1619 @retval true if the operation was successful
1620 @retval false if it is a compressed page, and recompression failed */
1621 bool
btr_page_reorganize(page_cur_t * cursor,dict_index_t * index,mtr_t * mtr)1622 btr_page_reorganize(
1623 /*================*/
1624 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1625 	dict_index_t*	index,	/*!< in: the index tree of the page */
1626 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1627 {
1628 	if (!buf_block_get_page_zip(cursor->block)) {
1629 		btr_page_reorganize_low(cursor, index, mtr);
1630 		return true;
1631 	}
1632 
1633 	ulint pos = page_rec_get_n_recs_before(cursor->rec);
1634 	if (!page_zip_reorganize(cursor->block, index, page_zip_level, mtr,
1635 				 true)) {
1636 		return false;
1637 	}
1638 	if (pos) {
1639 		cursor->rec = page_rec_get_nth(cursor->block->frame, pos);
1640 	} else {
1641 		ut_ad(cursor->rec == page_get_infimum_rec(
1642 			      cursor->block->frame));
1643 	}
1644 
1645 	return true;
1646 }
1647 
1648 /** Empty an index page (possibly the root page). @see btr_page_create().
1649 @param[in,out]	block		page to be emptied
1650 @param[in,out]	page_zip	compressed page frame, or NULL
1651 @param[in]	index		index of the page
1652 @param[in]	level		B-tree level of the page (0=leaf)
1653 @param[in,out]	mtr		mini-transaction */
1654 void
btr_page_empty(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)1655 btr_page_empty(
1656 	buf_block_t*	block,
1657 	page_zip_des_t*	page_zip,
1658 	dict_index_t*	index,
1659 	ulint		level,
1660 	mtr_t*		mtr)
1661 {
1662 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
1663 	ut_ad(page_zip == buf_block_get_page_zip(block));
1664 	ut_ad(!index->is_dummy);
1665 	ut_ad(index->table->space->id == block->page.id().space());
1666 #ifdef UNIV_ZIP_DEBUG
1667 	ut_a(!page_zip || page_zip_validate(page_zip, block->frame, index));
1668 #endif /* UNIV_ZIP_DEBUG */
1669 
1670 	btr_search_drop_page_hash_index(block);
1671 
1672 	/* Recreate the page: note that global data on page (possible
1673 	segment headers, next page-field, etc.) is preserved intact */
1674 
1675 	/* Preserve PAGE_ROOT_AUTO_INC when creating a clustered index
1676 	root page. */
1677 	const ib_uint64_t	autoinc
1678 		= dict_index_is_clust(index)
1679 		&& index->page == block->page.id().page_no()
1680 		? page_get_autoinc(block->frame)
1681 		: 0;
1682 
1683 	if (page_zip) {
1684 		page_create_zip(block, index, level, autoinc, mtr);
1685 	} else {
1686 		page_create(block, mtr, index->table->not_redundant());
1687 		if (index->is_spatial()) {
1688 			static_assert(((FIL_PAGE_INDEX & 0xff00)
1689 				       | byte(FIL_PAGE_RTREE))
1690 				      == FIL_PAGE_RTREE, "compatibility");
1691 			mtr->write<1>(*block, FIL_PAGE_TYPE + 1 + block->frame,
1692 				      byte(FIL_PAGE_RTREE));
1693 			if (mach_read_from_8(block->frame
1694 					     + FIL_RTREE_SPLIT_SEQ_NUM)) {
1695 				mtr->memset(block, FIL_RTREE_SPLIT_SEQ_NUM,
1696 					    8, 0);
1697 			}
1698 		}
1699 		mtr->write<2,mtr_t::MAYBE_NOP>(*block, PAGE_HEADER + PAGE_LEVEL
1700 					       + block->frame, level);
1701 		if (autoinc) {
1702 			mtr->write<8>(*block, PAGE_HEADER + PAGE_MAX_TRX_ID
1703 				      + block->frame, autoinc);
1704 		}
1705 	}
1706 }
1707 
1708 /** Write instant ALTER TABLE metadata to a root page.
1709 @param[in,out]	root	clustered index root page
1710 @param[in]	index	clustered index with instant ALTER TABLE
1711 @param[in,out]	mtr	mini-transaction */
btr_set_instant(buf_block_t * root,const dict_index_t & index,mtr_t * mtr)1712 void btr_set_instant(buf_block_t* root, const dict_index_t& index, mtr_t* mtr)
1713 {
1714 	ut_ad(index.n_core_fields > 0);
1715 	ut_ad(index.n_core_fields < REC_MAX_N_FIELDS);
1716 	ut_ad(index.is_instant());
1717 	ut_ad(fil_page_get_type(root->frame) == FIL_PAGE_TYPE_INSTANT
1718 	      || fil_page_get_type(root->frame) == FIL_PAGE_INDEX);
1719 	ut_ad(!page_has_siblings(root->frame));
1720 	ut_ad(root->page.id().page_no() == index.page);
1721 
1722 	rec_t* infimum = page_get_infimum_rec(root->frame);
1723 	rec_t* supremum = page_get_supremum_rec(root->frame);
1724 	byte* page_type = root->frame + FIL_PAGE_TYPE;
1725 	uint16_t i = page_header_get_field(root->frame, PAGE_INSTANT);
1726 
1727 	switch (mach_read_from_2(page_type)) {
1728 	case FIL_PAGE_TYPE_INSTANT:
1729 		ut_ad(page_get_instant(root->frame) == index.n_core_fields);
1730 		if (memcmp(infimum, "infimum", 8)
1731 		    || memcmp(supremum, "supremum", 8)) {
1732 			ut_ad(index.table->instant);
1733 			ut_ad(!memcmp(infimum, field_ref_zero, 8));
1734 			ut_ad(!memcmp(supremum, field_ref_zero, 7));
1735 			/* The n_core_null_bytes only matters for
1736 			ROW_FORMAT=COMPACT and ROW_FORMAT=DYNAMIC tables. */
1737 			ut_ad(supremum[7] == index.n_core_null_bytes
1738 			      || !index.table->not_redundant());
1739 			return;
1740 		}
1741 		break;
1742 	default:
1743 		ut_ad("wrong page type" == 0);
1744 		/* fall through */
1745 	case FIL_PAGE_INDEX:
1746 		ut_ad(!page_is_comp(root->frame)
1747 		      || !page_get_instant(root->frame));
1748 		ut_ad(!memcmp(infimum, "infimum", 8));
1749 		ut_ad(!memcmp(supremum, "supremum", 8));
1750 		mtr->write<2>(*root, page_type, FIL_PAGE_TYPE_INSTANT);
1751 		ut_ad(i <= PAGE_NO_DIRECTION);
1752 		i |= static_cast<uint16_t>(index.n_core_fields << 3);
1753 		mtr->write<2>(*root, PAGE_HEADER + PAGE_INSTANT + root->frame,
1754 			      i);
1755 		break;
1756 	}
1757 
1758 	if (index.table->instant) {
1759 		mtr->memset(root, infimum - root->frame, 8, 0);
1760 		mtr->memset(root, supremum - root->frame, 7, 0);
1761 		mtr->write<1,mtr_t::MAYBE_NOP>(*root, &supremum[7],
1762 					       index.n_core_null_bytes);
1763 	}
1764 }
1765 
1766 /** Reset the table to the canonical format on ROLLBACK of instant ALTER TABLE.
1767 @param[in]      index   clustered index with instant ALTER TABLE
1768 @param[in]      all     whether to reset FIL_PAGE_TYPE as well
1769 @param[in,out]  mtr     mini-transaction */
1770 ATTRIBUTE_COLD
btr_reset_instant(const dict_index_t & index,bool all,mtr_t * mtr)1771 void btr_reset_instant(const dict_index_t &index, bool all, mtr_t *mtr)
1772 {
1773   ut_ad(!index.table->is_temporary());
1774   ut_ad(index.is_primary());
1775   if (buf_block_t *root = btr_root_block_get(&index, RW_SX_LATCH, mtr))
1776   {
1777     byte *page_type= root->frame + FIL_PAGE_TYPE;
1778     if (all)
1779     {
1780       ut_ad(mach_read_from_2(page_type) == FIL_PAGE_TYPE_INSTANT ||
1781             mach_read_from_2(page_type) == FIL_PAGE_INDEX);
1782       mtr->write<2,mtr_t::MAYBE_NOP>(*root, page_type, FIL_PAGE_INDEX);
1783       byte *instant= PAGE_INSTANT + PAGE_HEADER + root->frame;
1784       mtr->write<2,mtr_t::MAYBE_NOP>(*root, instant,
1785                                      page_ptr_get_direction(instant + 1));
1786     }
1787     else
1788       ut_ad(mach_read_from_2(page_type) == FIL_PAGE_TYPE_INSTANT);
1789     static const byte supremuminfimum[8 + 8] = "supremuminfimum";
1790     uint16_t infimum, supremum;
1791     if (page_is_comp(root->frame))
1792     {
1793       infimum= PAGE_NEW_INFIMUM;
1794       supremum= PAGE_NEW_SUPREMUM;
1795     }
1796     else
1797     {
1798       infimum= PAGE_OLD_INFIMUM;
1799       supremum= PAGE_OLD_SUPREMUM;
1800     }
1801     ut_ad(!memcmp(&root->frame[infimum], supremuminfimum + 8, 8) ==
1802           !memcmp(&root->frame[supremum], supremuminfimum, 8));
1803     mtr->memcpy<mtr_t::MAYBE_NOP>(*root, &root->frame[infimum],
1804                                   supremuminfimum + 8, 8);
1805     mtr->memcpy<mtr_t::MAYBE_NOP>(*root, &root->frame[supremum],
1806                                   supremuminfimum, 8);
1807   }
1808 }
1809 
1810 /*************************************************************//**
1811 Makes tree one level higher by splitting the root, and inserts
1812 the tuple. It is assumed that mtr contains an x-latch on the tree.
1813 NOTE that the operation of this function must always succeed,
1814 we cannot reverse it: therefore enough free disk space must be
1815 guaranteed to be available before this function is called.
1816 @return inserted record */
1817 rec_t*
btr_root_raise_and_insert(ulint flags,btr_cur_t * cursor,rec_offs ** offsets,mem_heap_t ** heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)1818 btr_root_raise_and_insert(
1819 /*======================*/
1820 	ulint		flags,	/*!< in: undo logging and locking flags */
1821 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
1822 				on the root page; when the function returns,
1823 				the cursor is positioned on the predecessor
1824 				of the inserted record */
1825 	rec_offs**	offsets,/*!< out: offsets on inserted record */
1826 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1827 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1828 	ulint		n_ext,	/*!< in: number of externally stored columns */
1829 	mtr_t*		mtr)	/*!< in: mtr */
1830 {
1831 	dict_index_t*	index;
1832 	ulint		new_page_no;
1833 	rec_t*		rec;
1834 	dtuple_t*	node_ptr;
1835 	ulint		level;
1836 	rec_t*		node_ptr_rec;
1837 	page_cur_t*	page_cursor;
1838 	page_zip_des_t*	root_page_zip;
1839 	page_zip_des_t*	new_page_zip;
1840 	buf_block_t*	root;
1841 	buf_block_t*	new_block;
1842 
1843 	root = btr_cur_get_block(cursor);
1844 	root_page_zip = buf_block_get_page_zip(root);
1845 	ut_ad(!page_is_empty(root->frame));
1846 	index = btr_cur_get_index(cursor);
1847 	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
1848 #ifdef UNIV_ZIP_DEBUG
1849 	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root->frame,
1850 						 index));
1851 #endif /* UNIV_ZIP_DEBUG */
1852 #ifdef UNIV_BTR_DEBUG
1853 	if (!dict_index_is_ibuf(index)) {
1854 		ulint	space = index->table->space_id;
1855 
1856 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1857 					    + root->frame, space));
1858 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1859 					    + root->frame, space));
1860 	}
1861 
1862 	ut_a(dict_index_get_page(index) == root->page.id().page_no());
1863 #endif /* UNIV_BTR_DEBUG */
1864 	ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
1865 					 | MTR_MEMO_SX_LOCK));
1866 	ut_ad(mtr->memo_contains_flagged(root, MTR_MEMO_PAGE_X_FIX));
1867 
1868 	/* Allocate a new page to the tree. Root splitting is done by first
1869 	moving the root records to the new page, emptying the root, putting
1870 	a node pointer to the new page, and then splitting the new page. */
1871 
1872 	level = btr_page_get_level(root->frame);
1873 
1874 	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
1875 
1876 	if (new_block == NULL && os_has_said_disk_full) {
1877 		return(NULL);
1878 	}
1879 
1880 	new_page_zip = buf_block_get_page_zip(new_block);
1881 	ut_a(!new_page_zip == !root_page_zip);
1882 	ut_a(!new_page_zip
1883 	     || page_zip_get_size(new_page_zip)
1884 	     == page_zip_get_size(root_page_zip));
1885 
1886 	btr_page_create(new_block, new_page_zip, index, level, mtr);
1887 	if (page_has_siblings(new_block->frame)) {
1888 		compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
1889 		compile_time_assert(FIL_NULL == 0xffffffff);
1890 		static_assert(FIL_PAGE_PREV % 8 == 0, "alignment");
1891 		memset_aligned<8>(new_block->frame + FIL_PAGE_PREV, 0xff, 8);
1892 		mtr->memset(new_block, FIL_PAGE_PREV, 8, 0xff);
1893 		if (UNIV_LIKELY_NULL(new_page_zip)) {
1894 			memset_aligned<8>(new_page_zip->data + FIL_PAGE_PREV,
1895 					  0xff, 8);
1896 		}
1897 	}
1898 
1899 	/* Copy the records from root to the new page one by one. */
1900 
1901 	if (0
1902 #ifdef UNIV_ZIP_COPY
1903 	    || new_page_zip
1904 #endif /* UNIV_ZIP_COPY */
1905 	    || !page_copy_rec_list_end(new_block, root,
1906 				       page_get_infimum_rec(root->frame),
1907 				       index, mtr)) {
1908 		ut_a(new_page_zip);
1909 
1910 		/* Copy the page byte for byte. */
1911 		page_zip_copy_recs(new_block,
1912 				   root_page_zip, root->frame, index, mtr);
1913 
1914 		/* Update the lock table and possible hash index. */
1915 		lock_move_rec_list_end(new_block, root,
1916 				       page_get_infimum_rec(root->frame));
1917 
1918 		/* Move any existing predicate locks */
1919 		if (dict_index_is_spatial(index)) {
1920 			lock_prdt_rec_move(new_block, root);
1921 		} else {
1922 			btr_search_move_or_delete_hash_entries(
1923 				new_block, root);
1924 		}
1925 	}
1926 
1927 	constexpr uint16_t max_trx_id = PAGE_HEADER + PAGE_MAX_TRX_ID;
1928 	if (dict_index_is_sec_or_ibuf(index)) {
1929 		/* In secondary indexes and the change buffer,
1930 		PAGE_MAX_TRX_ID can be reset on the root page, because
1931 		the field only matters on leaf pages, and the root no
1932 		longer is a leaf page. (Older versions of InnoDB did
1933 		set PAGE_MAX_TRX_ID on all secondary index pages.) */
1934 		byte* p = my_assume_aligned<8>(
1935 			PAGE_HEADER + PAGE_MAX_TRX_ID + root->frame);
1936 		if (mach_read_from_8(p)) {
1937 			mtr->memset(root, max_trx_id, 8, 0);
1938 			if (UNIV_LIKELY_NULL(root->page.zip.data)) {
1939 				memset_aligned<8>(max_trx_id
1940 						  + root->page.zip.data, 0, 8);
1941 			}
1942 		}
1943 	} else {
1944 		/* PAGE_ROOT_AUTO_INC is only present in the clustered index
1945 		root page; on other clustered index pages, we want to reserve
1946 		the field PAGE_MAX_TRX_ID for future use. */
1947 		byte* p = my_assume_aligned<8>(
1948 			PAGE_HEADER + PAGE_MAX_TRX_ID + new_block->frame);
1949 		if (mach_read_from_8(p)) {
1950 			mtr->memset(new_block, max_trx_id, 8, 0);
1951 			if (UNIV_LIKELY_NULL(new_block->page.zip.data)) {
1952 				memset_aligned<8>(max_trx_id
1953 						  + new_block->page.zip.data,
1954 						  0, 8);
1955 			}
1956 		}
1957 	}
1958 
1959 	/* If this is a pessimistic insert which is actually done to
1960 	perform a pessimistic update then we have stored the lock
1961 	information of the record to be inserted on the infimum of the
1962 	root page: we cannot discard the lock structs on the root page */
1963 
1964 	if (!dict_table_is_locking_disabled(index->table)) {
1965 		lock_update_root_raise(new_block, root);
1966 	}
1967 
1968 	/* Create a memory heap where the node pointer is stored */
1969 	if (!*heap) {
1970 		*heap = mem_heap_create(1000);
1971 	}
1972 
1973 	rec = page_rec_get_next(page_get_infimum_rec(new_block->frame));
1974 	new_page_no = new_block->page.id().page_no();
1975 
1976 	/* Build the node pointer (= node key and page address) for the
1977 	child */
1978 	if (dict_index_is_spatial(index)) {
1979 		rtr_mbr_t		new_mbr;
1980 
1981 		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
1982 		node_ptr = rtr_index_build_node_ptr(
1983 			index, &new_mbr, rec, new_page_no, *heap);
1984 	} else {
1985 		node_ptr = dict_index_build_node_ptr(
1986 			index, rec, new_page_no, *heap, level);
1987 	}
1988 	/* The node pointer must be marked as the predefined minimum record,
1989 	as there is no lower alphabetical limit to records in the leftmost
1990 	node of a level: */
1991 	dtuple_set_info_bits(node_ptr,
1992 			     dtuple_get_info_bits(node_ptr)
1993 			     | REC_INFO_MIN_REC_FLAG);
1994 
1995 	/* Rebuild the root page to get free space */
1996 	btr_page_empty(root, root_page_zip, index, level + 1, mtr);
1997 	/* btr_page_empty() is supposed to zero-initialize the field. */
1998 	ut_ad(!page_get_instant(root->frame));
1999 
2000 	if (index->is_instant()) {
2001 		ut_ad(!root_page_zip);
2002 		btr_set_instant(root, *index, mtr);
2003 	}
2004 
2005 	ut_ad(!page_has_siblings(root->frame));
2006 
2007 	page_cursor = btr_cur_get_page_cur(cursor);
2008 
2009 	/* Insert node pointer to the root */
2010 
2011 	page_cur_set_before_first(root, page_cursor);
2012 
2013 	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
2014 					     index, offsets, heap, 0, mtr);
2015 
2016 	/* The root page should only contain the node pointer
2017 	to new_block at this point.  Thus, the data should fit. */
2018 	ut_a(node_ptr_rec);
2019 
2020 	/* We play safe and reset the free bits for the new page */
2021 
2022 	if (!dict_index_is_clust(index)
2023 	    && !index->table->is_temporary()) {
2024 		ibuf_reset_free_bits(new_block);
2025 	}
2026 
2027 	if (tuple != NULL) {
2028 		/* Reposition the cursor to the child node */
2029 		page_cur_search(new_block, index, tuple, page_cursor);
2030 	} else {
2031 		/* Set cursor to first record on child node */
2032 		page_cur_set_before_first(new_block, page_cursor);
2033 	}
2034 
2035 	/* Split the child and insert tuple */
2036 	if (dict_index_is_spatial(index)) {
2037 		/* Split rtree page and insert tuple */
2038 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2039 						 tuple, n_ext, mtr));
2040 	} else {
2041 		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
2042 						 tuple, n_ext, mtr));
2043 	}
2044 }
2045 
2046 /** Decide if the page should be split at the convergence point of inserts
2047 converging to the left.
2048 @param[in]	cursor	insert position
2049 @return the first record to be moved to the right half page
2050 @retval	NULL if no split is recommended */
btr_page_get_split_rec_to_left(const btr_cur_t * cursor)2051 rec_t* btr_page_get_split_rec_to_left(const btr_cur_t* cursor)
2052 {
2053 	rec_t* split_rec = btr_cur_get_rec(cursor);
2054 	const page_t* page = page_align(split_rec);
2055 
2056 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2057 	    != page_rec_get_next(split_rec)) {
2058 		return NULL;
2059 	}
2060 
2061 	/* The metadata record must be present in the leftmost leaf page
2062 	of the clustered index, if and only if index->is_instant().
2063 	However, during innobase_instant_try(), index->is_instant()
2064 	would already hold when row_ins_clust_index_entry_low()
2065 	is being invoked to insert the the metadata record.
2066 	So, we can only assert that when the metadata record exists,
2067 	index->is_instant() must hold. */
2068 	ut_ad(!page_is_leaf(page) || page_has_prev(page)
2069 	      || cursor->index->is_instant()
2070 	      || !(rec_get_info_bits(page_rec_get_next_const(
2071 					     page_get_infimum_rec(page)),
2072 				     cursor->index->table->not_redundant())
2073 		   & REC_INFO_MIN_REC_FLAG));
2074 
2075 	const rec_t* infimum = page_get_infimum_rec(page);
2076 
2077 	/* If the convergence is in the middle of a page, include also
2078 	the record immediately before the new insert to the upper
2079 	page. Otherwise, we could repeatedly move from page to page
2080 	lots of records smaller than the convergence point. */
2081 
2082 	if (split_rec == infimum
2083 	    || split_rec == page_rec_get_next_const(infimum)) {
2084 		split_rec = page_rec_get_next(split_rec);
2085 	}
2086 
2087 	return split_rec;
2088 }
2089 
2090 /** Decide if the page should be split at the convergence point of inserts
2091 converging to the right.
2092 @param[in]	cursor		insert position
2093 @param[out]	split_rec	if split recommended, the first record
2094 				on the right half page, or
2095 				NULL if the to-be-inserted record
2096 				should be first
2097 @return whether split is recommended */
2098 bool
btr_page_get_split_rec_to_right(const btr_cur_t * cursor,rec_t ** split_rec)2099 btr_page_get_split_rec_to_right(const btr_cur_t* cursor, rec_t** split_rec)
2100 {
2101 	rec_t* insert_point = btr_cur_get_rec(cursor);
2102 	const page_t* page = page_align(insert_point);
2103 
2104 	/* We use eager heuristics: if the new insert would be right after
2105 	the previous insert on the same page, we assume that there is a
2106 	pattern of sequential inserts here. */
2107 
2108 	if (page_header_get_ptr(page, PAGE_LAST_INSERT) != insert_point) {
2109 		return false;
2110 	}
2111 
2112 	insert_point = page_rec_get_next(insert_point);
2113 
2114 	if (page_rec_is_supremum(insert_point)) {
2115 		insert_point = NULL;
2116 	} else {
2117 		insert_point = page_rec_get_next(insert_point);
2118 		if (page_rec_is_supremum(insert_point)) {
2119 			insert_point = NULL;
2120 		}
2121 
2122 		/* If there are >= 2 user records up from the insert
2123 		point, split all but 1 off. We want to keep one because
2124 		then sequential inserts can use the adaptive hash
2125 		index, as they can do the necessary checks of the right
2126 		search position just by looking at the records on this
2127 		page. */
2128 	}
2129 
2130 	*split_rec = insert_point;
2131 	return true;
2132 }
2133 
2134 /*************************************************************//**
2135 Calculates a split record such that the tuple will certainly fit on
2136 its half-page when the split is performed. We assume in this function
2137 only that the cursor page has at least one user record.
2138 @return split record, or NULL if tuple will be the first record on
2139 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
2140 static
2141 rec_t*
btr_page_get_split_rec(btr_cur_t * cursor,const dtuple_t * tuple,ulint n_ext)2142 btr_page_get_split_rec(
2143 /*===================*/
2144 	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
2145 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2146 	ulint		n_ext)	/*!< in: number of externally stored columns */
2147 {
2148 	page_t*		page;
2149 	page_zip_des_t*	page_zip;
2150 	ulint		insert_size;
2151 	ulint		free_space;
2152 	ulint		total_data;
2153 	ulint		total_n_recs;
2154 	ulint		total_space;
2155 	ulint		incl_data;
2156 	rec_t*		ins_rec;
2157 	rec_t*		rec;
2158 	rec_t*		next_rec;
2159 	ulint		n;
2160 	mem_heap_t*	heap;
2161 	rec_offs*	offsets;
2162 
2163 	page = btr_cur_get_page(cursor);
2164 
2165 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2166 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2167 
2168 	page_zip = btr_cur_get_page_zip(cursor);
2169 	if (page_zip) {
2170 		/* Estimate the free space of an empty compressed page. */
2171 		ulint	free_space_zip = page_zip_empty_size(
2172 			cursor->index->n_fields,
2173 			page_zip_get_size(page_zip));
2174 
2175 		if (free_space > (ulint) free_space_zip) {
2176 			free_space = (ulint) free_space_zip;
2177 		}
2178 	}
2179 
2180 	/* free_space is now the free space of a created new page */
2181 
2182 	total_data   = page_get_data_size(page) + insert_size;
2183 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2184 	ut_ad(total_n_recs >= 2);
2185 	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);
2186 
2187 	n = 0;
2188 	incl_data = 0;
2189 	ins_rec = btr_cur_get_rec(cursor);
2190 	rec = page_get_infimum_rec(page);
2191 
2192 	heap = NULL;
2193 	offsets = NULL;
2194 
2195 	/* We start to include records to the left half, and when the
2196 	space reserved by them exceeds half of total_space, then if
2197 	the included records fit on the left page, they will be put there
2198 	if something was left over also for the right page,
2199 	otherwise the last included record will be the first on the right
2200 	half page */
2201 
2202 	do {
2203 		/* Decide the next record to include */
2204 		if (rec == ins_rec) {
2205 			rec = NULL;	/* NULL denotes that tuple is
2206 					now included */
2207 		} else if (rec == NULL) {
2208 			rec = page_rec_get_next(ins_rec);
2209 		} else {
2210 			rec = page_rec_get_next(rec);
2211 		}
2212 
2213 		if (rec == NULL) {
2214 			/* Include tuple */
2215 			incl_data += insert_size;
2216 		} else {
2217 			offsets = rec_get_offsets(rec, cursor->index, offsets,
2218 						  page_is_leaf(page)
2219 						  ? cursor->index->n_core_fields
2220 						  : 0,
2221 						  ULINT_UNDEFINED, &heap);
2222 			incl_data += rec_offs_size(offsets);
2223 		}
2224 
2225 		n++;
2226 	} while (incl_data + page_dir_calc_reserved_space(n)
2227 		 < total_space / 2);
2228 
2229 	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
2230 		/* The next record will be the first on
2231 		the right half page if it is not the
2232 		supremum record of page */
2233 
2234 		if (rec == ins_rec) {
2235 			rec = NULL;
2236 
2237 			goto func_exit;
2238 		} else if (rec == NULL) {
2239 			next_rec = page_rec_get_next(ins_rec);
2240 		} else {
2241 			next_rec = page_rec_get_next(rec);
2242 		}
2243 		ut_ad(next_rec);
2244 		if (!page_rec_is_supremum(next_rec)) {
2245 			rec = next_rec;
2246 		}
2247 	}
2248 
2249 func_exit:
2250 	if (heap) {
2251 		mem_heap_free(heap);
2252 	}
2253 	return(rec);
2254 }
2255 
2256 /*************************************************************//**
2257 Returns TRUE if the insert fits on the appropriate half-page with the
2258 chosen split_rec.
2259 @return true if fits */
2260 static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
2261 bool
btr_page_insert_fits(btr_cur_t * cursor,const rec_t * split_rec,rec_offs ** offsets,const dtuple_t * tuple,ulint n_ext,mem_heap_t ** heap)2262 btr_page_insert_fits(
2263 /*=================*/
2264 	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2265 				should be made */
2266 	const rec_t*	split_rec,/*!< in: suggestion for first record
2267 				on upper half-page, or NULL if
2268 				tuple to be inserted should be first */
2269 	rec_offs**	offsets,/*!< in: rec_get_offsets(
2270 				split_rec, cursor->index); out: garbage */
2271 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2272 	ulint		n_ext,	/*!< in: number of externally stored columns */
2273 	mem_heap_t**	heap)	/*!< in: temporary memory heap */
2274 {
2275 	page_t*		page;
2276 	ulint		insert_size;
2277 	ulint		free_space;
2278 	ulint		total_data;
2279 	ulint		total_n_recs;
2280 	const rec_t*	rec;
2281 	const rec_t*	end_rec;
2282 
2283 	page = btr_cur_get_page(cursor);
2284 
2285 	ut_ad(!split_rec
2286 	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
2287 	ut_ad(!split_rec
2288 	      || rec_offs_validate(split_rec, cursor->index, *offsets));
2289 
2290 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2291 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2292 
2293 	/* free_space is now the free space of a created new page */
2294 
2295 	total_data   = page_get_data_size(page) + insert_size;
2296 	total_n_recs = ulint(page_get_n_recs(page)) + 1;
2297 
2298 	/* We determine which records (from rec to end_rec, not including
2299 	end_rec) will end up on the other half page from tuple when it is
2300 	inserted. */
2301 
2302 	if (split_rec == NULL) {
2303 		rec = page_rec_get_next(page_get_infimum_rec(page));
2304 		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
2305 
2306 	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
2307 
2308 		rec = page_rec_get_next(page_get_infimum_rec(page));
2309 		end_rec = split_rec;
2310 	} else {
2311 		rec = split_rec;
2312 		end_rec = page_get_supremum_rec(page);
2313 	}
2314 
2315 	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2316 	    <= free_space) {
2317 
2318 		/* Ok, there will be enough available space on the
2319 		half page where the tuple is inserted */
2320 
2321 		return(true);
2322 	}
2323 
2324 	while (rec != end_rec) {
2325 		/* In this loop we calculate the amount of reserved
2326 		space after rec is removed from page. */
2327 
2328 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2329 					   page_is_leaf(page)
2330 					   ? cursor->index->n_core_fields
2331 					   : 0,
2332 					   ULINT_UNDEFINED, heap);
2333 
2334 		total_data -= rec_offs_size(*offsets);
2335 		total_n_recs--;
2336 
2337 		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2338 		    <= free_space) {
2339 
2340 			/* Ok, there will be enough available space on the
2341 			half page where the tuple is inserted */
2342 
2343 			return(true);
2344 		}
2345 
2346 		rec = page_rec_get_next_const(rec);
2347 	}
2348 
2349 	return(false);
2350 }
2351 
2352 /*******************************************************//**
2353 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2354 that mtr holds an x-latch on the tree. */
2355 void
btr_insert_on_non_leaf_level_func(ulint flags,dict_index_t * index,ulint level,dtuple_t * tuple,const char * file,unsigned line,mtr_t * mtr)2356 btr_insert_on_non_leaf_level_func(
2357 /*==============================*/
2358 	ulint		flags,	/*!< in: undo logging and locking flags */
2359 	dict_index_t*	index,	/*!< in: index */
2360 	ulint		level,	/*!< in: level, must be > 0 */
2361 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2362 	const char*	file,	/*!< in: file name */
2363 	unsigned	line,	/*!< in: line where called */
2364 	mtr_t*		mtr)	/*!< in: mtr */
2365 {
2366 	big_rec_t*	dummy_big_rec;
2367 	btr_cur_t	cursor;
2368 	dberr_t		err;
2369 	rec_t*		rec;
2370 	mem_heap_t*	heap = NULL;
2371 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
2372 	rec_offs*	offsets         = offsets_;
2373 	rec_offs_init(offsets_);
2374 	rtr_info_t	rtr_info;
2375 
2376 	ut_ad(level > 0);
2377 
2378 	if (!dict_index_is_spatial(index)) {
2379 		dberr_t err = btr_cur_search_to_nth_level(
2380 			index, level, tuple, PAGE_CUR_LE,
2381 			BTR_CONT_MODIFY_TREE,
2382 			&cursor, 0, file, line, mtr);
2383 
2384 		if (err != DB_SUCCESS) {
2385 			ib::warn() << " Error code: " << err
2386 				   << " btr_page_get_father_node_ptr_func "
2387 				   << " level: " << level
2388 				   << " called from file: "
2389 				   << file << " line: " << line
2390 				   << " table: " << index->table->name
2391 				   << " index: " << index->name;
2392 		}
2393 	} else {
2394 		/* For spatial index, initialize structures to track
2395 		its parents etc. */
2396 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2397 
2398 		rtr_info_update_btr(&cursor, &rtr_info);
2399 
2400 		btr_cur_search_to_nth_level(index, level, tuple,
2401 					    PAGE_CUR_RTREE_INSERT,
2402 					    BTR_CONT_MODIFY_TREE,
2403 					    &cursor, 0, file, line, mtr);
2404 	}
2405 
2406 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2407 
2408 	err = btr_cur_optimistic_insert(
2409 		flags
2410 		| BTR_NO_LOCKING_FLAG
2411 		| BTR_KEEP_SYS_FLAG
2412 		| BTR_NO_UNDO_LOG_FLAG,
2413 		&cursor, &offsets, &heap,
2414 		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2415 
2416 	if (err == DB_FAIL) {
2417 		err = btr_cur_pessimistic_insert(flags
2418 						 | BTR_NO_LOCKING_FLAG
2419 						 | BTR_KEEP_SYS_FLAG
2420 						 | BTR_NO_UNDO_LOG_FLAG,
2421 						 &cursor, &offsets, &heap,
2422 						 tuple, &rec,
2423 						 &dummy_big_rec, 0, NULL, mtr);
2424 		ut_a(err == DB_SUCCESS);
2425 	}
2426 
2427 	if (heap != NULL) {
2428 		mem_heap_free(heap);
2429 	}
2430 
2431 	if (dict_index_is_spatial(index)) {
2432 		ut_ad(cursor.rtr_info);
2433 
2434 		rtr_clean_rtr_info(&rtr_info, true);
2435 	}
2436 }
2437 
2438 /**************************************************************//**
2439 Attaches the halves of an index page on the appropriate level in an
2440 index tree. */
2441 static MY_ATTRIBUTE((nonnull))
2442 void
btr_attach_half_pages(ulint flags,dict_index_t * index,buf_block_t * block,const rec_t * split_rec,buf_block_t * new_block,ulint direction,mtr_t * mtr)2443 btr_attach_half_pages(
2444 /*==================*/
2445 	ulint		flags,		/*!< in: undo logging and
2446 					locking flags */
2447 	dict_index_t*	index,		/*!< in: the index tree */
2448 	buf_block_t*	block,		/*!< in/out: page to be split */
2449 	const rec_t*	split_rec,	/*!< in: first record on upper
2450 					half page */
2451 	buf_block_t*	new_block,	/*!< in/out: the new half page */
2452 	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
2453 	mtr_t*		mtr)		/*!< in: mtr */
2454 {
2455 	dtuple_t*	node_ptr_upper;
2456 	mem_heap_t*	heap;
2457 	buf_block_t*	prev_block = NULL;
2458 	buf_block_t*	next_block = NULL;
2459 	buf_block_t*	lower_block;
2460 	buf_block_t*	upper_block;
2461 
2462 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
2463 	ut_ad(mtr->memo_contains_flagged(new_block, MTR_MEMO_PAGE_X_FIX));
2464 
2465 	/* Create a memory heap where the data tuple is stored */
2466 	heap = mem_heap_create(1024);
2467 
2468 	/* Based on split direction, decide upper and lower pages */
2469 	if (direction == FSP_DOWN) {
2470 
2471 		btr_cur_t	cursor;
2472 		rec_offs*	offsets;
2473 
2474 		lower_block = new_block;
2475 		upper_block = block;
2476 
2477 		/* Look up the index for the node pointer to page */
2478 		offsets = btr_page_get_father_block(NULL, heap, index,
2479 						    block, mtr, &cursor);
2480 
2481 		/* Replace the address of the old child node (= page) with the
2482 		address of the new lower half */
2483 
2484 		btr_node_ptr_set_child_page_no(
2485 			btr_cur_get_block(&cursor),
2486 			btr_cur_get_rec(&cursor),
2487 			offsets, lower_block->page.id().page_no(), mtr);
2488 		mem_heap_empty(heap);
2489 	} else {
2490 		lower_block = block;
2491 		upper_block = new_block;
2492 	}
2493 
2494 	/* Get the level of the split pages */
2495 	const ulint level = btr_page_get_level(buf_block_get_frame(block));
2496 	ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));
2497 
2498 	/* Get the previous and next pages of page */
2499 	const uint32_t prev_page_no = btr_page_get_prev(block->frame);
2500 	const uint32_t next_page_no = btr_page_get_next(block->frame);
2501 
2502 	/* for consistency, both blocks should be locked, before change */
2503 	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
2504 		prev_block = btr_block_get(*index, prev_page_no, RW_X_LATCH,
2505 					   !level, mtr);
2506 	}
2507 	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
2508 		next_block = btr_block_get(*index, next_page_no, RW_X_LATCH,
2509 					   !level, mtr);
2510 	}
2511 
2512 	/* Build the node pointer (= node key and page address) for the upper
2513 	half */
2514 
2515 	node_ptr_upper = dict_index_build_node_ptr(
2516 		index, split_rec, upper_block->page.id().page_no(),
2517 		heap, level);
2518 
2519 	/* Insert it next to the pointer to the lower half. Note that this
2520 	may generate recursion leading to a split on the higher level. */
2521 
2522 	btr_insert_on_non_leaf_level(flags, index, level + 1,
2523 				     node_ptr_upper, mtr);
2524 
2525 	/* Free the memory heap */
2526 	mem_heap_free(heap);
2527 
2528 	/* Update page links of the level */
2529 
2530 	if (prev_block) {
2531 #ifdef UNIV_BTR_DEBUG
2532 		ut_a(page_is_comp(prev_block->frame)
2533 		     == page_is_comp(block->frame));
2534 		ut_a(btr_page_get_next(prev_block->frame)
2535 		     == block->page.id().page_no());
2536 #endif /* UNIV_BTR_DEBUG */
2537 		btr_page_set_next(prev_block, lower_block->page.id().page_no(),
2538 				  mtr);
2539 	}
2540 
2541 	if (next_block) {
2542 #ifdef UNIV_BTR_DEBUG
2543 		ut_a(page_is_comp(next_block->frame)
2544 		     == page_is_comp(block->frame));
2545 		ut_a(btr_page_get_prev(next_block->frame)
2546 		     == block->page.id().page_no());
2547 #endif /* UNIV_BTR_DEBUG */
2548 		btr_page_set_prev(next_block, upper_block->page.id().page_no(),
2549 				  mtr);
2550 	}
2551 
2552 	if (direction == FSP_DOWN) {
2553 		ut_ad(lower_block == new_block);
2554 		ut_ad(btr_page_get_next(upper_block->frame) == next_page_no);
2555 		btr_page_set_prev(lower_block, prev_page_no, mtr);
2556 	} else {
2557 		ut_ad(upper_block == new_block);
2558 		ut_ad(btr_page_get_prev(lower_block->frame) == prev_page_no);
2559 		btr_page_set_next(upper_block, next_page_no, mtr);
2560 	}
2561 
2562 	btr_page_set_prev(upper_block, lower_block->page.id().page_no(), mtr);
2563 	btr_page_set_next(lower_block, upper_block->page.id().page_no(), mtr);
2564 }
2565 
2566 /*************************************************************//**
2567 Determine if a tuple is smaller than any record on the page.
2568 @return TRUE if smaller */
2569 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2570 bool
btr_page_tuple_smaller(btr_cur_t * cursor,const dtuple_t * tuple,rec_offs ** offsets,ulint n_uniq,mem_heap_t ** heap)2571 btr_page_tuple_smaller(
2572 /*===================*/
2573 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2574 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2575 	rec_offs**	offsets,/*!< in/out: temporary storage */
2576 	ulint		n_uniq,	/*!< in: number of unique fields
2577 				in the index page records */
2578 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2579 {
2580 	buf_block_t*	block;
2581 	const rec_t*	first_rec;
2582 	page_cur_t	pcur;
2583 
2584 	/* Read the first user record in the page. */
2585 	block = btr_cur_get_block(cursor);
2586 	page_cur_set_before_first(block, &pcur);
2587 	page_cur_move_to_next(&pcur);
2588 	first_rec = page_cur_get_rec(&pcur);
2589 
2590 	*offsets = rec_get_offsets(
2591 		first_rec, cursor->index, *offsets,
2592 		page_is_leaf(block->frame) ? cursor->index->n_core_fields : 0,
2593 		n_uniq, heap);
2594 
2595 	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2596 }
2597 
2598 /** Insert the tuple into the right sibling page, if the cursor is at the end
2599 of a page.
2600 @param[in]	flags	undo logging and locking flags
2601 @param[in,out]	cursor	cursor at which to insert; when the function succeeds,
2602 			the cursor is positioned before the insert point.
2603 @param[out]	offsets	offsets on inserted record
2604 @param[in,out]	heap	memory heap for allocating offsets
2605 @param[in]	tuple	tuple to insert
2606 @param[in]	n_ext	number of externally stored columns
2607 @param[in,out]	mtr	mini-transaction
2608 @return	inserted record (first record on the right sibling page);
2609 	the cursor will be positioned on the page infimum
2610 @retval	NULL if the operation was not performed */
2611 static
2612 rec_t*
btr_insert_into_right_sibling(ulint flags,btr_cur_t * cursor,rec_offs ** offsets,mem_heap_t * heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)2613 btr_insert_into_right_sibling(
2614 	ulint		flags,
2615 	btr_cur_t*	cursor,
2616 	rec_offs**	offsets,
2617 	mem_heap_t*	heap,
2618 	const dtuple_t*	tuple,
2619 	ulint		n_ext,
2620 	mtr_t*		mtr)
2621 {
2622 	buf_block_t*	block = btr_cur_get_block(cursor);
2623 	page_t*		page = buf_block_get_frame(block);
2624 	const uint32_t	next_page_no = btr_page_get_next(page);
2625 
2626 	ut_ad(mtr->memo_contains_flagged(&cursor->index->lock,
2627 					 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2628 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
2629 	ut_ad(heap);
2630 
2631 	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
2632 			page_rec_get_next(btr_cur_get_rec(cursor)))) {
2633 
2634 		return(NULL);
2635 	}
2636 
2637 	page_cur_t	next_page_cursor;
2638 	buf_block_t*	next_block;
2639 	page_t*		next_page;
2640 	btr_cur_t	next_father_cursor;
2641 	rec_t*		rec = NULL;
2642 	ulint		max_size;
2643 
2644 	next_block = btr_block_get(*cursor->index, next_page_no, RW_X_LATCH,
2645 				   page_is_leaf(page), mtr);
2646 	if (UNIV_UNLIKELY(!next_block)) {
2647 		return NULL;
2648 	}
2649 	next_page = buf_block_get_frame(next_block);
2650 
2651 	bool	is_leaf = page_is_leaf(next_page);
2652 
2653 	btr_page_get_father(
2654 		cursor->index, next_block, mtr, &next_father_cursor);
2655 
2656 	page_cur_search(
2657 		next_block, cursor->index, tuple, PAGE_CUR_LE,
2658 		&next_page_cursor);
2659 
2660 	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2661 
2662 	/* Extends gap lock for the next page */
2663 	if (!dict_table_is_locking_disabled(cursor->index->table)) {
2664 		lock_update_split_left(next_block, block);
2665 	}
2666 
2667 	rec = page_cur_tuple_insert(
2668 		&next_page_cursor, tuple, cursor->index, offsets, &heap,
2669 		n_ext, mtr);
2670 
2671 	if (rec == NULL) {
2672 		if (is_leaf
2673 		    && next_block->page.zip.ssize
2674 		    && !dict_index_is_clust(cursor->index)
2675 		    && !cursor->index->table->is_temporary()) {
2676 			/* Reset the IBUF_BITMAP_FREE bits, because
2677 			page_cur_tuple_insert() will have attempted page
2678 			reorganize before failing. */
2679 			ibuf_reset_free_bits(next_block);
2680 		}
2681 		return(NULL);
2682 	}
2683 
2684 	ibool	compressed;
2685 	dberr_t	err;
2686 	ulint	level = btr_page_get_level(next_page);
2687 
2688 	/* adjust cursor position */
2689 	*btr_cur_get_page_cur(cursor) = next_page_cursor;
2690 
2691 	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2692 	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2693 
2694 	/* We have to change the parent node pointer */
2695 
2696 	compressed = btr_cur_pessimistic_delete(
2697 		&err, TRUE, &next_father_cursor,
2698 		BTR_CREATE_FLAG, false, mtr);
2699 
2700 	ut_a(err == DB_SUCCESS);
2701 
2702 	if (!compressed) {
2703 		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2704 	}
2705 
2706 	dtuple_t*	node_ptr = dict_index_build_node_ptr(
2707 		cursor->index, rec, next_block->page.id().page_no(),
2708 		heap, level);
2709 
2710 	btr_insert_on_non_leaf_level(
2711 		flags, cursor->index, level + 1, node_ptr, mtr);
2712 
2713 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2714 
2715 	if (is_leaf
2716 	    && !dict_index_is_clust(cursor->index)
2717 	    && !cursor->index->table->is_temporary()) {
2718 		/* Update the free bits of the B-tree page in the
2719 		insert buffer bitmap. */
2720 
2721 		if (next_block->page.zip.ssize) {
2722 			ibuf_update_free_bits_zip(next_block, mtr);
2723 		} else {
2724 			ibuf_update_free_bits_if_full(
2725 				next_block, max_size,
2726 				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2727 		}
2728 	}
2729 
2730 	return(rec);
2731 }
2732 
2733 /*************************************************************//**
2734 Splits an index page to halves and inserts the tuple. It is assumed
2735 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
2736 released within this function! NOTE that the operation of this
2737 function must always succeed, we cannot reverse it: therefore enough
2738 free disk space (2 pages) must be guaranteed to be available before
2739 this function is called.
2740 NOTE: jonaso added support for calling function with tuple == NULL
2741 which cause it to only split a page.
2742 
2743 @return inserted record or NULL if run out of space */
2744 rec_t*
btr_page_split_and_insert(ulint flags,btr_cur_t * cursor,rec_offs ** offsets,mem_heap_t ** heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)2745 btr_page_split_and_insert(
2746 /*======================*/
2747 	ulint		flags,	/*!< in: undo logging and locking flags */
2748 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
2749 				function returns, the cursor is positioned
2750 				on the predecessor of the inserted record */
2751 	rec_offs**	offsets,/*!< out: offsets on inserted record */
2752 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2753 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2754 	ulint		n_ext,	/*!< in: number of externally stored columns */
2755 	mtr_t*		mtr)	/*!< in: mtr */
2756 {
2757 	buf_block_t*	block;
2758 	page_t*		page;
2759 	page_zip_des_t*	page_zip;
2760 	buf_block_t*	new_block;
2761 	page_t*		new_page;
2762 	page_zip_des_t*	new_page_zip;
2763 	rec_t*		split_rec;
2764 	buf_block_t*	left_block;
2765 	buf_block_t*	right_block;
2766 	page_cur_t*	page_cursor;
2767 	rec_t*		first_rec;
2768 	byte*		buf = 0; /* remove warning */
2769 	rec_t*		move_limit;
2770 	ulint		n_iterations = 0;
2771 	ulint		n_uniq;
2772 
2773 	if (cursor->index->is_spatial()) {
2774 		/* Split rtree page and update parent */
2775 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2776 						 tuple, n_ext, mtr));
2777 	}
2778 
2779 	if (!*heap) {
2780 		*heap = mem_heap_create(1024);
2781 	}
2782 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2783 func_start:
2784 	mem_heap_empty(*heap);
2785 	*offsets = NULL;
2786 
2787 	ut_ad(mtr->memo_contains_flagged(&cursor->index->lock, MTR_MEMO_X_LOCK
2788 					 | MTR_MEMO_SX_LOCK));
2789 	ut_ad(!dict_index_is_online_ddl(cursor->index)
2790 	      || (flags & BTR_CREATE_FLAG)
2791 	      || dict_index_is_clust(cursor->index));
2792 	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
2793 				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
2794 
2795 	block = btr_cur_get_block(cursor);
2796 	page = buf_block_get_frame(block);
2797 	page_zip = buf_block_get_page_zip(block);
2798 
2799 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
2800 	ut_ad(!page_is_empty(page));
2801 
2802 	/* try to insert to the next page if possible before split */
2803 	if (rec_t* rec = btr_insert_into_right_sibling(
2804 		    flags, cursor, offsets, *heap, tuple, n_ext, mtr)) {
2805 		return(rec);
2806 	}
2807 
2808 	/* 1. Decide the split record; split_rec == NULL means that the
2809 	tuple to be inserted should be the first record on the upper
2810 	half-page */
2811 	bool insert_left = false;
2812 	uint32_t hint_page_no = block->page.id().page_no() + 1;
2813 	byte direction = FSP_UP;
2814 
2815 	if (tuple && n_iterations > 0) {
2816 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2817 
2818 		if (split_rec == NULL) {
2819 			insert_left = btr_page_tuple_smaller(
2820 				cursor, tuple, offsets, n_uniq, heap);
2821 		}
2822 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2823 	} else if ((split_rec = btr_page_get_split_rec_to_left(cursor))) {
2824 		direction = FSP_DOWN;
2825 		hint_page_no -= 2;
2826 	} else {
2827 		/* If there is only one record in the index page, we
2828 		can't split the node in the middle by default. We need
2829 		to determine whether the new record will be inserted
2830 		to the left or right. */
2831 
2832 		if (page_get_n_recs(page) > 1) {
2833 			split_rec = page_get_middle_rec(page);
2834 		} else if (btr_page_tuple_smaller(cursor, tuple,
2835 						  offsets, n_uniq, heap)) {
2836 			split_rec = page_rec_get_next(
2837 				page_get_infimum_rec(page));
2838 		} else {
2839 			split_rec = NULL;
2840 		}
2841 	}
2842 
2843 	DBUG_EXECUTE_IF("disk_is_full",
2844 			os_has_said_disk_full = true;
2845 			return(NULL););
2846 
2847 	/* 2. Allocate a new page to the index */
2848 	const uint16_t page_level = btr_page_get_level(page);
2849 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2850 				   page_level, mtr, mtr);
2851 
2852 	if (!new_block) {
2853 		return(NULL);
2854 	}
2855 
2856 	new_page = buf_block_get_frame(new_block);
2857 	new_page_zip = buf_block_get_page_zip(new_block);
2858 
2859 	if (page_level && UNIV_LIKELY_NULL(new_page_zip)) {
2860 		/* ROW_FORMAT=COMPRESSED non-leaf pages are not expected
2861 		to contain FIL_NULL in FIL_PAGE_PREV at this stage. */
2862 		memset_aligned<4>(new_page + FIL_PAGE_PREV, 0, 4);
2863 	}
2864 	btr_page_create(new_block, new_page_zip, cursor->index,
2865 			page_level, mtr);
2866 	/* Only record the leaf level page splits. */
2867 	if (!page_level) {
2868 		cursor->index->stat_defrag_n_page_split ++;
2869 		cursor->index->stat_defrag_modified_counter ++;
2870 		btr_defragment_save_defrag_stats_if_needed(cursor->index);
2871 	}
2872 
2873 	/* 3. Calculate the first record on the upper half-page, and the
2874 	first record (move_limit) on original page which ends up on the
2875 	upper half */
2876 
2877 	if (split_rec) {
2878 		first_rec = move_limit = split_rec;
2879 
2880 		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
2881 					   page_is_leaf(page)
2882 					   ? cursor->index->n_core_fields : 0,
2883 					   n_uniq, heap);
2884 
2885 		insert_left = !tuple
2886 			|| cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;
2887 
2888 		if (!insert_left && new_page_zip && n_iterations > 0) {
2889 			/* If a compressed page has already been split,
2890 			avoid further splits by inserting the record
2891 			to an empty page. */
2892 			split_rec = NULL;
2893 			goto insert_empty;
2894 		}
2895 	} else if (insert_left) {
2896 		ut_a(n_iterations > 0);
2897 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
2898 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2899 	} else {
2900 insert_empty:
2901 		ut_ad(!split_rec);
2902 		ut_ad(!insert_left);
2903 		buf = UT_NEW_ARRAY_NOKEY(
2904 			byte,
2905 			rec_get_converted_size(cursor->index, tuple, n_ext));
2906 
2907 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2908 						      tuple, n_ext);
2909 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2910 	}
2911 
2912 	/* 4. Do first the modifications in the tree structure */
2913 
2914 	/* FIXME: write FIL_PAGE_PREV,FIL_PAGE_NEXT in new_block earlier! */
2915 	btr_attach_half_pages(flags, cursor->index, block,
2916 			      first_rec, new_block, direction, mtr);
2917 
2918 	/* If the split is made on the leaf level and the insert will fit
2919 	on the appropriate half-page, we may release the tree x-latch.
2920 	We can then move the records after releasing the tree latch,
2921 	thus reducing the tree latch contention. */
2922 	bool insert_will_fit;
2923 	if (tuple == NULL) {
2924 		insert_will_fit = true;
2925 	} else if (split_rec) {
2926 		insert_will_fit = !new_page_zip
2927 			&& btr_page_insert_fits(cursor, split_rec,
2928 						offsets, tuple, n_ext, heap);
2929 	} else {
2930 		if (!insert_left) {
2931 			UT_DELETE_ARRAY(buf);
2932 			buf = NULL;
2933 		}
2934 
2935 		insert_will_fit = !new_page_zip
2936 			&& btr_page_insert_fits(cursor, NULL,
2937 						offsets, tuple, n_ext, heap);
2938 	}
2939 
2940 	if (!srv_read_only_mode
2941 	    && insert_will_fit
2942 	    && page_is_leaf(page)
2943 	    && !dict_index_is_online_ddl(cursor->index)) {
2944 
2945 		mtr->memo_release(
2946 			dict_index_get_lock(cursor->index),
2947 			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);
2948 
2949 		/* NOTE: We cannot release root block latch here, because it
2950 		has segment header and already modified in most of cases.*/
2951 	}
2952 
2953 	/* 5. Move then the records to the new page */
2954 	if (direction == FSP_DOWN) {
2955 		/*		fputs("Split left\n", stderr); */
2956 
2957 		if (0
2958 #ifdef UNIV_ZIP_COPY
2959 		    || page_zip
2960 #endif /* UNIV_ZIP_COPY */
2961 		    || !page_move_rec_list_start(new_block, block, move_limit,
2962 						 cursor->index, mtr)) {
2963 			/* For some reason, compressing new_page failed,
2964 			even though it should contain fewer records than
2965 			the original page.  Copy the page byte for byte
2966 			and then delete the records from both pages
2967 			as appropriate.  Deleting will always succeed. */
2968 			ut_a(new_page_zip);
2969 
2970 			page_zip_copy_recs(new_block,
2971 					   page_zip, page, cursor->index, mtr);
2972 			page_delete_rec_list_end(move_limit - page + new_page,
2973 						 new_block, cursor->index,
2974 						 ULINT_UNDEFINED,
2975 						 ULINT_UNDEFINED, mtr);
2976 
2977 			/* Update the lock table and possible hash index. */
2978 			lock_move_rec_list_start(
2979 				new_block, block, move_limit,
2980 				new_page + PAGE_NEW_INFIMUM);
2981 
2982 			btr_search_move_or_delete_hash_entries(
2983 				new_block, block);
2984 
2985 			/* Delete the records from the source page. */
2986 
2987 			page_delete_rec_list_start(move_limit, block,
2988 						   cursor->index, mtr);
2989 		}
2990 
2991 		left_block = new_block;
2992 		right_block = block;
2993 
2994 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
2995 			lock_update_split_left(right_block, left_block);
2996 		}
2997 	} else {
2998 		/*		fputs("Split right\n", stderr); */
2999 
3000 		if (0
3001 #ifdef UNIV_ZIP_COPY
3002 		    || page_zip
3003 #endif /* UNIV_ZIP_COPY */
3004 		    || !page_move_rec_list_end(new_block, block, move_limit,
3005 					       cursor->index, mtr)) {
3006 			/* For some reason, compressing new_page failed,
3007 			even though it should contain fewer records than
3008 			the original page.  Copy the page byte for byte
3009 			and then delete the records from both pages
3010 			as appropriate.  Deleting will always succeed. */
3011 			ut_a(new_page_zip);
3012 
3013 			page_zip_copy_recs(new_block,
3014 					   page_zip, page, cursor->index, mtr);
3015 			page_delete_rec_list_start(move_limit - page
3016 						   + new_page, new_block,
3017 						   cursor->index, mtr);
3018 
3019 			/* Update the lock table and possible hash index. */
3020 			lock_move_rec_list_end(new_block, block, move_limit);
3021 
3022 			btr_search_move_or_delete_hash_entries(
3023 				new_block, block);
3024 
3025 			/* Delete the records from the source page. */
3026 
3027 			page_delete_rec_list_end(move_limit, block,
3028 						 cursor->index,
3029 						 ULINT_UNDEFINED,
3030 						 ULINT_UNDEFINED, mtr);
3031 		}
3032 
3033 		left_block = block;
3034 		right_block = new_block;
3035 
3036 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
3037 			lock_update_split_right(right_block, left_block);
3038 		}
3039 	}
3040 
3041 #ifdef UNIV_ZIP_DEBUG
3042 	if (page_zip) {
3043 		ut_a(page_zip_validate(page_zip, page, cursor->index));
3044 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
3045 	}
3046 #endif /* UNIV_ZIP_DEBUG */
3047 
3048 	/* At this point, split_rec, move_limit and first_rec may point
3049 	to garbage on the old page. */
3050 
3051 	/* 6. The split and the tree modification is now completed. Decide the
3052 	page where the tuple should be inserted */
3053 	rec_t* rec;
3054 	buf_block_t* const insert_block = insert_left
3055 		? left_block : right_block;
3056 
3057 	if (UNIV_UNLIKELY(!tuple)) {
3058 		rec = NULL;
3059 		goto func_exit;
3060 	}
3061 
3062 	/* 7. Reposition the cursor for insert and try insertion */
3063 	page_cursor = btr_cur_get_page_cur(cursor);
3064 
3065 	page_cur_search(insert_block, cursor->index, tuple, page_cursor);
3066 
3067 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3068 				    offsets, heap, n_ext, mtr);
3069 
3070 #ifdef UNIV_ZIP_DEBUG
3071 	{
3072 		page_t*		insert_page
3073 			= buf_block_get_frame(insert_block);
3074 
3075 		page_zip_des_t*	insert_page_zip
3076 			= buf_block_get_page_zip(insert_block);
3077 
3078 		ut_a(!insert_page_zip
3079 		     || page_zip_validate(insert_page_zip, insert_page,
3080 					  cursor->index));
3081 	}
3082 #endif /* UNIV_ZIP_DEBUG */
3083 
3084 	if (rec != NULL) {
3085 
3086 		goto func_exit;
3087 	}
3088 
3089 	/* 8. If insert did not fit, try page reorganization.
3090 	For compressed pages, page_cur_tuple_insert() will have
3091 	attempted this already. */
3092 
3093 	if (page_cur_get_page_zip(page_cursor)
3094 	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
3095 
3096 		goto insert_failed;
3097 	}
3098 
3099 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
3100 				    offsets, heap, n_ext, mtr);
3101 
3102 	if (rec == NULL) {
3103 		/* The insert did not fit on the page: loop back to the
3104 		start of the function for a new split */
3105 insert_failed:
3106 		/* We play safe and reset the free bits for new_page */
3107 		if (!dict_index_is_clust(cursor->index)
3108 		    && !cursor->index->table->is_temporary()) {
3109 			ibuf_reset_free_bits(new_block);
3110 			ibuf_reset_free_bits(block);
3111 		}
3112 
3113 		n_iterations++;
3114 		ut_ad(n_iterations < 2
3115 		      || buf_block_get_page_zip(insert_block));
3116 		ut_ad(!insert_will_fit);
3117 
3118 		goto func_start;
3119 	}
3120 
3121 func_exit:
3122 	/* Insert fit on the page: update the free bits for the
3123 	left and right pages in the same mtr */
3124 
3125 	if (!dict_index_is_clust(cursor->index)
3126 	    && !cursor->index->table->is_temporary()
3127 	    && page_is_leaf(page)) {
3128 
3129 		ibuf_update_free_bits_for_two_pages_low(
3130 			left_block, right_block, mtr);
3131 	}
3132 
3133 	MONITOR_INC(MONITOR_INDEX_SPLIT);
3134 
3135 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
3136 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
3137 
3138 	ut_ad(tuple || !rec);
3139 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
3140 	return(rec);
3141 }
3142 
3143 /** Remove a page from the level list of pages.
3144 @param[in]	block		page to remove
3145 @param[in]	index		index tree
3146 @param[in,out]	mtr		mini-transaction */
btr_level_list_remove(const buf_block_t & block,const dict_index_t & index,mtr_t * mtr)3147 dberr_t btr_level_list_remove(const buf_block_t& block,
3148                               const dict_index_t& index, mtr_t* mtr)
3149 {
3150 	ut_ad(mtr->memo_contains_flagged(&block, MTR_MEMO_PAGE_X_FIX));
3151 	ut_ad(block.zip_size() == index.table->space->zip_size());
3152 	ut_ad(index.table->space->id == block.page.id().space());
3153 	/* Get the previous and next page numbers of page */
3154 
3155 	const page_t* page = block.frame;
3156 	const uint32_t	prev_page_no = btr_page_get_prev(page);
3157 	const uint32_t	next_page_no = btr_page_get_next(page);
3158 
3159 	/* Update page links of the level */
3160 
3161 	if (prev_page_no != FIL_NULL) {
3162 		buf_block_t*	prev_block = btr_block_get(
3163 			index, prev_page_no, RW_X_LATCH, page_is_leaf(page),
3164 			mtr);
3165 #ifdef UNIV_BTR_DEBUG
3166 		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
3167 		static_assert(FIL_PAGE_NEXT % 4 == 0, "alignment");
3168 		static_assert(FIL_PAGE_OFFSET % 4 == 0, "alignment");
3169 		ut_a(!memcmp_aligned<4>(prev_block->frame + FIL_PAGE_NEXT,
3170 					page + FIL_PAGE_OFFSET, 4));
3171 #endif /* UNIV_BTR_DEBUG */
3172 
3173 		btr_page_set_next(prev_block, next_page_no, mtr);
3174 	}
3175 
3176 	if (next_page_no != FIL_NULL) {
3177 		buf_block_t*	next_block = btr_block_get(
3178 			index, next_page_no, RW_X_LATCH, page_is_leaf(page),
3179 			mtr);
3180 
3181 		if (!next_block) {
3182 			return DB_ERROR;
3183 		}
3184 #ifdef UNIV_BTR_DEBUG
3185 		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
3186 		static_assert(FIL_PAGE_PREV % 4 == 0, "alignment");
3187 		static_assert(FIL_PAGE_OFFSET % 4 == 0, "alignment");
3188 		ut_a(!memcmp_aligned<4>(next_block->frame + FIL_PAGE_PREV,
3189 					page + FIL_PAGE_OFFSET, 4));
3190 #endif /* UNIV_BTR_DEBUG */
3191 
3192 		btr_page_set_prev(next_block, prev_page_no, mtr);
3193 	}
3194 
3195 	return DB_SUCCESS;
3196 }
3197 
3198 /*************************************************************//**
3199 If page is the only on its level, this function moves its records to the
3200 father page, thus reducing the tree height.
3201 @return father block */
3202 UNIV_INTERN
3203 buf_block_t*
btr_lift_page_up(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3204 btr_lift_page_up(
3205 /*=============*/
3206 	dict_index_t*	index,	/*!< in: index tree */
3207 	buf_block_t*	block,	/*!< in: page which is the only on its level;
3208 				must not be empty: use
3209 				btr_discard_only_page_on_level if the last
3210 				record from the page should be removed */
3211 	mtr_t*		mtr)	/*!< in: mtr */
3212 {
3213 	buf_block_t*	father_block;
3214 	ulint		page_level;
3215 	page_zip_des_t*	father_page_zip;
3216 	page_t*		page		= buf_block_get_frame(block);
3217 	ulint		root_page_no;
3218 	buf_block_t*	blocks[BTR_MAX_LEVELS];
3219 	ulint		n_blocks;	/*!< last used index in blocks[] */
3220 	ulint		i;
3221 	bool		lift_father_up;
3222 	buf_block_t*	block_orig	= block;
3223 
3224 	ut_ad(!page_has_siblings(page));
3225 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
3226 
3227 	page_level = btr_page_get_level(page);
3228 	root_page_no = dict_index_get_page(index);
3229 
3230 	{
3231 		btr_cur_t	cursor;
3232 		rec_offs*	offsets	= NULL;
3233 		mem_heap_t*	heap	= mem_heap_create(
3234 			sizeof(*offsets)
3235 			* (REC_OFFS_HEADER_SIZE + 1 + 1
3236 			   + unsigned(index->n_fields)));
3237 		buf_block_t*	b;
3238 
3239 		if (dict_index_is_spatial(index)) {
3240 			offsets = rtr_page_get_father_block(
3241 				NULL, heap, index, block, mtr,
3242 				NULL, &cursor);
3243 		} else {
3244 			offsets = btr_page_get_father_block(offsets, heap,
3245 							    index, block,
3246 							    mtr, &cursor);
3247 		}
3248 		father_block = btr_cur_get_block(&cursor);
3249 		father_page_zip = buf_block_get_page_zip(father_block);
3250 
3251 		n_blocks = 0;
3252 
3253 		/* Store all ancestor pages so we can reset their
3254 		levels later on.  We have to do all the searches on
3255 		the tree now because later on, after we've replaced
3256 		the first level, the tree is in an inconsistent state
3257 		and can not be searched. */
3258 		for (b = father_block;
3259 		     b->page.id().page_no() != root_page_no; ) {
3260 			ut_a(n_blocks < BTR_MAX_LEVELS);
3261 
3262 			if (dict_index_is_spatial(index)) {
3263 				offsets = rtr_page_get_father_block(
3264 					NULL, heap, index, b, mtr,
3265 					NULL, &cursor);
3266 			} else {
3267 				offsets = btr_page_get_father_block(offsets,
3268 								    heap,
3269 								    index, b,
3270 								    mtr,
3271 								    &cursor);
3272 			}
3273 
3274 			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3275 		}
3276 
3277 		lift_father_up = (n_blocks && page_level == 0);
3278 		if (lift_father_up) {
3279 			/* The father page also should be the only on its level (not
3280 			root). We should lift up the father page at first.
3281 			Because the leaf page should be lifted up only for root page.
3282 			The freeing page is based on page_level (==0 or !=0)
3283 			to choose segment. If the page_level is changed ==0 from !=0,
3284 			later freeing of the page doesn't find the page allocation
3285 			to be freed.*/
3286 
3287 			block = father_block;
3288 			page = buf_block_get_frame(block);
3289 			page_level = btr_page_get_level(page);
3290 
3291 			ut_ad(!page_has_siblings(page));
3292 			ut_ad(mtr->memo_contains_flagged(block,
3293 							 MTR_MEMO_PAGE_X_FIX));
3294 
3295 			father_block = blocks[0];
3296 			father_page_zip = buf_block_get_page_zip(father_block);
3297 		}
3298 
3299 		mem_heap_free(heap);
3300 	}
3301 
3302 	btr_search_drop_page_hash_index(block);
3303 
3304 	/* Make the father empty */
3305 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3306 	/* btr_page_empty() is supposed to zero-initialize the field. */
3307 	ut_ad(!page_get_instant(father_block->frame));
3308 
3309 	if (index->is_instant()
3310 	    && father_block->page.id().page_no() == root_page_no) {
3311 		ut_ad(!father_page_zip);
3312 		btr_set_instant(father_block, *index, mtr);
3313 	}
3314 
3315 	page_level++;
3316 
3317 	/* Copy the records to the father page one by one. */
3318 	if (0
3319 #ifdef UNIV_ZIP_COPY
3320 	    || father_page_zip
3321 #endif /* UNIV_ZIP_COPY */
3322 	    || !page_copy_rec_list_end(father_block, block,
3323 				       page_get_infimum_rec(page),
3324 				       index, mtr)) {
3325 		const page_zip_des_t*	page_zip
3326 			= buf_block_get_page_zip(block);
3327 		ut_a(father_page_zip);
3328 		ut_a(page_zip);
3329 
3330 		/* Copy the page byte for byte. */
3331 		page_zip_copy_recs(father_block,
3332 				   page_zip, page, index, mtr);
3333 
3334 		/* Update the lock table and possible hash index. */
3335 
3336 		lock_move_rec_list_end(father_block, block,
3337 				       page_get_infimum_rec(page));
3338 
3339 		/* Also update the predicate locks */
3340 		if (dict_index_is_spatial(index)) {
3341 			lock_prdt_rec_move(father_block, block);
3342 		} else {
3343 			btr_search_move_or_delete_hash_entries(
3344 				father_block, block);
3345 		}
3346 	}
3347 
3348 	if (!dict_table_is_locking_disabled(index->table)) {
3349 		/* Free predicate page locks on the block */
3350 		if (dict_index_is_spatial(index)) {
3351 			lock_mutex_enter();
3352 			lock_prdt_page_free_from_discard(
3353 				block, &lock_sys.prdt_page_hash);
3354 			lock_mutex_exit();
3355 		}
3356 		lock_update_copy_and_discard(father_block, block);
3357 	}
3358 
3359 	/* Go upward to root page, decrementing levels by one. */
3360 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3361 		ut_ad(btr_page_get_level(blocks[i]->frame) == page_level + 1);
3362 		btr_page_set_level(blocks[i], page_level, mtr);
3363 	}
3364 
3365 	if (dict_index_is_spatial(index)) {
3366 		rtr_check_discard_page(index, NULL, block);
3367 	}
3368 
3369 	/* Free the file page */
3370 	btr_page_free(index, block, mtr);
3371 
3372 	/* We play it safe and reset the free bits for the father */
3373 	if (!dict_index_is_clust(index)
3374 	    && !index->table->is_temporary()) {
3375 		ibuf_reset_free_bits(father_block);
3376 	}
3377 	ut_ad(page_validate(father_block->frame, index));
3378 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3379 
3380 	return(lift_father_up ? block_orig : father_block);
3381 }
3382 
3383 /*************************************************************//**
3384 Tries to merge the page first to the left immediate brother if such a
3385 brother exists, and the node pointers to the current page and to the brother
3386 reside on the same page. If the left brother does not satisfy these
3387 conditions, looks at the right brother. If the page is the only one on that
3388 level lifts the records of the page to the father page, thus reducing the
3389 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3390 page. If cursor is on the leaf level, mtr must also hold x-latches to the
3391 brothers, if they exist.
3392 @return TRUE on success */
3393 ibool
btr_compress(btr_cur_t * cursor,ibool adjust,mtr_t * mtr)3394 btr_compress(
3395 /*=========*/
3396 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3397 				or lift; the page must not be empty:
3398 				when deleting records, use btr_discard_page()
3399 				if the page would become empty */
3400 	ibool		adjust,	/*!< in: TRUE if should adjust the
3401 				cursor position even if compression occurs */
3402 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3403 {
3404 	dict_index_t*	index;
3405 	buf_block_t*	merge_block;
3406 	page_t*		merge_page = NULL;
3407 	page_zip_des_t*	merge_page_zip;
3408 	ibool		is_left;
3409 	buf_block_t*	block;
3410 	page_t*		page;
3411 	btr_cur_t	father_cursor;
3412 	mem_heap_t*	heap;
3413 	rec_offs*	offsets;
3414 	ulint		nth_rec = 0; /* remove bogus warning */
3415 	bool		mbr_changed = false;
3416 #ifdef UNIV_DEBUG
3417 	bool		leftmost_child;
3418 #endif
3419 	DBUG_ENTER("btr_compress");
3420 
3421 	block = btr_cur_get_block(cursor);
3422 	page = btr_cur_get_page(cursor);
3423 	index = btr_cur_get_index(cursor);
3424 
3425 	btr_assert_not_corrupted(block, index);
3426 
3427 	ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
3428 					 | MTR_MEMO_SX_LOCK));
3429 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
3430 
3431 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3432 
3433 	const uint32_t left_page_no = btr_page_get_prev(page);
3434 	const uint32_t right_page_no = btr_page_get_next(page);
3435 
3436 #ifdef UNIV_DEBUG
3437 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3438 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3439 			page_rec_get_next(page_get_infimum_rec(page)),
3440 			page_is_comp(page)));
3441 	}
3442 #endif /* UNIV_DEBUG */
3443 
3444 	heap = mem_heap_create(100);
3445 
3446 	if (dict_index_is_spatial(index)) {
3447 		offsets = rtr_page_get_father_block(
3448 			NULL, heap, index, block, mtr, cursor, &father_cursor);
3449 		ut_ad(cursor->page_cur.block->page.id() == block->page.id());
3450 		rec_t*  my_rec = father_cursor.page_cur.rec;
3451 
3452 		ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3453 
3454 		if (page_no != block->page.id().page_no()) {
3455 			ib::info() << "father positioned on page "
3456 				<< page_no << "instead of "
3457 				<< block->page.id().page_no();
3458 			offsets = btr_page_get_father_block(
3459 				NULL, heap, index, block, mtr, &father_cursor);
3460 		}
3461 	} else {
3462 		offsets = btr_page_get_father_block(
3463 			NULL, heap, index, block, mtr, &father_cursor);
3464 	}
3465 
3466 	if (adjust) {
3467 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3468 		ut_ad(nth_rec > 0);
3469 	}
3470 
3471 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3472 		/* The page is the only one on the level, lift the records
3473 		to the father */
3474 
3475 		merge_block = btr_lift_page_up(index, block, mtr);
3476 		goto func_exit;
3477 	}
3478 
3479 	ut_d(leftmost_child =
3480 		left_page_no != FIL_NULL
3481 		&& (page_rec_get_next(
3482 			page_get_infimum_rec(
3483 				btr_cur_get_page(&father_cursor)))
3484 		    == btr_cur_get_rec(&father_cursor)));
3485 
3486 	/* Decide the page to which we try to merge and which will inherit
3487 	the locks */
3488 
3489 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3490 					  &merge_block, mtr);
3491 
3492 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3493 retry:
3494 	if (!is_left
3495 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3496 				       mtr)) {
3497 		if (!merge_block) {
3498 			merge_page = NULL;
3499 		}
3500 		goto err_exit;
3501 	}
3502 
3503 	merge_page = buf_block_get_frame(merge_block);
3504 
3505 #ifdef UNIV_BTR_DEBUG
3506 	if (is_left) {
3507 		ut_a(btr_page_get_next(merge_page)
3508 		     == block->page.id().page_no());
3509 	} else {
3510 		ut_a(btr_page_get_prev(merge_page)
3511 		     == block->page.id().page_no());
3512 	}
3513 #endif /* UNIV_BTR_DEBUG */
3514 
3515 	ut_ad(page_validate(merge_page, index));
3516 
3517 	merge_page_zip = buf_block_get_page_zip(merge_block);
3518 #ifdef UNIV_ZIP_DEBUG
3519 	if (merge_page_zip) {
3520 		const page_zip_des_t*	page_zip
3521 			= buf_block_get_page_zip(block);
3522 		ut_a(page_zip);
3523 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3524 		ut_a(page_zip_validate(page_zip, page, index));
3525 	}
3526 #endif /* UNIV_ZIP_DEBUG */
3527 
3528 	/* Move records to the merge page */
3529 	if (is_left) {
3530 		btr_cur_t	cursor2;
3531 		rtr_mbr_t	new_mbr;
3532 		rec_offs*	offsets2 = NULL;
3533 
3534 		/* For rtree, we need to update father's mbr. */
3535 		if (index->is_spatial()) {
3536 			/* We only support merge pages with the same parent
3537 			page */
3538 			if (!rtr_check_same_block(
3539 				index, &cursor2,
3540 				btr_cur_get_block(&father_cursor),
3541 				merge_block, heap)) {
3542 				is_left = false;
3543 				goto retry;
3544 			}
3545 
3546 			/* Set rtr_info for cursor2, since it is
3547 			necessary in recursive page merge. */
3548 			cursor2.rtr_info = cursor->rtr_info;
3549 			cursor2.tree_height = cursor->tree_height;
3550 
3551 			offsets2 = rec_get_offsets(
3552 				btr_cur_get_rec(&cursor2), index, NULL,
3553 				page_is_leaf(cursor2.page_cur.block->frame)
3554 				? index->n_fields : 0,
3555 				ULINT_UNDEFINED, &heap);
3556 
3557 			/* Check if parent entry needs to be updated */
3558 			mbr_changed = rtr_merge_mbr_changed(
3559 				&cursor2, &father_cursor,
3560 				offsets2, offsets, &new_mbr);
3561 		}
3562 
3563 		rec_t*	orig_pred = page_copy_rec_list_start(
3564 			merge_block, block, page_get_supremum_rec(page),
3565 			index, mtr);
3566 
3567 		if (!orig_pred) {
3568 			goto err_exit;
3569 		}
3570 
3571 		btr_search_drop_page_hash_index(block);
3572 
3573 		/* Remove the page from the level list */
3574 		if (DB_SUCCESS != btr_level_list_remove(*block, *index, mtr)) {
3575 			goto err_exit;
3576 		}
3577 
3578 		if (dict_index_is_spatial(index)) {
3579 			rec_t*  my_rec = father_cursor.page_cur.rec;
3580 
3581 			ulint page_no = btr_node_ptr_get_child_page_no(
3582 						my_rec, offsets);
3583 
3584 			if (page_no != block->page.id().page_no()) {
3585 				ib::fatal() << "father positioned on "
3586 					<< page_no << " instead of "
3587 					<< block->page.id().page_no();
3588 			}
3589 
3590 			if (mbr_changed) {
3591 #ifdef UNIV_DEBUG
3592 				bool	success = rtr_update_mbr_field(
3593 					&cursor2, offsets2, &father_cursor,
3594 					merge_page, &new_mbr, NULL, mtr);
3595 
3596 				ut_ad(success);
3597 #else
3598 				rtr_update_mbr_field(
3599 					&cursor2, offsets2, &father_cursor,
3600 					merge_page, &new_mbr, NULL, mtr);
3601 #endif
3602 			} else {
3603 				rtr_node_ptr_delete(&father_cursor, mtr);
3604 			}
3605 
3606 			/* No GAP lock needs to be worrying about */
3607 			lock_mutex_enter();
3608 			lock_prdt_page_free_from_discard(
3609 				block, &lock_sys.prdt_page_hash);
3610 			lock_rec_free_all_from_discard_page(block);
3611 			lock_mutex_exit();
3612 		} else {
3613 			btr_cur_node_ptr_delete(&father_cursor, mtr);
3614 			if (!dict_table_is_locking_disabled(index->table)) {
3615 				lock_update_merge_left(
3616 					merge_block, orig_pred, block);
3617 			}
3618 		}
3619 
3620 		if (adjust) {
3621 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3622 		}
3623 	} else {
3624 		rec_t*		orig_succ;
3625 		ibool		compressed;
3626 		dberr_t		err;
3627 		btr_cur_t	cursor2;
3628 					/* father cursor pointing to node ptr
3629 					of the right sibling */
3630 #ifdef UNIV_BTR_DEBUG
3631 		byte		fil_page_prev[4];
3632 #endif /* UNIV_BTR_DEBUG */
3633 
3634 		if (dict_index_is_spatial(index)) {
3635 			cursor2.rtr_info = NULL;
3636 
3637 			/* For spatial index, we disallow merge of blocks
3638 			with different parents, since the merge would need
3639 			to update entry (for MBR and Primary key) in the
3640 			parent of block being merged */
3641 			if (!rtr_check_same_block(
3642 				index, &cursor2,
3643 				btr_cur_get_block(&father_cursor),
3644 				merge_block, heap)) {
3645 				goto err_exit;
3646 			}
3647 
3648 			/* Set rtr_info for cursor2, since it is
3649 			necessary in recursive page merge. */
3650 			cursor2.rtr_info = cursor->rtr_info;
3651 			cursor2.tree_height = cursor->tree_height;
3652 		} else {
3653 			btr_page_get_father(index, merge_block, mtr, &cursor2);
3654 		}
3655 
3656 		if (merge_page_zip && left_page_no == FIL_NULL) {
3657 
3658 			/* The function page_zip_compress(), which will be
3659 			invoked by page_copy_rec_list_end() below,
3660 			requires that FIL_PAGE_PREV be FIL_NULL.
3661 			Clear the field, but prepare to restore it. */
3662 			static_assert(FIL_PAGE_PREV % 8 == 0, "alignment");
3663 #ifdef UNIV_BTR_DEBUG
3664 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3665 #endif /* UNIV_BTR_DEBUG */
3666 			compile_time_assert(FIL_NULL == 0xffffffffU);
3667 			memset_aligned<4>(merge_page + FIL_PAGE_PREV, 0xff, 4);
3668 		}
3669 
3670 		orig_succ = page_copy_rec_list_end(merge_block, block,
3671 						   page_get_infimum_rec(page),
3672 						   cursor->index, mtr);
3673 
3674 		if (!orig_succ) {
3675 			ut_a(merge_page_zip);
3676 #ifdef UNIV_BTR_DEBUG
3677 			if (left_page_no == FIL_NULL) {
3678 				/* FIL_PAGE_PREV was restored from
3679 				merge_page_zip. */
3680 				ut_a(!memcmp(fil_page_prev,
3681 					     merge_page + FIL_PAGE_PREV, 4));
3682 			}
3683 #endif /* UNIV_BTR_DEBUG */
3684 			goto err_exit;
3685 		}
3686 
3687 		btr_search_drop_page_hash_index(block);
3688 
3689 #ifdef UNIV_BTR_DEBUG
3690 		if (merge_page_zip && left_page_no == FIL_NULL) {
3691 
3692 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3693 			failure in btr_level_list_remove(), which will set
3694 			the field again to FIL_NULL.  Even though this makes
3695 			merge_page and merge_page_zip inconsistent for a
3696 			split second, it is harmless, because the pages
3697 			are X-latched. */
3698 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3699 		}
3700 #endif /* UNIV_BTR_DEBUG */
3701 
3702 		/* Remove the page from the level list */
3703 		if (DB_SUCCESS != btr_level_list_remove(*block, *index, mtr)) {
3704 			goto err_exit;
3705 		}
3706 
3707 		ut_ad(btr_node_ptr_get_child_page_no(
3708 			      btr_cur_get_rec(&father_cursor), offsets)
3709 		      == block->page.id().page_no());
3710 
3711 		/* Replace the address of the old child node (= page) with the
3712 		address of the merge page to the right */
3713 		btr_node_ptr_set_child_page_no(
3714 			btr_cur_get_block(&father_cursor),
3715 			btr_cur_get_rec(&father_cursor),
3716 			offsets, right_page_no, mtr);
3717 
3718 #ifdef UNIV_DEBUG
3719 		if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3720 			ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3721 				page_rec_get_next(page_get_infimum_rec(
3722 					buf_block_get_frame(merge_block))),
3723 				page_is_comp(page)));
3724 		}
3725 #endif /* UNIV_DEBUG */
3726 
3727 		/* For rtree, we need to update father's mbr. */
3728 		if (index->is_spatial()) {
3729 			rec_offs* offsets2;
3730 			ulint	rec_info;
3731 
3732 			offsets2 = rec_get_offsets(
3733 				btr_cur_get_rec(&cursor2), index, NULL,
3734 				page_is_leaf(cursor2.page_cur.block->frame)
3735 				? index->n_fields : 0,
3736 				ULINT_UNDEFINED, &heap);
3737 
3738 			ut_ad(btr_node_ptr_get_child_page_no(
3739 				btr_cur_get_rec(&cursor2), offsets2)
3740 				== right_page_no);
3741 
3742 			rec_info = rec_get_info_bits(
3743 				btr_cur_get_rec(&father_cursor),
3744 				rec_offs_comp(offsets));
3745 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
3746 				/* When the father node ptr is minimal rec,
3747 				we will keep it and delete the node ptr of
3748 				merge page. */
3749 				rtr_merge_and_update_mbr(&father_cursor,
3750 							 &cursor2,
3751 							 offsets, offsets2,
3752 							 merge_page, mtr);
3753 			} else {
3754 				/* Otherwise, we will keep the node ptr of
3755 				merge page and delete the father node ptr.
3756 				This is for keeping the rec order in upper
3757 				level. */
3758 				rtr_merge_and_update_mbr(&cursor2,
3759 							 &father_cursor,
3760 							 offsets2, offsets,
3761 							 merge_page, mtr);
3762 			}
3763 			lock_mutex_enter();
3764 			lock_prdt_page_free_from_discard(
3765 				block, &lock_sys.prdt_page_hash);
3766 			lock_rec_free_all_from_discard_page(block);
3767 			lock_mutex_exit();
3768 		} else {
3769 
3770 			compressed = btr_cur_pessimistic_delete(&err, TRUE,
3771 								&cursor2,
3772 								BTR_CREATE_FLAG,
3773 								false, mtr);
3774 			ut_a(err == DB_SUCCESS);
3775 
3776 			if (!compressed) {
3777 				btr_cur_compress_if_useful(&cursor2,
3778 							   FALSE,
3779 							   mtr);
3780 			}
3781 
3782 			if (!dict_table_is_locking_disabled(index->table)) {
3783 				lock_update_merge_right(
3784 					merge_block, orig_succ, block);
3785 			}
3786 		}
3787 	}
3788 
3789 	if (!dict_index_is_clust(index)
3790 	    && !index->table->is_temporary()
3791 	    && page_is_leaf(merge_page)) {
3792 		/* Update the free bits of the B-tree page in the
3793 		insert buffer bitmap.  This has to be done in a
3794 		separate mini-transaction that is committed before the
3795 		main mini-transaction.  We cannot update the insert
3796 		buffer bitmap in this mini-transaction, because
3797 		btr_compress() can be invoked recursively without
3798 		committing the mini-transaction in between.  Since
3799 		insert buffer bitmap pages have a lower rank than
3800 		B-tree pages, we must not access other pages in the
3801 		same mini-transaction after accessing an insert buffer
3802 		bitmap page. */
3803 
3804 		/* The free bits in the insert buffer bitmap must
3805 		never exceed the free space on a page.  It is safe to
3806 		decrement or reset the bits in the bitmap in a
3807 		mini-transaction that is committed before the
3808 		mini-transaction that affects the free space. */
3809 
3810 		/* It is unsafe to increment the bits in a separately
3811 		committed mini-transaction, because in crash recovery,
3812 		the free bits could momentarily be set too high. */
3813 
3814 		if (merge_block->zip_size()) {
3815 			/* Because the free bits may be incremented
3816 			and we cannot update the insert buffer bitmap
3817 			in the same mini-transaction, the only safe
3818 			thing we can do here is the pessimistic
3819 			approach: reset the free bits. */
3820 			ibuf_reset_free_bits(merge_block);
3821 		} else {
3822 			/* On uncompressed pages, the free bits will
3823 			never increase here.  Thus, it is safe to
3824 			write the bits accurately in a separate
3825 			mini-transaction. */
3826 			ibuf_update_free_bits_if_full(merge_block,
3827 						      srv_page_size,
3828 						      ULINT_UNDEFINED);
3829 		}
3830 	}
3831 
3832 	ut_ad(page_validate(merge_page, index));
3833 #ifdef UNIV_ZIP_DEBUG
3834 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
3835 						  index));
3836 #endif /* UNIV_ZIP_DEBUG */
3837 
3838 	if (dict_index_is_spatial(index)) {
3839 		rtr_check_discard_page(index, NULL, block);
3840 	}
3841 
3842 	/* Free the file page */
3843 	btr_page_free(index, block, mtr);
3844 
3845 	/* btr_check_node_ptr() needs parent block latched.
3846 	If the merge_block's parent block is not same,
3847 	we cannot use btr_check_node_ptr() */
3848 	ut_ad(leftmost_child
3849 	      || btr_check_node_ptr(index, merge_block, mtr));
3850 func_exit:
3851 	mem_heap_free(heap);
3852 
3853 	if (adjust) {
3854 		ut_ad(nth_rec > 0);
3855 		btr_cur_position(
3856 			index,
3857 			page_rec_get_nth(merge_block->frame, nth_rec),
3858 			merge_block, cursor);
3859 	}
3860 
3861 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
3862 
3863 	DBUG_RETURN(TRUE);
3864 
3865 err_exit:
3866 	/* We play it safe and reset the free bits. */
3867 	if (merge_block && merge_block->zip_size()
3868 	    && page_is_leaf(merge_block->frame)
3869 	    && !dict_index_is_clust(index)) {
3870 
3871 		ibuf_reset_free_bits(merge_block);
3872 	}
3873 
3874 	mem_heap_free(heap);
3875 	DBUG_RETURN(FALSE);
3876 }
3877 
3878 /*************************************************************//**
3879 Discards a page that is the only page on its level.  This will empty
3880 the whole B-tree, leaving just an empty root page.  This function
3881 should almost never be reached, because btr_compress(), which is invoked in
3882 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
3883 ATTRIBUTE_COLD
3884 static
3885 void
btr_discard_only_page_on_level(dict_index_t * index,buf_block_t * block,mtr_t * mtr)3886 btr_discard_only_page_on_level(
3887 /*===========================*/
3888 	dict_index_t*	index,	/*!< in: index tree */
3889 	buf_block_t*	block,	/*!< in: page which is the only on its level */
3890 	mtr_t*		mtr)	/*!< in: mtr */
3891 {
3892 	ulint		page_level = 0;
3893 
3894 	ut_ad(!index->is_dummy);
3895 
3896 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
3897 	const trx_id_t max_trx_id = page_get_max_trx_id(block->frame);
3898 	const rec_t* r = page_rec_get_next(page_get_infimum_rec(block->frame));
3899 	ut_ad(rec_is_metadata(r, *index) == index->is_instant());
3900 
3901 	while (block->page.id().page_no() != dict_index_get_page(index)) {
3902 		btr_cur_t	cursor;
3903 		buf_block_t*	father;
3904 		const page_t*	page	= buf_block_get_frame(block);
3905 
3906 		ut_a(page_get_n_recs(page) == 1);
3907 		ut_a(page_level == btr_page_get_level(page));
3908 		ut_a(!page_has_siblings(page));
3909 		ut_ad(fil_page_index_page_check(page));
3910 		ut_ad(block->page.id().space() == index->table->space->id);
3911 		ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
3912 		btr_search_drop_page_hash_index(block);
3913 
3914 		if (dict_index_is_spatial(index)) {
3915 			/* Check any concurrent search having this page */
3916 			rtr_check_discard_page(index, NULL, block);
3917 			rtr_page_get_father(index, block, mtr, NULL, &cursor);
3918 		} else {
3919 			btr_page_get_father(index, block, mtr, &cursor);
3920 		}
3921 		father = btr_cur_get_block(&cursor);
3922 
3923 		if (!dict_table_is_locking_disabled(index->table)) {
3924 			lock_update_discard(
3925 				father, PAGE_HEAP_NO_SUPREMUM, block);
3926 		}
3927 
3928 		/* Free the file page */
3929 		btr_page_free(index, block, mtr);
3930 
3931 		block = father;
3932 		page_level++;
3933 	}
3934 
3935 	/* block is the root page, which must be empty, except
3936 	for the node pointer to the (now discarded) block(s). */
3937 	ut_ad(!page_has_siblings(block->frame));
3938 
3939 #ifdef UNIV_BTR_DEBUG
3940 	if (!dict_index_is_ibuf(index)) {
3941 		const page_t*	root	= buf_block_get_frame(block);
3942 		const ulint	space	= index->table->space_id;
3943 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
3944 					    + root, space));
3945 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
3946 					    + root, space));
3947 	}
3948 #endif /* UNIV_BTR_DEBUG */
3949 
3950 	mem_heap_t* heap = nullptr;
3951 	const rec_t* rec = nullptr;
3952 	rec_offs* offsets = nullptr;
3953 	if (index->table->instant || index->must_avoid_clear_instant_add()) {
3954 		if (!rec_is_metadata(r, *index)) {
3955 		} else if (!index->table->instant
3956 			   || rec_is_alter_metadata(r, *index)) {
3957 			heap = mem_heap_create(srv_page_size);
3958 			offsets = rec_get_offsets(r, index, nullptr,
3959 						  index->n_core_fields,
3960 						  ULINT_UNDEFINED, &heap);
3961 			rec = rec_copy(mem_heap_alloc(heap,
3962 						      rec_offs_size(offsets)),
3963 				       r, offsets);
3964 			rec_offs_make_valid(rec, index, true, offsets);
3965 		}
3966 	}
3967 
3968 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
3969 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3970 	/* btr_page_empty() is supposed to zero-initialize the field. */
3971 	ut_ad(!page_get_instant(block->frame));
3972 
3973 	if (index->is_primary()) {
3974 		if (rec) {
3975 			page_cur_t cur;
3976 			page_cur_set_before_first(block, &cur);
3977 			DBUG_ASSERT(index->table->instant);
3978 			DBUG_ASSERT(rec_is_alter_metadata(rec, *index));
3979 			btr_set_instant(block, *index, mtr);
3980 			rec = page_cur_insert_rec_low(&cur, index, rec,
3981 						      offsets, mtr);
3982 			ut_ad(rec);
3983 			mem_heap_free(heap);
3984 		} else if (index->is_instant()) {
3985 			index->clear_instant_add();
3986 		}
3987 	} else if (!index->table->is_temporary()) {
3988 		/* We play it safe and reset the free bits for the root */
3989 		ibuf_reset_free_bits(block);
3990 
3991 		ut_a(max_trx_id);
3992 		page_set_max_trx_id(block,
3993 				    buf_block_get_page_zip(block),
3994 				    max_trx_id, mtr);
3995 	}
3996 }
3997 
3998 /*************************************************************//**
3999 Discards a page from a B-tree. This is used to remove the last record from
4000 a B-tree page: the whole page must be removed at the same time. This cannot
4001 be used for the root page, which is allowed to be empty. */
4002 void
btr_discard_page(btr_cur_t * cursor,mtr_t * mtr)4003 btr_discard_page(
4004 /*=============*/
4005 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
4006 				the root page */
4007 	mtr_t*		mtr)	/*!< in: mtr */
4008 {
4009 	dict_index_t*	index;
4010 	buf_block_t*	merge_block;
4011 	buf_block_t*	block;
4012 	btr_cur_t	parent_cursor;
4013 
4014 	block = btr_cur_get_block(cursor);
4015 	index = btr_cur_get_index(cursor);
4016 
4017 	ut_ad(dict_index_get_page(index) != block->page.id().page_no());
4018 
4019 	ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
4020 					 | MTR_MEMO_SX_LOCK));
4021 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
4022 
4023 	MONITOR_INC(MONITOR_INDEX_DISCARD);
4024 
4025 	if (dict_index_is_spatial(index)) {
4026 		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
4027 	} else {
4028 		btr_page_get_father(index, block, mtr, &parent_cursor);
4029 	}
4030 
4031 	/* Decide the page which will inherit the locks */
4032 
4033 	const uint32_t left_page_no = btr_page_get_prev(block->frame);
4034 	const uint32_t right_page_no = btr_page_get_next(block->frame);
4035 
4036 	ut_d(bool parent_is_different = false);
4037 	if (left_page_no != FIL_NULL) {
4038 		merge_block = btr_block_get(*index, left_page_no, RW_X_LATCH,
4039 					    true, mtr);
4040 #ifdef UNIV_BTR_DEBUG
4041 		ut_a(btr_page_get_next(merge_block->frame)
4042 		     == block->page.id().page_no());
4043 #endif /* UNIV_BTR_DEBUG */
4044 		ut_d(parent_is_different =
4045 			(page_rec_get_next(
4046 				page_get_infimum_rec(
4047 					btr_cur_get_page(
4048 						&parent_cursor)))
4049 			 == btr_cur_get_rec(&parent_cursor)));
4050 	} else if (right_page_no != FIL_NULL) {
4051 		merge_block = btr_block_get(*index, right_page_no, RW_X_LATCH,
4052 					    true, mtr);
4053 #ifdef UNIV_BTR_DEBUG
4054 		ut_a(btr_page_get_prev(merge_block->frame)
4055 		     == block->page.id().page_no());
4056 #endif /* UNIV_BTR_DEBUG */
4057 		ut_d(parent_is_different = page_rec_is_supremum(
4058 			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
4059 		if (!page_is_leaf(merge_block->frame)) {
4060 			rec_t* node_ptr = page_rec_get_next(
4061 				page_get_infimum_rec(merge_block->frame));
4062 			ut_ad(page_rec_is_user_rec(node_ptr));
4063 			/* We have to mark the leftmost node pointer as the
4064 			predefined minimum record. */
4065 			btr_set_min_rec_mark<true>(node_ptr, *merge_block,
4066 						   mtr);
4067 		}
4068 	} else {
4069 		btr_discard_only_page_on_level(index, block, mtr);
4070 
4071 		return;
4072 	}
4073 
4074 	ut_a(page_is_comp(merge_block->frame) == page_is_comp(block->frame));
4075 	ut_ad(!memcmp_aligned<2>(&merge_block->frame[PAGE_HEADER + PAGE_LEVEL],
4076 				 &block->frame[PAGE_HEADER + PAGE_LEVEL], 2));
4077 	btr_search_drop_page_hash_index(block);
4078 
4079 	if (dict_index_is_spatial(index)) {
4080 		rtr_node_ptr_delete(&parent_cursor, mtr);
4081 	} else {
4082 		btr_cur_node_ptr_delete(&parent_cursor, mtr);
4083 	}
4084 
4085 	/* Remove the page from the level list */
4086 	ut_a(DB_SUCCESS == btr_level_list_remove(*block, *index, mtr));
4087 
4088 #ifdef UNIV_ZIP_DEBUG
4089 	{
4090 		page_zip_des_t*	merge_page_zip
4091 			= buf_block_get_page_zip(merge_block);
4092 		ut_a(!merge_page_zip
4093 		     || page_zip_validate(merge_page_zip, merge_block->frame,
4094 					  index));
4095 	}
4096 #endif /* UNIV_ZIP_DEBUG */
4097 
4098 	if (!dict_table_is_locking_disabled(index->table)) {
4099 		if (left_page_no != FIL_NULL) {
4100 			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4101 					    block);
4102 		} else {
4103 			lock_update_discard(merge_block,
4104 					    lock_get_min_heap_no(merge_block),
4105 					    block);
4106 		}
4107 	}
4108 
4109 	if (dict_index_is_spatial(index)) {
4110 		rtr_check_discard_page(index, cursor, block);
4111 	}
4112 
4113 	/* Free the file page */
4114 	btr_page_free(index, block, mtr);
4115 
4116 	/* btr_check_node_ptr() needs parent block latched.
4117 	If the merge_block's parent block is not same,
4118 	we cannot use btr_check_node_ptr() */
4119 	ut_ad(parent_is_different
4120 	      || btr_check_node_ptr(index, merge_block, mtr));
4121 
4122 	if (btr_cur_get_block(&parent_cursor)->page.id().page_no()
4123 	    == index->page
4124 	    && !page_has_siblings(btr_cur_get_page(&parent_cursor))
4125 	    && page_get_n_recs(btr_cur_get_page(&parent_cursor)) == 1) {
4126 		btr_lift_page_up(index, merge_block, mtr);
4127 	}
4128 }
4129 
4130 #ifdef UNIV_BTR_PRINT
4131 /*************************************************************//**
4132 Prints size info of a B-tree. */
4133 void
btr_print_size(dict_index_t * index)4134 btr_print_size(
4135 /*===========*/
4136 	dict_index_t*	index)	/*!< in: index tree */
4137 {
4138 	page_t*		root;
4139 	fseg_header_t*	seg;
4140 	mtr_t		mtr;
4141 
4142 	if (dict_index_is_ibuf(index)) {
4143 		fputs("Sorry, cannot print info of an ibuf tree:"
4144 		      " use ibuf functions\n", stderr);
4145 
4146 		return;
4147 	}
4148 
4149 	mtr_start(&mtr);
4150 
4151 	root = btr_root_get(index, &mtr);
4152 
4153 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4154 
4155 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4156 	fseg_print(seg, &mtr);
4157 
4158 	if (!dict_index_is_ibuf(index)) {
4159 
4160 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4161 
4162 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4163 		fseg_print(seg, &mtr);
4164 	}
4165 
4166 	mtr_commit(&mtr);
4167 }
4168 
4169 /************************************************************//**
4170 Prints recursively index tree pages. */
4171 static
4172 void
btr_print_recursive(dict_index_t * index,buf_block_t * block,ulint width,mem_heap_t ** heap,rec_offs ** offsets,mtr_t * mtr)4173 btr_print_recursive(
4174 /*================*/
4175 	dict_index_t*	index,	/*!< in: index tree */
4176 	buf_block_t*	block,	/*!< in: index page */
4177 	ulint		width,	/*!< in: print this many entries from start
4178 				and end */
4179 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4180 	rec_offs**	offsets,/*!< in/out: buffer for rec_get_offsets() */
4181 	mtr_t*		mtr)	/*!< in: mtr */
4182 {
4183 	const page_t*	page	= buf_block_get_frame(block);
4184 	page_cur_t	cursor;
4185 	ulint		n_recs;
4186 	ulint		i	= 0;
4187 	mtr_t		mtr2;
4188 
4189 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_SX_FIX));
4190 
4191 	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page)
4192 		<< " page " << block->page.id;
4193 
4194 	page_print(block, index, width, width);
4195 
4196 	n_recs = page_get_n_recs(page);
4197 
4198 	page_cur_set_before_first(block, &cursor);
4199 	page_cur_move_to_next(&cursor);
4200 
4201 	while (!page_cur_is_after_last(&cursor)) {
4202 
4203 		if (page_is_leaf(page)) {
4204 
4205 			/* If this is the leaf level, do nothing */
4206 
4207 		} else if ((i <= width) || (i >= n_recs - width)) {
4208 
4209 			const rec_t*	node_ptr;
4210 
4211 			mtr_start(&mtr2);
4212 
4213 			node_ptr = page_cur_get_rec(&cursor);
4214 
4215 			*offsets = rec_get_offsets(
4216 				node_ptr, index, *offsets, 0,
4217 				ULINT_UNDEFINED, heap);
4218 			btr_print_recursive(index,
4219 					    btr_node_ptr_get_child(node_ptr,
4220 								   index,
4221 								   *offsets,
4222 								   &mtr2),
4223 					    width, heap, offsets, &mtr2);
4224 			mtr_commit(&mtr2);
4225 		}
4226 
4227 		page_cur_move_to_next(&cursor);
4228 		i++;
4229 	}
4230 }
4231 
4232 /**************************************************************//**
4233 Prints directories and other info of all nodes in the tree. */
4234 void
btr_print_index(dict_index_t * index,ulint width)4235 btr_print_index(
4236 /*============*/
4237 	dict_index_t*	index,	/*!< in: index */
4238 	ulint		width)	/*!< in: print this many entries from start
4239 				and end */
4240 {
4241 	mtr_t		mtr;
4242 	buf_block_t*	root;
4243 	mem_heap_t*	heap	= NULL;
4244 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4245 	rec_offs*	offsets	= offsets_;
4246 	rec_offs_init(offsets_);
4247 
4248 	fputs("--------------------------\n"
4249 	      "INDEX TREE PRINT\n", stderr);
4250 
4251 	mtr_start(&mtr);
4252 
4253 	root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4254 
4255 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4256 	if (heap) {
4257 		mem_heap_free(heap);
4258 	}
4259 
4260 	mtr_commit(&mtr);
4261 
4262 	ut_ad(btr_validate_index(index, 0));
4263 }
4264 #endif /* UNIV_BTR_PRINT */
4265 
4266 #ifdef UNIV_DEBUG
4267 /************************************************************//**
4268 Checks that the node pointer to a page is appropriate.
4269 @return TRUE */
4270 ibool
btr_check_node_ptr(dict_index_t * index,buf_block_t * block,mtr_t * mtr)4271 btr_check_node_ptr(
4272 /*===============*/
4273 	dict_index_t*	index,	/*!< in: index tree */
4274 	buf_block_t*	block,	/*!< in: index page */
4275 	mtr_t*		mtr)	/*!< in: mtr */
4276 {
4277 	mem_heap_t*	heap;
4278 	dtuple_t*	tuple;
4279 	rec_offs*	offsets;
4280 	btr_cur_t	cursor;
4281 	page_t*		page = buf_block_get_frame(block);
4282 
4283 	ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
4284 
4285 	if (dict_index_get_page(index) == block->page.id().page_no()) {
4286 
4287 		return(TRUE);
4288 	}
4289 
4290 	heap = mem_heap_create(256);
4291 
4292 	if (dict_index_is_spatial(index)) {
4293 		offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4294 						    NULL, &cursor);
4295 	} else {
4296 		offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4297 						    &cursor);
4298 	}
4299 
4300 	if (page_is_leaf(page)) {
4301 
4302 		goto func_exit;
4303 	}
4304 
4305 	tuple = dict_index_build_node_ptr(
4306 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4307 		btr_page_get_level(page));
4308 
4309 	/* For spatial index, the MBR in the parent rec could be different
4310 	with that of first rec of child, their relationship should be
4311 	"WITHIN" relationship */
4312 	if (dict_index_is_spatial(index)) {
4313 		ut_a(!cmp_dtuple_rec_with_gis(
4314 			tuple, btr_cur_get_rec(&cursor),
4315 			PAGE_CUR_WITHIN));
4316 	} else {
4317 		ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4318 	}
4319 func_exit:
4320 	mem_heap_free(heap);
4321 
4322 	return(TRUE);
4323 }
4324 #endif /* UNIV_DEBUG */
4325 
4326 /************************************************************//**
4327 Display identification information for a record. */
4328 static
4329 void
btr_index_rec_validate_report(const page_t * page,const rec_t * rec,const dict_index_t * index)4330 btr_index_rec_validate_report(
4331 /*==========================*/
4332 	const page_t*		page,	/*!< in: index page */
4333 	const rec_t*		rec,	/*!< in: index record */
4334 	const dict_index_t*	index)	/*!< in: index */
4335 {
4336 	ib::info() << "Record in index " << index->name
4337 		<< " of table " << index->table->name
4338 		<< ", page " << page_id_t(page_get_space_id(page),
4339 					  page_get_page_no(page))
4340 		<< ", at offset " << page_offset(rec);
4341 }
4342 
4343 /************************************************************//**
4344 Checks the size and number of fields in a record based on the definition of
4345 the index.
4346 @return TRUE if ok */
4347 ibool
btr_index_rec_validate(const rec_t * rec,const dict_index_t * index,ibool dump_on_error)4348 btr_index_rec_validate(
4349 /*===================*/
4350 	const rec_t*		rec,		/*!< in: index record */
4351 	const dict_index_t*	index,		/*!< in: index */
4352 	ibool			dump_on_error)	/*!< in: TRUE if the function
4353 						should print hex dump of record
4354 						and page on error */
4355 {
4356 	ulint		len;
4357 	const page_t*	page;
4358 	mem_heap_t*	heap	= NULL;
4359 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
4360 	rec_offs*	offsets	= offsets_;
4361 	rec_offs_init(offsets_);
4362 
4363 	page = page_align(rec);
4364 
4365 	ut_ad(index->n_core_fields);
4366 
4367 	if (index->is_ibuf()) {
4368 		/* The insert buffer index tree can contain records from any
4369 		other index: we cannot check the number of fields or
4370 		their length */
4371 
4372 		return(TRUE);
4373 	}
4374 
4375 #ifdef VIRTUAL_INDEX_DEBUG
4376 	if (dict_index_has_virtual(index)) {
4377 		fprintf(stderr, "index name is %s\n", index->name());
4378 	}
4379 #endif
4380 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4381 		btr_index_rec_validate_report(page, rec, index);
4382 
4383 		ib::error() << "Compact flag=" << !!page_is_comp(page)
4384 			<< ", should be " << dict_table_is_comp(index->table);
4385 
4386 		return(FALSE);
4387 	}
4388 
4389 	const bool is_alter_metadata = page_is_leaf(page)
4390 		&& !page_has_prev(page)
4391 		&& index->is_primary() && index->table->instant
4392 		&& rec == page_rec_get_next_const(page_get_infimum_rec(page));
4393 
4394 	if (is_alter_metadata
4395 	    && !rec_is_alter_metadata(rec, page_is_comp(page))) {
4396 		btr_index_rec_validate_report(page, rec, index);
4397 
4398 		ib::error() << "First record is not ALTER TABLE metadata";
4399 		return FALSE;
4400 	}
4401 
4402 	if (!page_is_comp(page)) {
4403 		const ulint n_rec_fields = rec_get_n_fields_old(rec);
4404 		if (n_rec_fields == DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD
4405 		    && index->id == DICT_INDEXES_ID) {
4406 			/* A record for older SYS_INDEXES table
4407 			(missing merge_threshold column) is acceptable. */
4408 		} else if (is_alter_metadata) {
4409 			if (n_rec_fields != ulint(index->n_fields) + 1) {
4410 				goto n_field_mismatch;
4411 			}
4412 		} else if (n_rec_fields < index->n_core_fields
4413 			   || n_rec_fields > index->n_fields) {
4414 n_field_mismatch:
4415 			btr_index_rec_validate_report(page, rec, index);
4416 
4417 			ib::error() << "Has " << rec_get_n_fields_old(rec)
4418 				    << " fields, should have "
4419 				    << index->n_core_fields << ".."
4420 				    << index->n_fields;
4421 
4422 			if (dump_on_error) {
4423 				fputs("InnoDB: corrupt record ", stderr);
4424 				rec_print_old(stderr, rec);
4425 				putc('\n', stderr);
4426 			}
4427 			return(FALSE);
4428 		}
4429 	}
4430 
4431 	offsets = rec_get_offsets(rec, index, offsets, page_is_leaf(page)
4432 				  ? index->n_core_fields : 0,
4433 				  ULINT_UNDEFINED, &heap);
4434 	const dict_field_t* field = index->fields;
4435 	ut_ad(rec_offs_n_fields(offsets)
4436 	      == ulint(index->n_fields) + is_alter_metadata);
4437 
4438 	for (unsigned i = 0; i < rec_offs_n_fields(offsets); i++) {
4439 		rec_get_nth_field_offs(offsets, i, &len);
4440 
4441 		ulint fixed_size;
4442 
4443 		if (is_alter_metadata && i == index->first_user_field()) {
4444 			fixed_size = FIELD_REF_SIZE;
4445 			if (len != FIELD_REF_SIZE
4446 			    || !rec_offs_nth_extern(offsets, i)) {
4447 				goto len_mismatch;
4448 			}
4449 
4450 			continue;
4451 		} else {
4452 			fixed_size = dict_col_get_fixed_size(
4453 				field->col, page_is_comp(page));
4454 			if (rec_offs_nth_extern(offsets, i)) {
4455 				const byte* data = rec_get_nth_field(
4456 					rec, offsets, i, &len);
4457 				len -= BTR_EXTERN_FIELD_REF_SIZE;
4458 				ulint extern_len = mach_read_from_4(
4459 					data + len + BTR_EXTERN_LEN + 4);
4460 				if (fixed_size == extern_len) {
4461 					goto next_field;
4462 				}
4463 			}
4464 		}
4465 
4466 		/* Note that if fixed_size != 0, it equals the
4467 		length of a fixed-size column in the clustered index.
4468 		We should adjust it here.
4469 		A prefix index of the column is of fixed, but different
4470 		length.  When fixed_size == 0, prefix_len is the maximum
4471 		length of the prefix index column. */
4472 
4473 		if (len_is_stored(len)
4474 		    && (field->prefix_len
4475 			? len > field->prefix_len
4476 			: (fixed_size && len != fixed_size))) {
4477 len_mismatch:
4478 			btr_index_rec_validate_report(page, rec, index);
4479 			ib::error	error;
4480 
4481 			error << "Field " << i << " len is " << len
4482 				<< ", should be " << fixed_size;
4483 
4484 			if (dump_on_error) {
4485 				error << "; ";
4486 				rec_print(error.m_oss, rec,
4487 					  rec_get_info_bits(
4488 						  rec, rec_offs_comp(offsets)),
4489 					  offsets);
4490 			}
4491 			if (heap) {
4492 				mem_heap_free(heap);
4493 			}
4494 			return(FALSE);
4495 		}
4496 next_field:
4497 		field++;
4498 	}
4499 
4500 #ifdef VIRTUAL_INDEX_DEBUG
4501 	if (dict_index_has_virtual(index)) {
4502 		rec_print_new(stderr, rec, offsets);
4503 	}
4504 #endif
4505 
4506 	if (heap) {
4507 		mem_heap_free(heap);
4508 	}
4509 	return(TRUE);
4510 }
4511 
4512 /************************************************************//**
4513 Checks the size and number of fields in records based on the definition of
4514 the index.
4515 @return TRUE if ok */
4516 static
4517 ibool
btr_index_page_validate(buf_block_t * block,dict_index_t * index)4518 btr_index_page_validate(
4519 /*====================*/
4520 	buf_block_t*	block,	/*!< in: index page */
4521 	dict_index_t*	index)	/*!< in: index */
4522 {
4523 	page_cur_t	cur;
4524 	ibool		ret	= TRUE;
4525 #ifndef DBUG_OFF
4526 	ulint		nth	= 1;
4527 #endif /* !DBUG_OFF */
4528 
4529 	page_cur_set_before_first(block, &cur);
4530 
4531 	/* Directory slot 0 should only contain the infimum record. */
4532 	DBUG_EXECUTE_IF("check_table_rec_next",
4533 			ut_a(page_rec_get_nth_const(
4534 				     page_cur_get_page(&cur), 0)
4535 			     == cur.rec);
4536 			ut_a(page_dir_slot_get_n_owned(
4537 				     page_dir_get_nth_slot(
4538 					     page_cur_get_page(&cur), 0))
4539 			     == 1););
4540 
4541 	page_cur_move_to_next(&cur);
4542 
4543 	for (;;) {
4544 		if (page_cur_is_after_last(&cur)) {
4545 
4546 			break;
4547 		}
4548 
4549 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4550 
4551 			return(FALSE);
4552 		}
4553 
4554 		/* Verify that page_rec_get_nth_const() is correctly
4555 		retrieving each record. */
4556 		DBUG_EXECUTE_IF("check_table_rec_next",
4557 				ut_a(cur.rec == page_rec_get_nth_const(
4558 					     page_cur_get_page(&cur),
4559 					     page_rec_get_n_recs_before(
4560 						     cur.rec)));
4561 				ut_a(nth++ == page_rec_get_n_recs_before(
4562 					     cur.rec)););
4563 
4564 		page_cur_move_to_next(&cur);
4565 	}
4566 
4567 	return(ret);
4568 }
4569 
4570 /************************************************************//**
4571 Report an error on one page of an index tree. */
4572 static
4573 void
btr_validate_report1(dict_index_t * index,ulint level,const buf_block_t * block)4574 btr_validate_report1(
4575 /*=================*/
4576 	dict_index_t*		index,	/*!< in: index */
4577 	ulint			level,	/*!< in: B-tree level */
4578 	const buf_block_t*	block)	/*!< in: index page */
4579 {
4580 	ib::error	error;
4581 	error << "In page " << block->page.id().page_no()
4582 		<< " of index " << index->name
4583 		<< " of table " << index->table->name;
4584 
4585 	if (level > 0) {
4586 		error << ", index tree level " << level;
4587 	}
4588 }
4589 
4590 /************************************************************//**
4591 Report an error on two pages of an index tree. */
4592 static
4593 void
btr_validate_report2(const dict_index_t * index,ulint level,const buf_block_t * block1,const buf_block_t * block2)4594 btr_validate_report2(
4595 /*=================*/
4596 	const dict_index_t*	index,	/*!< in: index */
4597 	ulint			level,	/*!< in: B-tree level */
4598 	const buf_block_t*	block1,	/*!< in: first index page */
4599 	const buf_block_t*	block2)	/*!< in: second index page */
4600 {
4601   ib::error error;
4602   error << "In pages " << block1->page.id()
4603 	<< " and " << block2->page.id() << " of index " << index->name
4604 	<< " of table " << index->table->name;
4605 
4606   if (level)
4607     error << ", index tree level " << level;
4608 }
4609 
4610 /************************************************************//**
4611 Validates index tree level.
4612 @return TRUE if ok */
4613 static
4614 bool
btr_validate_level(dict_index_t * index,const trx_t * trx,ulint level,bool lockout)4615 btr_validate_level(
4616 /*===============*/
4617 	dict_index_t*	index,	/*!< in: index tree */
4618 	const trx_t*	trx,	/*!< in: transaction or NULL */
4619 	ulint		level,	/*!< in: level number */
4620 	bool		lockout)/*!< in: true if X-latch index is intended */
4621 {
4622 	buf_block_t*	block;
4623 	page_t*		page;
4624 	buf_block_t*	right_block = 0; /* remove warning */
4625 	page_t*		right_page = 0; /* remove warning */
4626 	page_t*		father_page;
4627 	btr_cur_t	node_cur;
4628 	btr_cur_t	right_node_cur;
4629 	rec_t*		rec;
4630 	page_cur_t	cursor;
4631 	dtuple_t*	node_ptr_tuple;
4632 	bool		ret	= true;
4633 	mtr_t		mtr;
4634 	mem_heap_t*	heap	= mem_heap_create(256);
4635 	rec_offs*	offsets	= NULL;
4636 	rec_offs*	offsets2= NULL;
4637 #ifdef UNIV_ZIP_DEBUG
4638 	page_zip_des_t*	page_zip;
4639 #endif /* UNIV_ZIP_DEBUG */
4640 	ulint		savepoint = 0;
4641 	ulint		savepoint2 = 0;
4642 	uint32_t	parent_page_no = FIL_NULL;
4643 	uint32_t	parent_right_page_no = FIL_NULL;
4644 	bool		rightmost_child = false;
4645 
4646 	mtr.start();
4647 
4648 	if (!srv_read_only_mode) {
4649 		if (lockout) {
4650 			mtr_x_lock_index(index, &mtr);
4651 		} else {
4652 			mtr_sx_lock_index(index, &mtr);
4653 		}
4654 	}
4655 
4656 	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4657 	page = buf_block_get_frame(block);
4658 
4659 	fil_space_t*		space	= index->table->space;
4660 
4661 	while (level != btr_page_get_level(page)) {
4662 		const rec_t*	node_ptr;
4663 
4664 		if (fseg_page_is_free(space, block->page.id().page_no())) {
4665 
4666 			btr_validate_report1(index, level, block);
4667 
4668 			ib::warn() << "Page is free";
4669 
4670 			ret = false;
4671 		}
4672 
4673 		ut_a(index->table->space_id == block->page.id().space());
4674 		ut_a(block->page.id().space() == page_get_space_id(page));
4675 #ifdef UNIV_ZIP_DEBUG
4676 		page_zip = buf_block_get_page_zip(block);
4677 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4678 #endif /* UNIV_ZIP_DEBUG */
4679 		ut_a(!page_is_leaf(page));
4680 
4681 		page_cur_set_before_first(block, &cursor);
4682 		page_cur_move_to_next(&cursor);
4683 
4684 		node_ptr = page_cur_get_rec(&cursor);
4685 		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
4686 					  ULINT_UNDEFINED, &heap);
4687 
4688 		savepoint2 = mtr_set_savepoint(&mtr);
4689 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4690 		page = buf_block_get_frame(block);
4691 
4692 		/* For R-Tree, since record order might not be the same as
4693 		linked index page in the lower level, we need to travers
4694 		backwards to get the first page rec in this level.
4695 		This is only used for index validation. Spatial index
4696 		does not use such scan for any of its DML or query
4697 		operations  */
4698 		if (dict_index_is_spatial(index)) {
4699 			uint32_t left_page_no = btr_page_get_prev(page);
4700 
4701 			while (left_page_no != FIL_NULL) {
4702 				/* To obey latch order of tree blocks,
4703 				we should release the right_block once to
4704 				obtain lock of the uncle block. */
4705 				mtr_release_block_at_savepoint(
4706 					&mtr, savepoint2, block);
4707 
4708 				savepoint2 = mtr_set_savepoint(&mtr);
4709 				block = btr_block_get(*index, left_page_no,
4710 						      RW_SX_LATCH, false,
4711 						      &mtr);
4712 				page = buf_block_get_frame(block);
4713 				left_page_no = btr_page_get_prev(page);
4714 			}
4715 		}
4716 	}
4717 
4718 	/* Now we are on the desired level. Loop through the pages on that
4719 	level. */
4720 
4721 loop:
4722 	mem_heap_empty(heap);
4723 	offsets = offsets2 = NULL;
4724 	if (!srv_read_only_mode) {
4725 		if (lockout) {
4726 			mtr_x_lock_index(index, &mtr);
4727 		} else {
4728 			mtr_sx_lock_index(index, &mtr);
4729 		}
4730 	}
4731 
4732 #ifdef UNIV_ZIP_DEBUG
4733 	page_zip = buf_block_get_page_zip(block);
4734 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4735 #endif /* UNIV_ZIP_DEBUG */
4736 
4737 	ut_a(block->page.id().space() == index->table->space_id);
4738 
4739 	if (fseg_page_is_free(space, block->page.id().page_no())) {
4740 
4741 		btr_validate_report1(index, level, block);
4742 
4743 		ib::warn() << "Page is marked as free";
4744 		ret = false;
4745 
4746 	} else if (btr_page_get_index_id(page) != index->id) {
4747 
4748 		ib::error() << "Page index id " << btr_page_get_index_id(page)
4749 			<< " != data dictionary index id " << index->id;
4750 
4751 		ret = false;
4752 
4753 	} else if (!page_validate(page, index)) {
4754 
4755 		btr_validate_report1(index, level, block);
4756 		ret = false;
4757 
4758 	} else if (level == 0 && !btr_index_page_validate(block, index)) {
4759 
4760 		/* We are on level 0. Check that the records have the right
4761 		number of fields, and field lengths are right. */
4762 
4763 		ret = false;
4764 	}
4765 
4766 	ut_a(btr_page_get_level(page) == level);
4767 
4768 	uint32_t right_page_no = btr_page_get_next(page);
4769 	uint32_t left_page_no = btr_page_get_prev(page);
4770 
4771 	ut_a(!page_is_empty(page)
4772 	     || (level == 0
4773 		 && page_get_page_no(page) == dict_index_get_page(index)));
4774 
4775 	if (right_page_no != FIL_NULL) {
4776 		const rec_t*	right_rec;
4777 		savepoint = mtr_set_savepoint(&mtr);
4778 
4779 		right_block = btr_block_get(*index, right_page_no, RW_SX_LATCH,
4780 					    !level, &mtr);
4781 		right_page = buf_block_get_frame(right_block);
4782 
4783 		if (btr_page_get_prev(right_page) != page_get_page_no(page)) {
4784 			btr_validate_report2(index, level, block, right_block);
4785 			fputs("InnoDB: broken FIL_PAGE_NEXT"
4786 			      " or FIL_PAGE_PREV links\n", stderr);
4787 
4788 			ret = false;
4789 		}
4790 
4791 		if (page_is_comp(right_page) != page_is_comp(page)) {
4792 			btr_validate_report2(index, level, block, right_block);
4793 			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4794 
4795 			ret = false;
4796 
4797 			goto node_ptr_fails;
4798 		}
4799 
4800 		rec = page_rec_get_prev(page_get_supremum_rec(page));
4801 		right_rec = page_rec_get_next(page_get_infimum_rec(
4802 						      right_page));
4803 		offsets = rec_get_offsets(rec, index, offsets,
4804 					  page_is_leaf(page)
4805 					  ? index->n_core_fields : 0,
4806 					  ULINT_UNDEFINED, &heap);
4807 		offsets2 = rec_get_offsets(right_rec, index, offsets2,
4808 					   page_is_leaf(right_page)
4809 					   ? index->n_core_fields : 0,
4810 					   ULINT_UNDEFINED, &heap);
4811 
4812 		/* For spatial index, we cannot guarantee the key ordering
4813 		across pages, so skip the record compare verification for
4814 		now. Will enhanced in special R-Tree index validation scheme */
4815 		if (!dict_index_is_spatial(index)
4816 		    && cmp_rec_rec(rec, right_rec,
4817 				   offsets, offsets2, index) >= 0) {
4818 
4819 			btr_validate_report2(index, level, block, right_block);
4820 
4821 			fputs("InnoDB: records in wrong order"
4822 			      " on adjacent pages\n", stderr);
4823 
4824 			fputs("InnoDB: record ", stderr);
4825 			rec = page_rec_get_prev(page_get_supremum_rec(page));
4826 			rec_print(stderr, rec, index);
4827 			putc('\n', stderr);
4828 			fputs("InnoDB: record ", stderr);
4829 			rec = page_rec_get_next(
4830 				page_get_infimum_rec(right_page));
4831 			rec_print(stderr, rec, index);
4832 			putc('\n', stderr);
4833 
4834 			ret = false;
4835 		}
4836 	}
4837 
4838 	if (level > 0 && left_page_no == FIL_NULL) {
4839 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
4840 			     page_rec_get_next(page_get_infimum_rec(page)),
4841 			     page_is_comp(page)));
4842 	}
4843 
4844 	/* Similarly skip the father node check for spatial index for now,
4845 	for a couple of reasons:
4846 	1) As mentioned, there is no ordering relationship between records
4847 	in parent level and linked pages in the child level.
4848 	2) Search parent from root is very costly for R-tree.
4849 	We will add special validation mechanism for R-tree later (WL #7520) */
4850 	if (!dict_index_is_spatial(index)
4851 	    && block->page.id().page_no() != dict_index_get_page(index)) {
4852 
4853 		/* Check father node pointers */
4854 		rec_t*	node_ptr;
4855 
4856 		btr_cur_position(
4857 			index, page_rec_get_next(page_get_infimum_rec(page)),
4858 			block, &node_cur);
4859 		offsets = btr_page_get_father_node_ptr_for_validate(
4860 			offsets, heap, &node_cur, &mtr);
4861 
4862 		father_page = btr_cur_get_page(&node_cur);
4863 		node_ptr = btr_cur_get_rec(&node_cur);
4864 
4865 		parent_page_no = page_get_page_no(father_page);
4866 		parent_right_page_no = btr_page_get_next(father_page);
4867 		rightmost_child = page_rec_is_supremum(
4868 					page_rec_get_next(node_ptr));
4869 
4870 		btr_cur_position(
4871 			index,
4872 			page_rec_get_prev(page_get_supremum_rec(page)),
4873 			block, &node_cur);
4874 
4875 		offsets = btr_page_get_father_node_ptr_for_validate(
4876 				offsets, heap, &node_cur, &mtr);
4877 
4878 		if (node_ptr != btr_cur_get_rec(&node_cur)
4879 		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
4880 		    != block->page.id().page_no()) {
4881 
4882 			btr_validate_report1(index, level, block);
4883 
4884 			fputs("InnoDB: node pointer to the page is wrong\n",
4885 			      stderr);
4886 
4887 			fputs("InnoDB: node ptr ", stderr);
4888 			rec_print(stderr, node_ptr, index);
4889 
4890 			rec = btr_cur_get_rec(&node_cur);
4891 			fprintf(stderr, "\n"
4892 				"InnoDB: node ptr child page n:o %u\n",
4893 				btr_node_ptr_get_child_page_no(rec, offsets));
4894 
4895 			fputs("InnoDB: record on page ", stderr);
4896 			rec_print_new(stderr, rec, offsets);
4897 			putc('\n', stderr);
4898 			ret = false;
4899 
4900 			goto node_ptr_fails;
4901 		}
4902 
4903 		if (!page_is_leaf(page)) {
4904 			node_ptr_tuple = dict_index_build_node_ptr(
4905 				index,
4906 				page_rec_get_next(page_get_infimum_rec(page)),
4907 				0, heap, btr_page_get_level(page));
4908 
4909 			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
4910 					   offsets)) {
4911 				const rec_t* first_rec = page_rec_get_next(
4912 					page_get_infimum_rec(page));
4913 
4914 				btr_validate_report1(index, level, block);
4915 
4916 				ib::error() << "Node ptrs differ on levels > 0";
4917 
4918 				fputs("InnoDB: node ptr ",stderr);
4919 				rec_print_new(stderr, node_ptr, offsets);
4920 				fputs("InnoDB: first rec ", stderr);
4921 				rec_print(stderr, first_rec, index);
4922 				putc('\n', stderr);
4923 				ret = false;
4924 
4925 				goto node_ptr_fails;
4926 			}
4927 		}
4928 
4929 		if (left_page_no == FIL_NULL) {
4930 			ut_a(node_ptr == page_rec_get_next(
4931 				     page_get_infimum_rec(father_page)));
4932 			ut_a(!page_has_prev(father_page));
4933 		}
4934 
4935 		if (right_page_no == FIL_NULL) {
4936 			ut_a(node_ptr == page_rec_get_prev(
4937 				     page_get_supremum_rec(father_page)));
4938 			ut_a(!page_has_next(father_page));
4939 		} else {
4940 			const rec_t*	right_node_ptr;
4941 
4942 			right_node_ptr = page_rec_get_next(node_ptr);
4943 
4944 			if (!lockout && rightmost_child) {
4945 
4946 				/* To obey latch order of tree blocks,
4947 				we should release the right_block once to
4948 				obtain lock of the uncle block. */
4949 				mtr_release_block_at_savepoint(
4950 					&mtr, savepoint, right_block);
4951 
4952 				if (parent_right_page_no != FIL_NULL) {
4953 					btr_block_get(*index,
4954 						      parent_right_page_no,
4955 						      RW_SX_LATCH, false,
4956 						      &mtr);
4957 				}
4958 
4959 				right_block = btr_block_get(*index,
4960 							    right_page_no,
4961 							    RW_SX_LATCH,
4962 							    !level, &mtr);
4963 			}
4964 
4965 			btr_cur_position(
4966 				index, page_rec_get_next(
4967 					page_get_infimum_rec(
4968 						buf_block_get_frame(
4969 							right_block))),
4970 				right_block, &right_node_cur);
4971 
4972 			offsets = btr_page_get_father_node_ptr_for_validate(
4973 					offsets, heap, &right_node_cur, &mtr);
4974 
4975 			if (right_node_ptr
4976 			    != page_get_supremum_rec(father_page)) {
4977 
4978 				if (btr_cur_get_rec(&right_node_cur)
4979 				    != right_node_ptr) {
4980 					ret = false;
4981 					fputs("InnoDB: node pointer to"
4982 					      " the right page is wrong\n",
4983 					      stderr);
4984 
4985 					btr_validate_report1(index, level,
4986 							     block);
4987 				}
4988 			} else {
4989 				page_t*	right_father_page
4990 					= btr_cur_get_page(&right_node_cur);
4991 
4992 				if (btr_cur_get_rec(&right_node_cur)
4993 				    != page_rec_get_next(
4994 					    page_get_infimum_rec(
4995 						    right_father_page))) {
4996 					ret = false;
4997 					fputs("InnoDB: node pointer 2 to"
4998 					      " the right page is wrong\n",
4999 					      stderr);
5000 
5001 					btr_validate_report1(index, level,
5002 							     block);
5003 				}
5004 
5005 				if (page_get_page_no(right_father_page)
5006 				    != btr_page_get_next(father_page)) {
5007 
5008 					ret = false;
5009 					fputs("InnoDB: node pointer 3 to"
5010 					      " the right page is wrong\n",
5011 					      stderr);
5012 
5013 					btr_validate_report1(index, level,
5014 							     block);
5015 				}
5016 			}
5017 		}
5018 	}
5019 
5020 node_ptr_fails:
5021 	/* Commit the mini-transaction to release the latch on 'page'.
5022 	Re-acquire the latch on right_page, which will become 'page'
5023 	on the next loop.  The page has already been checked. */
5024 	mtr.commit();
5025 
5026 	if (trx_is_interrupted(trx)) {
5027 		/* On interrupt, return the current status. */
5028 	} else if (right_page_no != FIL_NULL) {
5029 
5030 		mtr.start();
5031 
5032 		if (!lockout) {
5033 			if (rightmost_child) {
5034 				if (parent_right_page_no != FIL_NULL) {
5035 					btr_block_get(*index,
5036 						      parent_right_page_no,
5037 						      RW_SX_LATCH, false,
5038 						      &mtr);
5039 				}
5040 			} else if (parent_page_no != FIL_NULL) {
5041 				btr_block_get(*index, parent_page_no,
5042 					      RW_SX_LATCH, false, &mtr);
5043 			}
5044 		}
5045 
5046 		block = btr_block_get(*index, right_page_no, RW_SX_LATCH,
5047 				      !level, &mtr);
5048 		page = buf_block_get_frame(block);
5049 
5050 		goto loop;
5051 	}
5052 
5053 	mem_heap_free(heap);
5054 
5055 	return(ret);
5056 }
5057 
5058 /**************************************************************//**
5059 Checks the consistency of an index tree.
5060 @return	DB_SUCCESS if ok, error code if not */
5061 dberr_t
btr_validate_index(dict_index_t * index,const trx_t * trx)5062 btr_validate_index(
5063 /*===============*/
5064 	dict_index_t*	index,	/*!< in: index */
5065 	const trx_t*	trx)	/*!< in: transaction or NULL */
5066 {
5067 	dberr_t err = DB_SUCCESS;
5068 	bool lockout = dict_index_is_spatial(index);
5069 
5070 	/* Full Text index are implemented by auxiliary tables,
5071 	not the B-tree */
5072 	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
5073 		return(err);
5074 	}
5075 
5076 	mtr_t		mtr;
5077 
5078 	mtr_start(&mtr);
5079 
5080 	if (!srv_read_only_mode) {
5081 		if (lockout) {
5082 			mtr_x_lock_index(index, &mtr);
5083 		} else {
5084 			mtr_sx_lock_index(index, &mtr);
5085 		}
5086 	}
5087 
5088 	page_t*	root = btr_root_get(index, &mtr);
5089 
5090 	if (!root) {
5091 		mtr_commit(&mtr);
5092 		return DB_CORRUPTION;
5093 	}
5094 
5095 	ulint	n = btr_page_get_level(root);
5096 
5097 	btr_validate_index_running++;
5098 	for (ulint i = 0; i <= n; ++i) {
5099 
5100 		if (!btr_validate_level(index, trx, n - i, lockout)) {
5101 			err = DB_CORRUPTION;
5102 		}
5103 	}
5104 
5105 	mtr_commit(&mtr);
5106 	/* In theory we need release barrier here, so that
5107 	btr_validate_index_running decrement is guaranteed to
5108 	happen after latches are released.
5109 
5110 	Original code issued SEQ_CST on update and non-atomic
5111 	access on load. Which means it had broken synchronisation
5112 	as well. */
5113 	btr_validate_index_running--;
5114 
5115 	return(err);
5116 }
5117 
5118 /**************************************************************//**
5119 Checks if the page in the cursor can be merged with given page.
5120 If necessary, re-organize the merge_page.
5121 @return	true if possible to merge. */
5122 static
5123 bool
btr_can_merge_with_page(btr_cur_t * cursor,uint32_t page_no,buf_block_t ** merge_block,mtr_t * mtr)5124 btr_can_merge_with_page(
5125 /*====================*/
5126 	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
5127 	uint32_t	page_no,	/*!< in: a sibling page */
5128 	buf_block_t**	merge_block,	/*!< out: the merge block */
5129 	mtr_t*		mtr)		/*!< in: mini-transaction */
5130 {
5131 	dict_index_t*	index;
5132 	page_t*		page;
5133 	ulint		n_recs;
5134 	ulint		data_size;
5135 	ulint		max_ins_size_reorg;
5136 	ulint		max_ins_size;
5137 	buf_block_t*	mblock;
5138 	page_t*		mpage;
5139 	DBUG_ENTER("btr_can_merge_with_page");
5140 
5141 	if (page_no == FIL_NULL) {
5142 		*merge_block = NULL;
5143 		DBUG_RETURN(false);
5144 	}
5145 
5146 	index = btr_cur_get_index(cursor);
5147 	page = btr_cur_get_page(cursor);
5148 
5149 	mblock = btr_block_get(*index, page_no, RW_X_LATCH, page_is_leaf(page),
5150 			       mtr);
5151 	mpage = buf_block_get_frame(mblock);
5152 
5153 	n_recs = page_get_n_recs(page);
5154 	data_size = page_get_data_size(page);
5155 
5156 	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
5157 		mpage, n_recs);
5158 
5159 	if (data_size > max_ins_size_reorg) {
5160 		goto error;
5161 	}
5162 
5163 	/* If compression padding tells us that merging will result in
5164 	too packed up page i.e.: which is likely to cause compression
5165 	failure then don't merge the pages. */
5166 	if (mblock->page.zip.data && page_is_leaf(mpage)
5167 	    && (page_get_data_size(mpage) + data_size
5168 		>= dict_index_zip_pad_optimal_page_size(index))) {
5169 
5170 		goto error;
5171 	}
5172 
5173 	max_ins_size = page_get_max_insert_size(mpage, n_recs);
5174 
5175 	if (data_size > max_ins_size) {
5176 		/* We have to reorganize mpage */
5177 		if (!btr_page_reorganize_block(page_zip_level, mblock, index,
5178 					       mtr)) {
5179 			goto error;
5180 		}
5181 
5182 		max_ins_size = page_get_max_insert_size(mpage, n_recs);
5183 
5184 		ut_ad(page_validate(mpage, index));
5185 		ut_ad(max_ins_size == max_ins_size_reorg);
5186 
5187 		if (data_size > max_ins_size) {
5188 
5189 			/* Add fault tolerance, though this should
5190 			never happen */
5191 
5192 			goto error;
5193 		}
5194 	}
5195 
5196 	*merge_block = mblock;
5197 	DBUG_RETURN(true);
5198 
5199 error:
5200 	*merge_block = NULL;
5201 	DBUG_RETURN(false);
5202 }
5203