/*****************************************************************************

Copyright (c) 1994, 2021, Oracle and/or its affiliates.
Copyright (c) 2012, Facebook Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0btr.cc
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/

#include "btr0btr.h"

#ifdef UNIV_NONINL
#include "btr0btr.ic"
#endif

#include "fsp0sysspace.h"
#include "page0page.h"
#include "page0zip.h"
#include "gis0rtree.h"

#ifndef UNIV_HOTBACKUP
#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
#include "srv0mon.h"
#include "gis0geo.h"
#include "ut0new.h"
#include "dict0boot.h"

extern my_bool srv_immediate_scrub_data_uncompressed;

/**************************************************************//**
Checks if the page in the cursor can be merged with the given page.
If necessary, re-organizes the merge_page.
@return	true if possible to merge. */
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr);		/*!< in: mini-transaction */

#endif /* UNIV_HOTBACKUP */

/**************************************************************//**
Report that an index page is corrupted. */
void
btr_corruption_report(
/*==================*/
	const buf_block_t*	block,	/*!< in: corrupted block */
	const dict_index_t*	index)	/*!< in: index tree */
{
	ib::error()
		<< "Flag mismatch in page " << block->page.id
		<< " index " << index->name
		<< " of table " << index->table->name;
}

#ifndef UNIV_HOTBACKUP
/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search: those pages are only
buffer-fixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the newly allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we can store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
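
/*
Illustration (a simplified sketch, not normative): a two-level tree in
which the level-1 node pointer prefixes are the minimum record and P1,
and the child pages hold the records r with r < P1 and r >= P1:

	level 1:  [ (min rec -> page 4)  (P1 -> page 5) ]
	                   |                   |
	level 0:  [ records r < P1 ]    [ records r >= P1 ]

The leftmost node pointer carries the predefined minimum record as its
prefix, so every search key compares >= it.
*/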

#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
@return TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	if (UNIV_UNLIKELY(srv_pass_corrupt_table != 0)) {
		return (mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space)
			&& (offset >= FIL_PAGE_DATA)
			&& (offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
	}

	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */

/**************************************************************//**
Gets the root node of a tree and x- or s-latches it.
@return root page, x- or s-latched */
buf_block_t*
btr_root_block_get(
/*===============*/
	const dict_index_t*	index,	/*!< in: index tree */
	ulint			mode,	/*!< in: either RW_S_LATCH
					or RW_X_LATCH */
	mtr_t*			mtr)	/*!< in: mtr */
{
	const ulint		space = dict_index_get_space(index);
	const page_id_t		page_id(space, dict_index_get_page(index));
	const page_size_t	page_size(dict_table_page_size(index->table));

	buf_block_t*	block = btr_block_get(page_id, page_size, mode,
					      index, mtr);

	if (!block && index && index->table && !index->table->is_readable()) {

		ib::warn() << "Table in tablespace is encrypted but the"
			" encryption service or the used key_id is not"
			" available. Can't continue reading the table.";
		return(NULL);
	}

	SRV_CORRUPT_TABLE_CHECK(block, return(0););

	btr_assert_not_corrupted(block, index);
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		if (UNIV_UNLIKELY(srv_pass_corrupt_table != 0)) {
			if (!btr_root_fseg_validate(FIL_PAGE_DATA
						    + PAGE_BTR_SEG_LEAF
						    + root, space)) {
				return(NULL);
			}
			if (!btr_root_fseg_validate(FIL_PAGE_DATA
						    + PAGE_BTR_SEG_TOP
						    + root, space)) {
				return(NULL);
			}
			return(block);
		}
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
#endif /* UNIV_BTR_DEBUG */

	return(block);
}
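
/* A minimal usage sketch (hypothetical caller, not part of this file):
the root must be latched inside a mini-transaction, and the latch is
released at mtr_commit().

	mtr_t	mtr;
	mtr_start(&mtr);
	buf_block_t*	root = btr_root_block_get(index, RW_S_LATCH, &mtr);
	if (root != NULL) {
		page_t*	root_page = buf_block_get_frame(root);
		// read-only access to the root page goes here
	}
	mtr_commit(&mtr);
*/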

/**************************************************************//**
Gets the root node of a tree and sx-latches it for segment access.
@return root page, sx-latched */
page_t*
btr_root_get(
/*=========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*			mtr)	/*!< in: mtr */
{
	/* Intended to be used for segment list access.
	An SX latch does not block other threads from reading user data,
	but it does block segment list access by others. */

	buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH,
					       mtr);

	if (root && root->page.encrypted == true) {
		root = NULL;
	}

	return(root ? buf_block_get_frame(root) : NULL);
}

/**************************************************************//**
Gets the height of the B-tree (the level of the root, when the leaf
level is assumed to be 0). The caller must hold an S or X latch on
the index.
@return tree height (level of the root) */
ulint
btr_height_get(
/*===========*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	ulint		height = 0;
	buf_block_t*	root_block;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_S_LOCK
					   | MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(index->table));

	/* S latches the page */
	root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

	if (root_block) {
		height = btr_page_get_level(buf_block_get_frame(root_block),
					    mtr);

		/* Release the S latch on the root page. */
		mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

		ut_d(sync_check_unlock(&root_block->lock));
	}

	return(height);
}

/**************************************************************//**
Checks a file segment header within a B-tree root page and updates
the segment header space id.
@return TRUE if valid */
static
bool
btr_root_fseg_adjust_on_import(
/*===========================*/
	fseg_header_t*	seg_header,	/*!< in/out: segment header */
	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
					or NULL */
	ulint		space,		/*!< in: tablespace identifier */
	mtr_t*		mtr)		/*!< in/out: mini-transaction */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	if (offset < FIL_PAGE_DATA
	    || offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) {

		return(FALSE);

	} else if (page_zip) {
		mach_write_to_4(seg_header + FSEG_HDR_SPACE, space);
		page_zip_write_header(page_zip, seg_header + FSEG_HDR_SPACE,
				      4, mtr);
	} else {
		mlog_write_ulint(seg_header + FSEG_HDR_SPACE,
				 space, MLOG_4BYTES, mtr);
	}

	return(TRUE);
}

/**************************************************************//**
Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
@return error code, or DB_SUCCESS */
dberr_t
btr_root_adjust_on_import(
/*======================*/
	const dict_index_t*	index)	/*!< in: index tree */
{
	dberr_t			err;
	mtr_t			mtr;
	page_t*			page;
	buf_block_t*		block;
	page_zip_des_t*		page_zip;
	dict_table_t*		table = index->table;
	const ulint		space_id = dict_index_get_space(index);
	const page_id_t		page_id(space_id, dict_index_get_page(index));
	const page_size_t	page_size(dict_table_page_size(table));

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
			return(DB_CORRUPTION););

	mtr_start(&mtr);

	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

	block = btr_block_get(page_id, page_size, RW_X_LATCH, index, &mtr);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	if (!page_is_root(page)) {

		err = DB_CORRUPTION;

	} else if (dict_index_is_clust(index)) {
		bool	page_is_compact_format;

		page_is_compact_format = page_is_comp(page) > 0;

		/* Check if the page format and table format agree. */
		if (page_is_compact_format != dict_table_is_comp(table)) {
			err = DB_CORRUPTION;
		} else {
			/* Check that the table flags and the tablespace
			flags match. */
			ulint	flags = dict_tf_to_fsp_flags(
				table->flags,
				false,
				dict_table_is_encrypted(table));
			ulint	fsp_flags = fil_space_get_flags(table->space);

			err = fsp_flags_are_equal(flags, fsp_flags)
			      ? DB_SUCCESS : DB_CORRUPTION;
		}
	} else {
		err = DB_SUCCESS;
	}

	/* Check and adjust the file segment headers, if all OK so far. */
	if (err == DB_SUCCESS
	    && (!btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
			+ page, page_zip, space_id, &mtr)
		|| !btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
			+ page, page_zip, space_id, &mtr))) {

		err = DB_CORRUPTION;
	}

	mtr_commit(&mtr);

	return(err);
}

/**************************************************************//**
Creates a new index page (not the root, and also not
used in page reorganization).  @see btr_page_empty(). */
void
btr_page_create(
/*============*/
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		page = buf_block_get_frame(block);

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

	if (page_zip) {
		page_create_zip(block, index, level, 0, NULL, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table),
			    dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

	/* For Spatial Index, initialize the Split Sequence Number */
	if (dict_index_is_spatial(index)) {
		page_set_ssn_id(block, page_zip, 0, mtr);
	}

	btr_page_set_index_id(page, page_zip, index->id, mtr);
}

/**************************************************************//**
Allocates a new file page to be used in an ibuf tree. Takes the page from
the free list of the tree, which must contain pages!
@return new allocated block, x-latched */
static
buf_block_t*
btr_page_alloc_for_ibuf(
/*====================*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
	buf_block_t*	new_block;

	root = btr_root_get(index, mtr);

	node_addr = flst_get_first(root + PAGE_HEADER
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
	ut_a(node_addr.page != FIL_NULL);

	new_block = buf_page_get(
		page_id_t(dict_index_get_space(index), node_addr.page),
		dict_table_page_size(index->table),
		RW_X_LATCH, mtr);

	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));

	return(new_block);
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
buf_block_t*
btr_page_alloc_low(
/*===============*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mtr or another
					mini-transaction in which the
					page should be initialized.
					If init_mtr != mtr, but the page
					is already X-latched in mtr, do
					not initialize the page. */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	root = btr_root_get(index, mtr);

	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */

	return(fseg_alloc_free_page_general(
		       seg_header, hint_page_no, file_direction,
		       TRUE, mtr, init_mtr));
}
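
/* Note on the two-mtr pattern above (an illustrative sketch): a caller
such as a page split may allocate the new page in the mini-transaction
that protects the tree structure while initializing it in another one:

	new_block = btr_page_alloc(index, hint_page_no, FSP_UP,
				   level, mtr, init_mtr);

If init_mtr != mtr and the page was already X-latched in mtr (because it
was freed earlier within mtr), the block is returned uninitialized; see
the @retval comments above. */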

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
buf_block_t*
btr_page_alloc(
/*===========*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
					for x-latching and initializing
					the page */
{
	buf_block_t*	new_block;

	if (dict_index_is_ibuf(index)) {

		return(btr_page_alloc_for_ibuf(index, mtr));
	}

	new_block = btr_page_alloc_low(
		index, hint_page_no, file_direction, level, mtr, init_mtr);

	if (new_block) {
		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
	}

	return(new_block);
}

/**************************************************************//**
Gets the number of pages in a B-tree.
@return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint
btr_get_size(
/*=========*/
	dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n;
	ulint		dummy;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
				   MTR_MEMO_S_LOCK)
	      || dict_table_is_intrinsic(index->table));

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);

	if (!root && !index->table->is_readable()) {
		return(ULINT_UNDEFINED);
	}

	SRV_CORRUPT_TABLE_CHECK(root,
	{
		mtr_commit(mtr);
		return(ULINT_UNDEFINED);
	});

	if (flag == BTR_N_LEAF_PAGES) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		fseg_n_reserved_pages(seg_header, &n, mtr);

	} else if (flag == BTR_TOTAL_SIZE) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

		n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
	} else {
		ut_error;
	}

	return(n);
}
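
/* Example (hypothetical caller): counting the leaf pages of an index,
assuming mtr already holds an S-latch on the index lock:

	ulint	n_leaf = btr_get_size(index, BTR_N_LEAF_PAGES, &mtr);

	if (n_leaf == ULINT_UNDEFINED) {
		// the index is unavailable, e.g. online DDL in progress
	}
*/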

/**************************************************************//**
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		root;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	root = btr_root_get(index, mtr);

	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
}

/**************************************************************//**
Frees a file page used in an index tree. Can also be used to free
(BLOB) external storage pages. */
void
btr_page_free_low(
/*==============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	ulint		level,	/*!< in: page level (ULINT_UNDEFINED=BLOB) */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	/* The page gets invalid for optimistic searches: increment the frame
	modify clock */

	buf_block_modify_clock_inc(block);

	const bool scrub = srv_immediate_scrub_data_uncompressed;

	if (scrub) {
		/* The MariaDB code says that this path can only be used for
		BLOBs, and that only parts of normal pages can be scrubbed;
		but with this approach some data remains on the page, and
		this does not cause any test failures. */
		page_t*	page = buf_block_get_frame(block);
		memset(page + PAGE_HEADER, 0,
		       srv_page_size - PAGE_HEADER);
	}
	if (dict_index_is_ibuf(index)) {

		btr_page_free_for_ibuf(index, block, mtr);

		return;
	}

	root = btr_root_get(index, mtr);

	if (level == 0 || level == ULINT_UNDEFINED) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

#ifdef UNIV_GIS_DEBUG
	if (dict_index_is_spatial(index)) {
		fprintf(stderr, "GIS_DIAG: Freed  %ld\n",
			(long) block->page.id.page_no());
	}
#endif

	if (scrub) {
		/* Reset the page type so that the scrub thread will not
		try to scrub the page again. */
		mlog_write_ulint(buf_block_get_frame(block) + FIL_PAGE_TYPE,
				 FIL_PAGE_TYPE_ALLOCATED, MLOG_2BYTES, mtr);
	}

	fseg_free_page(seg_header,
		       block->page.id.space(),
		       block->page.id.page_no(),
		       level != ULINT_UNDEFINED, mtr);

	/* The page was marked free in the allocation bitmap, but it
	should remain buffer-fixed until mtr_commit(mtr) or until it
	is explicitly freed from the mini-transaction. */
	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	/* TODO: Discard any operations on the page from the redo log
	and remove the block from the flush list and the buffer pool.
	This would free up buffer pool earlier and reduce writes to
	both the tablespace and the redo log. */
}

/**************************************************************//**
Frees a file page used in an index tree. NOTE: cannot free field external
storage pages because the page must contain info on its level. */
void
btr_page_free(
/*==========*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	const page_t*	page	= buf_block_get_frame(block);
	ulint		level	= btr_page_get_level(page, mtr);

	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(level != ULINT_UNDEFINED);
	btr_page_free_low(index, block, level, mtr);
}

/**************************************************************//**
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
				part will be updated, or NULL */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(!page_is_leaf(page_align(rec)));
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

	/* The child address is in the last field */
	field = rec_get_nth_field(rec, offsets,
				  rec_offs_n_fields(offsets) - 1, &len);

	ut_ad(len == REC_NODE_PTR_SIZE);

	if (page_zip) {
		page_zip_write_node_ptr(page_zip, rec,
					rec_offs_data_size(offsets),
					page_no, mtr);
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
	}
}

/************************************************************//**
Returns the child page of a node pointer and sx-latches it.
@return child page, sx-latched */
static
buf_block_t*
btr_node_ptr_get_child(
/*===================*/
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(rec_offs_validate(node_ptr, index, offsets));

	const page_id_t	page_id(
		page_get_space_id(page_align(node_ptr)),
		btr_node_ptr_get_child_page_no(node_ptr, offsets));

	return(btr_block_get(page_id, dict_table_page_size(index->table),
			     RW_SX_LATCH, index, mtr));
}

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an sx-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
ulint*
btr_page_get_father_node_ptr_func(
/*==============================*/
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
				out: cursor on node pointer record,
				its page x-latched */
	ulint		latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
				or BTR_CONT_SEARCH_TREE */
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dtuple_t*	tuple;
	rec_t*		user_rec;
	rec_t*		node_ptr;
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
	      || latch_mode == BTR_CONT_SEARCH_TREE);

	page_no = btr_cur_get_block(cursor)->page.id.page_no();
	index = btr_cur_get_index(cursor);
	ut_ad(!dict_index_is_spatial(index));

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(index->table));

	ut_ad(dict_index_get_page(index) != page_no);

	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);

	user_rec = btr_cur_get_rec(cursor);
	ut_a(page_rec_is_user_rec(user_rec));

	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
	if (dict_table_is_intrinsic(index->table)) {
		btr_cur_search_to_nth_level_with_no_latch(
			index, level + 1, tuple, PAGE_CUR_LE, cursor,
			file, line, mtr);
	} else {
		btr_cur_search_to_nth_level(
			index, level + 1, tuple,
			PAGE_CUR_LE, latch_mode, cursor, 0,
			file, line, mtr);
	}

	node_ptr = btr_cur_get_rec(cursor);
	ut_ad(!page_rec_is_comp(node_ptr)
	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
	offsets = rec_get_offsets(node_ptr, index, offsets,
				  ULINT_UNDEFINED, &heap);

	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
		rec_t*	print_rec;

		ib::error()
			<< "Corruption of an index tree: table "
			<< index->table->name
			<< " index " << index->name
			<< ", father ptr page no "
			<< btr_node_ptr_get_child_page_no(node_ptr, offsets)
			<< ", child page no " << page_no;

		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
		offsets = rec_get_offsets(print_rec, index,
					  offsets, ULINT_UNDEFINED, &heap);
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(node_ptr, offsets);

		ib::fatal()
			<< "You should dump + drop + reimport the table to"
			<< " fix the corruption. If the crash happens at"
			<< " database startup, " << FORCE_RECOVERY_MSG
			<< " Then dump + drop + reimport.";
	}

	return(offsets);
}

#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)

#define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)
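
/* The two wrappers above differ only in the latch mode passed to
btr_page_get_father_node_ptr_func(): BTR_CONT_MODIFY_TREE is for callers
that restructure the tree, BTR_CONT_SEARCH_TREE for read-only validation.
__FILE__ and __LINE__ are forwarded so that latching diagnostics can
point at the original call site. */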

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an x-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
ulint*
btr_page_get_father_block(
/*======================*/
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/************************************************************//**
Seeks to the upper level node pointer to a page.
It is assumed that mtr holds an x-latch on the tree. */
static
void
btr_page_get_father(
/*================*/
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
}

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static
void
btr_free_root(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	fseg_header_t*	header;

	ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr->is_named_space(block->page.id.space()));

	SRV_CORRUPT_TABLE_CHECK(block, return;);

	btr_search_drop_page_hash_index(block);

	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	while (!fseg_free_step(header, true, mtr)) {
		/* Free the entire segment in small steps. */
	}
}

/** PAGE_INDEX_ID value for freed index B-trees */
static const index_id_t	BTR_FREED_INDEX_ID = 0;

/** Invalidate an index root page so that btr_free_root_check()
will not find it.
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static
void
btr_free_root_invalidate(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	ut_ad(page_is_root(block->frame));

	btr_page_set_index_id(
		buf_block_get_frame(block),
		buf_block_get_page_zip(block),
		BTR_FREED_INDEX_ID, mtr);
}

/** Prepare to free a B-tree.
@param[in]	page_id		page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result))
buf_block_t*
btr_free_root_check(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	ut_ad(page_id.space() != srv_tmp_space.space_id());
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, mtr);

	if (block) {
		buf_block_dbg_add_level(block, SYNC_TREE_NODE);

		if (fil_page_index_page_check(block->frame)
		    && index_id == btr_page_get_index_id(block->frame)) {
			/* This should be a root page.
			It should not be possible to reassign the same
			index_id for some other index in the tablespace. */
			ut_ad(page_is_root(block->frame));
		} else {
			block = NULL;
		}
	}

	return(block);
}

/** Create the root node for a new index tree.
@param[in]	type			type of the index
@param[in]	space			space where created
@param[in]	page_size		page size
@param[in]	index_id		index id
@param[in]	index			index, or NULL when applying TRUNCATE
log record during recovery
@param[in]	btr_redo_create_info	used for applying TRUNCATE log
record during recovery
@param[in]	mtr			mini-transaction handle
@return page number of the created root, FIL_NULL if did not succeed */
ulint
btr_create(
	ulint			type,
	ulint			space,
	const page_size_t&	page_size,
	index_id_t		index_id,
	dict_index_t*		index,
	const btr_create_t*	btr_redo_create_info,
	mtr_t*			mtr)
{
	ulint			page_no;
	buf_block_t*		block;
	buf_frame_t*		frame;
	page_t*			page;
	page_zip_des_t*		page_zip;

	ut_ad(mtr->is_named_space(space));
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, 0,
			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		if (ibuf_hdr_block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(ibuf_hdr_block->page.id.page_no()
		      == IBUF_HEADER_PAGE_NO);
		/* Allocate then the next page to the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);
		ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);
	} else {
		block = fseg_create(space, 0,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
	}

	if (block == NULL) {

		return(FIL_NULL);
	}

	page_no = block->page.id.page_no();
	frame = buf_block_get_frame(block);

	if (type & DICT_IBUF) {
		/* It is an insert buffer tree: initialize the free list */
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
	} else {
		/* It is a non-ibuf tree: create a file segment for leaf
		pages */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space, page_no,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(block, mtr);
			if (!dict_table_is_temporary(index->table)) {
				btr_free_root_invalidate(block, mtr);
			}

			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (page_zip) {
		if (index != NULL) {
			page = page_create_zip(block, index, 0, 0, NULL, mtr);
		} else {
			/* Create a compressed index page when applying
			TRUNCATE log record during recovery */
			ut_ad(btr_redo_create_info != NULL);

			redo_page_compress_t	page_comp_info;

			page_comp_info.type = type;

			page_comp_info.index_id = index_id;

			page_comp_info.n_fields =
				btr_redo_create_info->n_fields;

			page_comp_info.field_len =
				btr_redo_create_info->field_len;

			page_comp_info.fields = btr_redo_create_info->fields;

			page_comp_info.trx_id_pos =
				btr_redo_create_info->trx_id_pos;

			page = page_create_zip(block, NULL, 0, 0,
					       &page_comp_info, mtr);
		}
	} else {
		if (index != NULL) {
			page = page_create(block, mtr,
					   dict_table_is_comp(index->table),
					   dict_index_is_spatial(index));
		} else {
			ut_ad(btr_redo_create_info != NULL);
			page = page_create(
				block, mtr, btr_redo_create_info->format_flags,
				type == DICT_SPATIAL);
		}
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order.

	index will be NULL if we are recreating the table during recovery
	on behalf of TRUNCATE.

	Note: Insert Buffering is disabled for temporary tables given that
	most temporary tables are smaller in size and short-lived. */
	if (!(type & DICT_CLUSTERED)
	    && (index == NULL || !dict_table_is_temporary(index->table))) {

		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(page_no);
}
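
/* Sketch of a typical call (simplified and hypothetical): creating the
tree of a freshly defined index inside a started mini-transaction, with
btr_redo_create_info == NULL because this is not TRUNCATE redo apply:

	ulint	root_page_no = btr_create(index->type, space, page_size,
					  index->id, index, NULL, &mtr);

	if (root_page_no == FIL_NULL) {
		// out of space: no segment or root page could be allocated
	}
*/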

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out]	block		root page
@param[in]	log_mode	mtr logging mode
@param[in]	is_ahi_allowed	false for intrinsic tables because AHI
				is disallowed. See dict_index_t->disable_ahi,
				true for other tables */
static
void
btr_free_but_not_root(
	buf_block_t*	block,
	mtr_log_t	log_mode,
	bool		is_ahi_allowed)
{
	ibool	finished;
	mtr_t	mtr;

	ut_ad(page_is_root(block->frame));
	bool ahi = false;
	if (is_ahi_allowed) {
		ut_ad(mutex_own(&dict_sys->mutex));
		ahi = btr_search_enabled;
	}

leaf_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space(block->page.id.space());

	page_t*	root = block->frame;

	SRV_CORRUPT_TABLE_CHECK(root,
	{
		mtr_commit(&mtr);
		return;
	});

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, block->page.id.space()));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  ahi, &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space(block->page.id.space());

	root = block->frame;

	SRV_CORRUPT_TABLE_CHECK(root,
	{
		mtr_commit(&mtr);
		return;
	});

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, ahi, &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto top_loop;
	}
}

/** Free a persistent index tree if it exists.
@param[in]	page_id		root page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction */
void
btr_free_if_exists(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	buf_block_t* root = btr_free_root_check(
		page_id, page_size, index_id, mtr);

	if (root == NULL) {
		return;
	}

	btr_free_but_not_root(root, mtr->get_log_mode(), true);
	mtr->set_named_space(page_id.space());
	btr_free_root(root, mtr);
	btr_free_root_invalidate(root, mtr);
}

/** Free an index tree in a temporary tablespace or during TRUNCATE TABLE.
@param[in]	page_id		root page id
@param[in]	page_size	page size
@param[in]	is_intrinsic	true for intrinsic tables else false */
void
btr_free(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	bool			is_intrinsic)
{
	mtr_t		mtr;
	mtr.start();
	mtr.set_log_mode(MTR_LOG_NO_REDO);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, &mtr);

	ut_ad(page_is_root(block->frame));

	btr_free_but_not_root(block, MTR_LOG_NO_REDO, !is_intrinsic);
	btr_free_root(block, &mtr);
	mtr.commit();
}
#endif /* !UNIV_HOTBACKUP */

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize_low(
/*====================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block		= page_cur_get_block(cursor);
#ifndef UNIV_HOTBACKUP
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
#endif /* !UNIV_HOTBACKUP */
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	bool		success		= false;
	ulint		pos;
	bool		log_compressed;
	bool		is_spatial;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	btr_assert_not_corrupted(block, index);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

	/* Turn logging off */
	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

#ifndef UNIV_HOTBACKUP
	temp_block = buf_block_alloc(buf_pool);
#else /* !UNIV_HOTBACKUP */
	ut_ad(block == back_block1);
	temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
	temp_page = temp_block->frame;

	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);

	/* This function can be called by log redo with a "dummy" index.
	In that case we trust the original page's type more than the
	index information. */
	is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
		      || dict_index_is_spatial(index));

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

#ifndef UNIV_HOTBACKUP
	if (!recovery) {
		btr_search_drop_page_hash_index(block);
	}
#endif /* !UNIV_HOTBACKUP */

	/* Save the cursor position. */
	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !dict_table_is_temporary(index->table)) {
		/* Copy max trx id to recreated page */
		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
		/* In crash recovery, dict_index_is_sec_or_ibuf() always
		holds, even for clustered indexes.  max_trx_id is
		unused in clustered index pages. */
		ut_ad(max_trx_id != 0 || recovery);
	}

	/* If innodb_log_compressed_pages is ON, page reorganize should log the
	compressed page image. */
	log_compressed = page_zip && page_zip_log_pages;

	if (log_compressed) {
		mtr_set_log_mode(mtr, log_mode);
	}

	if (page_zip
	    && !page_zip_compress(page_zip, page, index, z_level, NULL, mtr)) {

		/* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

#ifndef UNIV_HOTBACKUP
	/* No locks are acquired for intrinsic tables. */
	if (!recovery && !dict_table_is_locking_disabled(index->table)) {
		/* Update the record lock bitmaps */
		lock_move_reorganize_page(block, temp_block);
	}
#endif /* !UNIV_HOTBACKUP */

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
		ib::error()
			<< "Page old data size " << data_size1
			<< " new data size " << data_size2
			<< ", page old max ins size " << max_ins_size1
			<< " new max ins size " << max_ins_size2;

		ib::error() << BUG_REPORT_MSG;
		ut_ad(0);
	} else {
		success = true;
	}

	/* Restore the cursor position. */
	if (pos > 0) {
		cursor->rec = page_rec_get_nth(page, pos);
	} else {
		ut_ad(cursor->rec == page_get_infimum_rec(page));
	}

func_exit:
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
#ifndef UNIV_HOTBACKUP
	buf_block_free(temp_block);
#endif /* !UNIV_HOTBACKUP */

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

#ifndef UNIV_HOTBACKUP
	if (success) {
		mlog_id_t	type;
		byte*		log_ptr;

		/* Write the log record */
		if (page_zip) {
			ut_ad(page_is_comp(page));
			type = MLOG_ZIP_PAGE_REORGANIZE;
		} else if (page_is_comp(page)) {
			type = MLOG_COMP_PAGE_REORGANIZE;
		} else {
			type = MLOG_PAGE_REORGANIZE;
		}

		log_ptr = log_compressed
			? NULL
			: mlog_open_and_write_index(
				mtr, page, index, type,
				page_zip ? 1 : 0);

		/* For compressed pages write the compression level. */
		if (log_ptr && page_zip) {
			mach_write_to_1(log_ptr, z_level);
			mlog_close(mtr, log_ptr + 1);
		}

		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
	}
#endif /* !UNIV_HOTBACKUP */

	return(success);
}
1552 
1553 /*************************************************************//**
1554 Reorganizes an index page.
1555 
1556 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1557 if this is a compressed leaf page in a secondary index. This has to
1558 be done either within the same mini-transaction, or by invoking
1559 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1560 IBUF_BITMAP_FREE is unaffected by reorganization.
1561 
1562 @retval true if the operation was successful
1563 @retval false if it is a compressed page, and recompression failed */
1564 static MY_ATTRIBUTE((nonnull))
1565 bool
btr_page_reorganize_block(bool recovery,ulint z_level,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1566 btr_page_reorganize_block(
1567 /*======================*/
1568 	bool		recovery,/*!< in: true if called in recovery:
1569 				locks should not be updated, i.e.,
1570 				there cannot exist locks on the
1571 				page, and a hash index should not be
1572 				dropped: it cannot exist */
1573 	ulint		z_level,/*!< in: compression level to be used
1574 				if dealing with compressed page */
1575 	buf_block_t*	block,	/*!< in/out: B-tree page */
1576 	dict_index_t*	index,	/*!< in: the index tree of the page */
1577 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1578 {
1579 	page_cur_t	cur;
1580 	page_cur_set_before_first(block, &cur);
1581 
1582 	return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
1583 }
1584 
1585 #ifndef UNIV_HOTBACKUP
1586 /*************************************************************//**
1587 Reorganizes an index page.
1588 
1589 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1590 if this is a compressed leaf page in a secondary index. This has to
1591 be done either within the same mini-transaction, or by invoking
1592 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1593 IBUF_BITMAP_FREE is unaffected by reorganization.
1594 
1595 @retval true if the operation was successful
1596 @retval false if it is a compressed page, and recompression failed */
1597 bool
1598 btr_page_reorganize(
1599 /*================*/
1600 	page_cur_t*	cursor,	/*!< in/out: page cursor */
1601 	dict_index_t*	index,	/*!< in: the index tree of the page */
1602 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
1603 {
1604 	return(btr_page_reorganize_low(false, page_zip_level,
1605 				       cursor, index, mtr));
1606 }
1607 #endif /* !UNIV_HOTBACKUP */
1608 
1609 /***********************************************************//**
1610 Parses a redo log record of reorganizing a page.
1611 @return end of log record or NULL */
1612 byte*
1613 btr_parse_page_reorganize(
1614 /*======================*/
1615 	byte*		ptr,	/*!< in: buffer */
1616 	byte*		end_ptr,/*!< in: buffer end */
1617 	dict_index_t*	index,	/*!< in: record descriptor */
1618 	bool		compressed,/*!< in: true if compressed page */
1619 	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
1620 	mtr_t*		mtr)	/*!< in: mtr or NULL */
1621 {
1622 	ulint	level;
1623 
1624 	ut_ad(ptr != NULL);
1625 	ut_ad(end_ptr != NULL);
1626 	ut_ad(index != NULL);
1627 
1628 	/* If dealing with a compressed page, the record carries the
1629 	compression level used during the original compression, written
1630 	in one byte. Otherwise the record is empty. */
1631 	if (compressed) {
1632 		if (ptr == end_ptr) {
1633 			return(NULL);
1634 		}
1635 
1636 		level = mach_read_from_1(ptr);
1637 
1638 		ut_a(level <= 9);
1639 		++ptr;
1640 	} else {
1641 		level = page_zip_level;
1642 	}
1643 
1644 	if (block != NULL) {
1645 		btr_page_reorganize_block(true, level, block, index, mtr);
1646 	}
1647 
1648 	return(ptr);
1649 }
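
/* Illustrative recovery flow (a sketch; the exact dispatch lives in the
recovery code, not here): after parsing the MLOG header, the caller is
expected to invoke this function roughly as follows:

	case MLOG_ZIP_PAGE_REORGANIZE:
		ptr = btr_parse_page_reorganize(ptr, end_ptr, index,
						true, block, mtr);
		break;
	case MLOG_PAGE_REORGANIZE:
	case MLOG_COMP_PAGE_REORGANIZE:
		ptr = btr_parse_page_reorganize(ptr, end_ptr, index,
						false, block, mtr);
		break;

block may be NULL when the page is not in the buffer pool; the record is
then only parsed, not applied. */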
1650 
1651 #ifndef UNIV_HOTBACKUP
1652 /*************************************************************//**
1653 Empties an index page.  @see btr_page_create(). */
1654 static
1655 void
1656 btr_page_empty(
1657 /*===========*/
1658 	buf_block_t*	block,	/*!< in: page to be emptied */
1659 	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
1660 	dict_index_t*	index,	/*!< in: index of the page */
1661 	ulint		level,	/*!< in: the B-tree level of the page */
1662 	mtr_t*		mtr)	/*!< in: mtr */
1663 {
1664 	page_t*	page = buf_block_get_frame(block);
1665 
1666 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
1667 	ut_ad(page_zip == buf_block_get_page_zip(block));
1668 #ifdef UNIV_ZIP_DEBUG
1669 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1670 #endif /* UNIV_ZIP_DEBUG */
1671 
1672 	btr_search_drop_page_hash_index(block);
1673 
1674 	/* Recreate the page: note that global data on the page (possible
1675 	segment headers, the next page field, etc.) is preserved intact */
1676 
1677 	if (page_zip) {
1678 		page_create_zip(block, index, level, 0, NULL, mtr);
1679 	} else {
1680 		page_create(block, mtr, dict_table_is_comp(index->table),
1681 			    dict_index_is_spatial(index));
1682 		btr_page_set_level(page, NULL, level, mtr);
1683 	}
1684 }
1685 
1686 /*************************************************************//**
1687 Makes the tree one level higher by splitting the root, and inserts
1688 the tuple. It is assumed that mtr contains an x-latch on the tree.
1689 NOTE that the operation of this function must always succeed; we
1690 cannot reverse it. Therefore enough free disk space must be
1691 guaranteed to be available before this function is called.
1692 @return inserted record */
1693 rec_t*
1694 btr_root_raise_and_insert(
1695 /*======================*/
1696 	ulint		flags,	/*!< in: undo logging and locking flags */
1697 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
1698 				on the root page; when the function returns,
1699 				the cursor is positioned on the predecessor
1700 				of the inserted record */
1701 	ulint**		offsets,/*!< out: offsets on inserted record */
1702 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1703 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1704 	ulint		n_ext,	/*!< in: number of externally stored columns */
1705 	mtr_t*		mtr)	/*!< in: mtr */
1706 {
1707 	dict_index_t*	index;
1708 	page_t*		root;
1709 	page_t*		new_page;
1710 	ulint		new_page_no;
1711 	rec_t*		rec;
1712 	dtuple_t*	node_ptr;
1713 	ulint		level;
1714 	rec_t*		node_ptr_rec;
1715 	page_cur_t*	page_cursor;
1716 	page_zip_des_t*	root_page_zip;
1717 	page_zip_des_t*	new_page_zip;
1718 	buf_block_t*	root_block;
1719 	buf_block_t*	new_block;
1720 
1721 	root = btr_cur_get_page(cursor);
1722 	root_block = btr_cur_get_block(cursor);
1723 	root_page_zip = buf_block_get_page_zip(root_block);
1724 	ut_ad(!page_is_empty(root));
1725 	index = btr_cur_get_index(cursor);
1726 #ifdef UNIV_ZIP_DEBUG
1727 	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
1728 #endif /* UNIV_ZIP_DEBUG */
1729 #ifdef UNIV_BTR_DEBUG
1730 	if (!dict_index_is_ibuf(index)) {
1731 		ulint	space = dict_index_get_space(index);
1732 
1733 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1734 					    + root, space));
1735 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1736 					    + root, space));
1737 	}
1738 
1739 	ut_a(dict_index_get_page(index) == page_get_page_no(root));
1740 #endif /* UNIV_BTR_DEBUG */
1741 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
1742 					MTR_MEMO_X_LOCK
1743 					| MTR_MEMO_SX_LOCK)
1744 	      || dict_table_is_intrinsic(index->table));
1745 	ut_ad(mtr_is_block_fix(
1746 		mtr, root_block, MTR_MEMO_PAGE_X_FIX, index->table));
1747 
1748 	/* Allocate a new page to the tree. Root splitting is done by first
1749 	moving the root records to the new page, emptying the root, putting
1750 	a node pointer to the new page, and then splitting the new page. */
1751 
1752 	level = btr_page_get_level(root, mtr);
1753 
1754 	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);
1755 
1756 	new_page = buf_block_get_frame(new_block);
1757 	new_page_zip = buf_block_get_page_zip(new_block);
1758 	ut_a(!new_page_zip == !root_page_zip);
1759 	ut_a(!new_page_zip
1760 	     || page_zip_get_size(new_page_zip)
1761 	     == page_zip_get_size(root_page_zip));
1762 
1763 	btr_page_create(new_block, new_page_zip, index, level, mtr);
1764 
1765 	/* Set the next node and previous node fields of new page */
1766 	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
1767 	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
1768 
1769 	/* Copy the records from root to the new page one by one. */
1770 
1771 	if (0
1772 #ifdef UNIV_ZIP_COPY
1773 	    || new_page_zip
1774 #endif /* UNIV_ZIP_COPY */
1775 	    || !page_copy_rec_list_end(new_block, root_block,
1776 				       page_get_infimum_rec(root),
1777 				       index, mtr)) {
1778 		ut_a(new_page_zip);
1779 
1780 		/* Copy the page byte for byte. */
1781 		page_zip_copy_recs(new_page_zip, new_page,
1782 				   root_page_zip, root, index, mtr);
1783 
1784 		/* Update the lock table and possible hash index. */
1785 
1786 		if (!dict_table_is_locking_disabled(index->table)) {
1787 			lock_move_rec_list_end(new_block, root_block,
1788 					       page_get_infimum_rec(root));
1789 		}
1790 
1791 		/* Move any existing predicate locks */
1792 		if (dict_index_is_spatial(index)) {
1793 			lock_prdt_rec_move(new_block, root_block);
1794 		}
1795 
1796 		btr_search_move_or_delete_hash_entries(new_block, root_block,
1797 						       index);
1798 	}
1799 
1800 	/* If this is a pessimistic insert which is actually done to
1801 	perform a pessimistic update, then we have stored the lock
1802 	information of the record to be inserted on the infimum of the
1803 	root page: we cannot discard the lock structs on the root page */
1804 
1805 	if (!dict_table_is_locking_disabled(index->table)) {
1806 		lock_update_root_raise(new_block, root_block);
1807 	}
1808 
1809 	/* Create a memory heap where the node pointer is stored */
1810 	if (!*heap) {
1811 		*heap = mem_heap_create(1000);
1812 	}
1813 
1814 	rec = page_rec_get_next(page_get_infimum_rec(new_page));
1815 	new_page_no = new_block->page.id.page_no();
1816 
1817 	/* Build the node pointer (= node key and page address) for the
1818 	child */
1819 	if (dict_index_is_spatial(index)) {
1820 		rtr_mbr_t		new_mbr;
1821 
1822 		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
1823 		node_ptr = rtr_index_build_node_ptr(
1824 			index, &new_mbr, rec, new_page_no, *heap, level);
1825 	} else {
1826 		node_ptr = dict_index_build_node_ptr(
1827 			index, rec, new_page_no, *heap, level);
1828 	}
1829 	/* The node pointer must be marked as the predefined minimum record,
1830 	as there is no lower alphabetical limit to records in the leftmost
1831 	node of a level: */
1832 	dtuple_set_info_bits(node_ptr,
1833 			     dtuple_get_info_bits(node_ptr)
1834 			     | REC_INFO_MIN_REC_FLAG);
1835 
1836 	/* Rebuild the root page to get free space */
1837 	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
1838 
1839 	/* Set the next node and previous node fields, although
1840 	they should already have been set.  The previous node field
1841 	must be FIL_NULL if root_page_zip != NULL, because the
1842 	REC_INFO_MIN_REC_FLAG (of the first user record) will be
1843 	set if and only if btr_page_get_prev() == FIL_NULL. */
1844 	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
1845 	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);
1846 
1847 	page_cursor = btr_cur_get_page_cur(cursor);
1848 
1849 	/* Insert node pointer to the root */
1850 
1851 	page_cur_set_before_first(root_block, page_cursor);
1852 
1853 	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
1854 					     index, offsets, heap, 0, mtr);
1855 
1856 	/* The root page should only contain the node pointer
1857 	to new_page at this point.  Thus, the data should fit. */
1858 	ut_a(node_ptr_rec);
1859 
1860 	/* We play safe and reset the free bits for the new page */
1861 
1862 	if (!dict_index_is_clust(index)
1863 	    && !dict_table_is_temporary(index->table)) {
1864 		ibuf_reset_free_bits(new_block);
1865 	}
1866 
1867 	/* Reposition the cursor to the child node */
1868 	page_cur_search(new_block, index, tuple, page_cursor);
1869 
1870 	/* Split the child and insert tuple */
1871 	if (dict_index_is_spatial(index)) {
1872 		/* Split rtree page and insert tuple */
1873 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
1874 						 tuple, n_ext, mtr));
1875 	} else {
1876 		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
1877 						 tuple, n_ext, mtr));
1878 	}
1879 }
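
/* Shape of the tree before and after the root raise above (an
illustrative sketch; the page number is made up):

	before:				after:

	[root: r1 r2 r3]		[root: node ptr to page 5]
					[page 5: r1 r2 r3]

The root keeps its page number, so dict_index_get_page() stays valid;
all user records move to the newly allocated child, and the insert is
completed by splitting that child. */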
1880 
1881 /*************************************************************//**
1882 Decides if the page should be split at the convergence point of inserts
1883 converging to the left.
1884 @return TRUE if split recommended */
1885 ibool
1886 btr_page_get_split_rec_to_left(
1887 /*===========================*/
1888 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
1889 	rec_t**		split_rec) /*!< out: if split recommended,
1890 				the first record on upper half page,
1891 				or NULL if tuple to be inserted should
1892 				be first */
1893 {
1894 	page_t*	page;
1895 	rec_t*	insert_point;
1896 	rec_t*	infimum;
1897 
1898 	page = btr_cur_get_page(cursor);
1899 	insert_point = btr_cur_get_rec(cursor);
1900 
1901 	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
1902 	    == page_rec_get_next(insert_point)) {
1903 
1904 		infimum = page_get_infimum_rec(page);
1905 
1906 		/* If the convergence point is in the middle of a page, also
1907 		include the record immediately before the new insert in the
1908 		upper page. Otherwise, we could repeatedly move lots of
1909 		records smaller than the convergence point from page to page. */
1910 
1911 		if (infimum != insert_point
1912 		    && page_rec_get_next(infimum) != insert_point) {
1913 
1914 			*split_rec = insert_point;
1915 		} else {
1916 			*split_rec = page_rec_get_next(insert_point);
1917 		}
1918 
1919 		return(TRUE);
1920 	}
1921 
1922 	return(FALSE);
1923 }
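
/* Example of the heuristic above (illustrative): with descending
inserts 90, 80, 70, ... each new record lands immediately before the
previous one, so PAGE_LAST_INSERT == page_rec_get_next(insert_point).
Splitting at (or just after) the insert point leaves the lower half
free for the smaller records still to come, instead of repeatedly
moving them from page to page. */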
1924 
1925 /*************************************************************//**
1926 Decides if the page should be split at the convergence point of inserts
1927 converging to the right.
1928 @return TRUE if split recommended */
1929 ibool
1930 btr_page_get_split_rec_to_right(
1931 /*============================*/
1932 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
1933 	rec_t**		split_rec) /*!< out: if split recommended,
1934 				the first record on upper half page,
1935 				or NULL if tuple to be inserted should
1936 				be first */
1937 {
1938 	page_t*	page;
1939 	rec_t*	insert_point;
1940 
1941 	page = btr_cur_get_page(cursor);
1942 	insert_point = btr_cur_get_rec(cursor);
1943 
1944 	/* We use eager heuristics: if the new insert would be right after
1945 	the previous insert on the same page, we assume that there is a
1946 	pattern of sequential inserts here. */
1947 
1948 	if (page_header_get_ptr(page, PAGE_LAST_INSERT) == insert_point) {
1949 
1950 		rec_t*	next_rec;
1951 
1952 		next_rec = page_rec_get_next(insert_point);
1953 
1954 		if (page_rec_is_supremum(next_rec)) {
1955 split_at_new:
1956 			/* Split at the new record to insert */
1957 			*split_rec = NULL;
1958 		} else {
1959 			rec_t*	next_next_rec = page_rec_get_next(next_rec);
1960 			if (page_rec_is_supremum(next_next_rec)) {
1961 
1962 				goto split_at_new;
1963 			}
1964 
1965 			/* If there are >= 2 user records up from the insert
1966 			point, split all but 1 off. We want to keep one because
1967 			then sequential inserts can use the adaptive hash
1968 			index, as they can do the necessary checks of the right
1969 			search position just by looking at the records on this
1970 			page. */
1971 
1972 			*split_rec = next_next_rec;
1973 		}
1974 
1975 		return(TRUE);
1976 	}
1977 
1978 	return(FALSE);
1979 }
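
/* Example of the heuristic above (illustrative): with ascending
inserts 10, 20, 30, ... the previous insert is exactly at the cursor
position, so the page is split near its end. Keeping one user record
on the old page (splitting at next_next_rec instead of next_rec) lets
subsequent sequential inserts verify their search position via the
adaptive hash index using the records left on this page. */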
1980 
1981 /*************************************************************//**
1982 Calculates a split record such that the tuple will certainly fit on
1983 its half-page when the split is performed. We assume in this function
1984 only that the cursor page has at least one user record.
1985 @return split record, or NULL if tuple will be the first record on
1986 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
1987 static
1988 rec_t*
1989 btr_page_get_split_rec(
1990 /*===================*/
1991 	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
1992 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1993 	ulint		n_ext)	/*!< in: number of externally stored columns */
1994 {
1995 	page_t*		page;
1996 	page_zip_des_t*	page_zip;
1997 	ulint		insert_size;
1998 	ulint		free_space;
1999 	ulint		total_data;
2000 	ulint		total_n_recs;
2001 	ulint		total_space;
2002 	ulint		incl_data;
2003 	rec_t*		ins_rec;
2004 	rec_t*		rec;
2005 	rec_t*		next_rec;
2006 	ulint		n;
2007 	mem_heap_t*	heap;
2008 	ulint*		offsets;
2009 
2010 	page = btr_cur_get_page(cursor);
2011 
2012 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2013 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2014 
2015 	page_zip = btr_cur_get_page_zip(cursor);
2016 	if (page_zip) {
2017 		/* Estimate the free space of an empty compressed page. */
2018 		ulint	free_space_zip = page_zip_empty_size(
2019 			cursor->index->n_fields,
2020 			page_zip_get_size(page_zip));
2021 
2022 		if (free_space > (ulint) free_space_zip) {
2023 			free_space = (ulint) free_space_zip;
2024 		}
2025 	}
2026 
2027 	/* free_space is now the free space of a created new page */
2028 
2029 	total_data   = page_get_data_size(page) + insert_size;
2030 	total_n_recs = page_get_n_recs(page) + 1;
2031 	ut_ad(total_n_recs >= 2);
2032 	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);
2033 
2034 	n = 0;
2035 	incl_data = 0;
2036 	ins_rec = btr_cur_get_rec(cursor);
2037 	rec = page_get_infimum_rec(page);
2038 
2039 	heap = NULL;
2040 	offsets = NULL;
2041 
2042 	/* We include records in the left half until the space they
2043 	reserve exceeds half of total_space. If the included records
2044 	then fit on the left page, and something is left over for the
2045 	right page as well, they are put on the left page; otherwise
2046 	the last included record becomes the first record on the right
2047 	half-page. */
2048 
2049 	do {
2050 		/* Decide the next record to include */
2051 		if (rec == ins_rec) {
2052 			rec = NULL;	/* NULL denotes that tuple is
2053 					now included */
2054 		} else if (rec == NULL) {
2055 			rec = page_rec_get_next(ins_rec);
2056 		} else {
2057 			rec = page_rec_get_next(rec);
2058 		}
2059 
2060 		if (rec == NULL) {
2061 			/* Include tuple */
2062 			incl_data += insert_size;
2063 		} else {
2064 			offsets = rec_get_offsets(rec, cursor->index,
2065 						  offsets, ULINT_UNDEFINED,
2066 						  &heap);
2067 			incl_data += rec_offs_size(offsets);
2068 		}
2069 
2070 		n++;
2071 	} while (incl_data + page_dir_calc_reserved_space(n)
2072 		 < total_space / 2);
2073 
2074 	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
2075 		/* The next record will be the first on
2076 		the right half page if it is not the
2077 		supremum record of page */
2078 
2079 		if (rec == ins_rec) {
2080 			rec = NULL;
2081 
2082 			goto func_exit;
2083 		} else if (rec == NULL) {
2084 			next_rec = page_rec_get_next(ins_rec);
2085 		} else {
2086 			next_rec = page_rec_get_next(rec);
2087 		}
2088 		ut_ad(next_rec);
2089 		if (!page_rec_is_supremum(next_rec)) {
2090 			rec = next_rec;
2091 		}
2092 	}
2093 
2094 func_exit:
2095 	if (heap) {
2096 		mem_heap_free(heap);
2097 	}
2098 	return(rec);
2099 }
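
/* Worked example of the loop above (illustrative numbers only):
suppose the existing data plus insert_size give total_space = 7288
bytes, so the target is total_space / 2 = 3644. Starting from the
infimum, each record's size (or insert_size, once ins_rec is reached
and the tuple itself is counted) is added to incl_data until
incl_data + page_dir_calc_reserved_space(n) first reaches 3644; the
record reached at that point is the split candidate, subject to the
free_space adjustment that follows. */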
2100 
2101 /*************************************************************//**
2102 Returns true if the insert fits on the appropriate half-page with the
2103 chosen split_rec.
2104 @return true if fits */
2105 static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
2106 bool
2107 btr_page_insert_fits(
2108 /*=================*/
2109 	btr_cur_t*	cursor,	/*!< in: cursor at which insert
2110 				should be made */
2111 	const rec_t*	split_rec,/*!< in: suggestion for first record
2112 				on upper half-page, or NULL if
2113 				tuple to be inserted should be first */
2114 	ulint**		offsets,/*!< in: rec_get_offsets(
2115 				split_rec, cursor->index); out: garbage */
2116 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2117 	ulint		n_ext,	/*!< in: number of externally stored columns */
2118 	mem_heap_t**	heap)	/*!< in: temporary memory heap */
2119 {
2120 	page_t*		page;
2121 	ulint		insert_size;
2122 	ulint		free_space;
2123 	ulint		total_data;
2124 	ulint		total_n_recs;
2125 	const rec_t*	rec;
2126 	const rec_t*	end_rec;
2127 
2128 	page = btr_cur_get_page(cursor);
2129 
2130 	ut_ad(!split_rec
2131 	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
2132 	ut_ad(!split_rec
2133 	      || rec_offs_validate(split_rec, cursor->index, *offsets));
2134 
2135 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
2136 	free_space  = page_get_free_space_of_empty(page_is_comp(page));
2137 
2138 	/* free_space is now the free space of a created new page */
2139 
2140 	total_data   = page_get_data_size(page) + insert_size;
2141 	total_n_recs = page_get_n_recs(page) + 1;
2142 
2143 	/* We determine which records (from rec to end_rec, not including
2144 	end_rec) will end up on the other half page from tuple when it is
2145 	inserted. */
2146 
2147 	if (split_rec == NULL) {
2148 		rec = page_rec_get_next(page_get_infimum_rec(page));
2149 		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
2150 
2151 	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
2152 
2153 		rec = page_rec_get_next(page_get_infimum_rec(page));
2154 		end_rec = split_rec;
2155 	} else {
2156 		rec = split_rec;
2157 		end_rec = page_get_supremum_rec(page);
2158 	}
2159 
2160 	if (total_data + page_dir_calc_reserved_space(total_n_recs)
2161 	    <= free_space) {
2162 
2163 		/* Ok, there will be enough available space on the
2164 		half page where the tuple is inserted */
2165 
2166 		return(true);
2167 	}
2168 
2169 	while (rec != end_rec) {
2170 		/* In this loop we calculate the amount of reserved
2171 		space after rec is removed from page. */
2172 
2173 		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
2174 					   ULINT_UNDEFINED, heap);
2175 
2176 		total_data -= rec_offs_size(*offsets);
2177 		total_n_recs--;
2178 
2179 		if (total_data + page_dir_calc_reserved_space(total_n_recs)
2180 		    <= free_space) {
2181 
2182 			/* Ok, there will be enough available space on the
2183 			half page where the tuple is inserted */
2184 
2185 			return(true);
2186 		}
2187 
2188 		rec = page_rec_get_next_const(rec);
2189 	}
2190 
2191 	return(false);
2192 }
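
/* Note on the check above: the fast path succeeds when all the old
data plus the new tuple would fit on a single empty page. Otherwise
the loop walks exactly the records that will move to the other
half-page, subtracting their sizes until the remainder fits, so the
result is exact for the chosen split_rec rather than a pessimistic
guess. */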
2193 
2194 /*******************************************************//**
2195 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2196 that mtr holds an x-latch on the tree. */
2197 void
2198 btr_insert_on_non_leaf_level_func(
2199 /*==============================*/
2200 	ulint		flags,	/*!< in: undo logging and locking flags */
2201 	dict_index_t*	index,	/*!< in: index */
2202 	ulint		level,	/*!< in: level, must be > 0 */
2203 	dtuple_t*	tuple,	/*!< in: the record to be inserted */
2204 	const char*	file,	/*!< in: file name */
2205 	ulint		line,	/*!< in: line where called */
2206 	mtr_t*		mtr)	/*!< in: mtr */
2207 {
2208 	big_rec_t*	dummy_big_rec;
2209 	btr_cur_t	cursor;
2210 	dberr_t		err;
2211 	rec_t*		rec;
2212 	mem_heap_t*	heap = NULL;
2213 	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
2214 	ulint*          offsets         = offsets_;
2215 	rec_offs_init(offsets_);
2216 	rtr_info_t	rtr_info;
2217 
2218 	ut_ad(level > 0);
2219 
2220 	if (!dict_index_is_spatial(index)) {
2221 		if (dict_table_is_intrinsic(index->table)) {
2222 			btr_cur_search_to_nth_level_with_no_latch(
2223 				index, level, tuple, PAGE_CUR_LE, &cursor,
2224 				__FILE__, __LINE__, mtr);
2225 		} else {
2226 			btr_cur_search_to_nth_level(
2227 				index, level, tuple, PAGE_CUR_LE,
2228 				BTR_CONT_MODIFY_TREE,
2229 				&cursor, 0, file, line, mtr);
2230 		}
2231 	} else {
2232 		/* For spatial index, initialize structures to track
2233 		its parents etc. */
2234 		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2235 
2236 		rtr_info_update_btr(&cursor, &rtr_info);
2237 
2238 		btr_cur_search_to_nth_level(index, level, tuple,
2239 					    PAGE_CUR_RTREE_INSERT,
2240 					    BTR_CONT_MODIFY_TREE,
2241 					    &cursor, 0, file, line, mtr);
2242 	}
2243 
2244 	ut_ad(cursor.flag == BTR_CUR_BINARY);
2245 
2246 	err = btr_cur_optimistic_insert(
2247 		flags
2248 		| BTR_NO_LOCKING_FLAG
2249 		| BTR_KEEP_SYS_FLAG
2250 		| BTR_NO_UNDO_LOG_FLAG,
2251 		&cursor, &offsets, &heap,
2252 		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2253 
2254 	if (err == DB_FAIL) {
2255 		err = btr_cur_pessimistic_insert(flags
2256 						 | BTR_NO_LOCKING_FLAG
2257 						 | BTR_KEEP_SYS_FLAG
2258 						 | BTR_NO_UNDO_LOG_FLAG,
2259 						 &cursor, &offsets, &heap,
2260 						 tuple, &rec,
2261 						 &dummy_big_rec, 0, NULL, mtr);
2262 		ut_a(err == DB_SUCCESS);
2263 	}
2264 
2265 	if (heap != NULL) {
2266 		mem_heap_free(heap);
2267 	}
2268 
2269 	if (dict_index_is_spatial(index)) {
2270 		ut_ad(cursor.rtr_info);
2271 
2272 		rtr_clean_rtr_info(&rtr_info, true);
2273 	}
2274 }
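
/* Note on the flags used above: node-pointer inserts always add
BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG,
because non-leaf records carry no transaction information: they are
never locked, never undo-logged, and have no system columns to
update. */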
2275 
2276 /**************************************************************//**
2277 Attaches the halves of an index page on the appropriate level in an
2278 index tree. */
2279 static MY_ATTRIBUTE((nonnull))
2280 void
2281 btr_attach_half_pages(
2282 /*==================*/
2283 	ulint		flags,		/*!< in: undo logging and
2284 					locking flags */
2285 	dict_index_t*	index,		/*!< in: the index tree */
2286 	buf_block_t*	block,		/*!< in/out: page to be split */
2287 	const rec_t*	split_rec,	/*!< in: first record on upper
2288 					half page */
2289 	buf_block_t*	new_block,	/*!< in/out: the new half page */
2290 	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
2291 	mtr_t*		mtr)		/*!< in: mtr */
2292 {
2293 	ulint		prev_page_no;
2294 	ulint		next_page_no;
2295 	ulint		level;
2296 	page_t*		page		= buf_block_get_frame(block);
2297 	page_t*		lower_page;
2298 	page_t*		upper_page;
2299 	ulint		lower_page_no;
2300 	ulint		upper_page_no;
2301 	page_zip_des_t*	lower_page_zip;
2302 	page_zip_des_t*	upper_page_zip;
2303 	dtuple_t*	node_ptr_upper;
2304 	mem_heap_t*	heap;
2305 	buf_block_t*	prev_block = NULL;
2306 	buf_block_t*	next_block = NULL;
2307 
2308 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2309 	ut_ad(mtr_is_block_fix(
2310 		mtr, new_block, MTR_MEMO_PAGE_X_FIX, index->table));
2311 
2312 	/* Create a memory heap where the data tuple is stored */
2313 	heap = mem_heap_create(1024);
2314 
2315 	/* Based on split direction, decide upper and lower pages */
2316 	if (direction == FSP_DOWN) {
2317 
2318 		btr_cur_t	cursor;
2319 		ulint*		offsets;
2320 
2321 		lower_page = buf_block_get_frame(new_block);
2322 		lower_page_no = new_block->page.id.page_no();
2323 		lower_page_zip = buf_block_get_page_zip(new_block);
2324 		upper_page = buf_block_get_frame(block);
2325 		upper_page_no = block->page.id.page_no();
2326 		upper_page_zip = buf_block_get_page_zip(block);
2327 
2328 		/* Look up the index for the node pointer to page */
2329 		offsets = btr_page_get_father_block(NULL, heap, index,
2330 						    block, mtr, &cursor);
2331 
2332 		/* Replace the address of the old child node (= page) with the
2333 		address of the new lower half */
2334 
2335 		btr_node_ptr_set_child_page_no(
2336 			btr_cur_get_rec(&cursor),
2337 			btr_cur_get_page_zip(&cursor),
2338 			offsets, lower_page_no, mtr);
2339 		mem_heap_empty(heap);
2340 	} else {
2341 		lower_page = buf_block_get_frame(block);
2342 		lower_page_no = block->page.id.page_no();
2343 		lower_page_zip = buf_block_get_page_zip(block);
2344 		upper_page = buf_block_get_frame(new_block);
2345 		upper_page_no = new_block->page.id.page_no();
2346 		upper_page_zip = buf_block_get_page_zip(new_block);
2347 	}
2348 
2349 	/* Get the previous and next pages of page */
2350 	prev_page_no = btr_page_get_prev(page, mtr);
2351 	next_page_no = btr_page_get_next(page, mtr);
2352 
2353 	const ulint	space = block->page.id.space();
2354 
2355 	/* For consistency, the neighbor blocks must be x-latched before the links are changed */
2356 	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
2357 		prev_block = btr_block_get(
2358 			page_id_t(space, prev_page_no), block->page.size,
2359 			RW_X_LATCH, index, mtr);
2360 	}
2361 	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
2362 		next_block = btr_block_get(
2363 			page_id_t(space, next_page_no), block->page.size,
2364 			RW_X_LATCH, index, mtr);
2365 	}
2366 
2367 	/* Get the level of the split pages */
2368 	level = btr_page_get_level(buf_block_get_frame(block), mtr);
2369 	ut_ad(level
2370 	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));
2371 
2372 	/* Build the node pointer (= node key and page address) for the upper
2373 	half */
2374 
2375 	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
2376 						   upper_page_no, heap, level);
2377 
2378 	/* Insert it next to the pointer to the lower half. Note that this
2379 	may generate recursion leading to a split on the higher level. */
2380 
2381 	btr_insert_on_non_leaf_level(flags, index, level + 1,
2382 				     node_ptr_upper, mtr);
2383 
2384 	/* Free the memory heap */
2385 	mem_heap_free(heap);
2386 
2387 	/* Update page links of the level */
2388 
2389 	if (prev_block) {
2390 #ifdef UNIV_BTR_DEBUG
2391 		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
2392 		ut_a(btr_page_get_next(prev_block->frame, mtr)
2393 		     == block->page.id.page_no());
2394 #endif /* UNIV_BTR_DEBUG */
2395 
2396 		btr_page_set_next(buf_block_get_frame(prev_block),
2397 				  buf_block_get_page_zip(prev_block),
2398 				  lower_page_no, mtr);
2399 	}
2400 
2401 	if (next_block) {
2402 #ifdef UNIV_BTR_DEBUG
2403 		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
2404 		ut_a(btr_page_get_prev(next_block->frame, mtr)
2405 		     == page_get_page_no(page));
2406 #endif /* UNIV_BTR_DEBUG */
2407 
2408 		btr_page_set_prev(buf_block_get_frame(next_block),
2409 				  buf_block_get_page_zip(next_block),
2410 				  upper_page_no, mtr);
2411 	}
2412 
2413 	if (direction == FSP_DOWN) {
2414 		/* lower_page is new */
2415 		btr_page_set_prev(lower_page, lower_page_zip,
2416 				  prev_page_no, mtr);
2417 	} else {
2418 		ut_ad(btr_page_get_prev(lower_page, mtr) == prev_page_no);
2419 	}
2420 
2421 	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
2422 	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
2423 
2424 	if (direction != FSP_DOWN) {
2425 		/* upper_page is new */
2426 		btr_page_set_next(upper_page, upper_page_zip,
2427 				  next_page_no, mtr);
2428 	} else {
2429 		ut_ad(btr_page_get_next(upper_page, mtr) == next_page_no);
2430 	}
2431 }
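
/* Link updates performed above, sketched for a split of page B with
siblings A and C (A <-> B <-> C; N is the new page):

	FSP_UP   (N is the upper half):		A <-> B <-> N <-> C
	FSP_DOWN (N is the lower half):		A <-> N <-> B <-> C

With FSP_UP only C's prev pointer needs fixing; with FSP_DOWN only
A's next pointer does, which is why just one neighbor is latched,
depending on the direction. */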
2432 
2433 /*************************************************************//**
2434 Determine if a tuple is smaller than any record on the page.
2435 @return TRUE if smaller */
2436 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2437 bool
2438 btr_page_tuple_smaller(
2439 /*===================*/
2440 	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
2441 	const dtuple_t*	tuple,	/*!< in: tuple to consider */
2442 	ulint**		offsets,/*!< in/out: temporary storage */
2443 	ulint		n_uniq,	/*!< in: number of unique fields
2444 				in the index page records */
2445 	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
2446 {
2447 	buf_block_t*	block;
2448 	const rec_t*	first_rec;
2449 	page_cur_t	pcur;
2450 
2451 	/* Read the first user record in the page. */
2452 	block = btr_cur_get_block(cursor);
2453 	page_cur_set_before_first(block, &pcur);
2454 	page_cur_move_to_next(&pcur);
2455 	first_rec = page_cur_get_rec(&pcur);
2456 
2457 	*offsets = rec_get_offsets(
2458 		first_rec, cursor->index, *offsets,
2459 		n_uniq, heap);
2460 
2461 	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2462 }
2463 
2464 /** Insert the tuple into the right sibling page, if the cursor is at the end
2465 of a page.
2466 @param[in]	flags	undo logging and locking flags
2467 @param[in,out]	cursor	cursor at which to insert; when the function succeeds,
2468 			the cursor is positioned before the insert point.
2469 @param[out]	offsets	offsets on inserted record
2470 @param[in,out]	heap	memory heap for allocating offsets
2471 @param[in]	tuple	tuple to insert
2472 @param[in]	n_ext	number of externally stored columns
2473 @param[in,out]	mtr	mini-transaction
2474 @return	inserted record (first record on the right sibling page);
2475 	the cursor will be positioned on the page infimum
2476 @retval	NULL if the operation was not performed */
2477 static
2478 rec_t*
2479 btr_insert_into_right_sibling(
2480 	ulint		flags,
2481 	btr_cur_t*	cursor,
2482 	ulint**		offsets,
2483 	mem_heap_t*	heap,
2484 	const dtuple_t*	tuple,
2485 	ulint		n_ext,
2486 	mtr_t*		mtr)
2487 {
2488 	buf_block_t*	block = btr_cur_get_block(cursor);
2489 	page_t*		page = buf_block_get_frame(block);
2490 	ulint		next_page_no = btr_page_get_next(page, mtr);
2491 
2492 	ut_ad(dict_table_is_intrinsic(cursor->index->table)
2493 	      || mtr_memo_contains_flagged(
2494 			mtr, dict_index_get_lock(cursor->index),
2495 			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2496 	ut_ad(mtr_is_block_fix(
2497 		mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
2498 	ut_ad(heap);
2499 
2500 	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
2501 			page_rec_get_next(btr_cur_get_rec(cursor)))) {
2502 
2503 		return(NULL);
2504 	}
2505 
2506 	page_cur_t	next_page_cursor;
2507 	buf_block_t*	next_block;
2508 	page_t*		next_page;
2509 	btr_cur_t	next_father_cursor;
2510 	rec_t*		rec = NULL;
2511 	ulint		max_size;
2512 
2513 	const ulint	space = block->page.id.space();
2514 
2515 	next_block = btr_block_get(
2516 		page_id_t(space, next_page_no), block->page.size,
2517 		RW_X_LATCH, cursor->index, mtr);
2518 	next_page = buf_block_get_frame(next_block);
2519 
2520 	bool	is_leaf = page_is_leaf(next_page);
2521 
2522 	btr_page_get_father(
2523 		cursor->index, next_block, mtr, &next_father_cursor);
2524 
2525 	page_cur_search(
2526 		next_block, cursor->index, tuple, PAGE_CUR_LE,
2527 		&next_page_cursor);
2528 
2529 	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2530 
2531 	/* Extends gap lock for the next page */
2532 	if (!dict_table_is_locking_disabled(cursor->index->table)) {
2533 		lock_update_split_left(next_block, block);
2534 	}
2535 
2536 	rec = page_cur_tuple_insert(
2537 		&next_page_cursor, tuple, cursor->index, offsets, &heap,
2538 		n_ext, mtr);
2539 
2540 	if (rec == NULL) {
2541 		if (is_leaf
2542 		    && next_block->page.size.is_compressed()
2543 		    && !dict_index_is_clust(cursor->index)
2544 		    && !dict_table_is_temporary(cursor->index->table)) {
2545 			/* Reset the IBUF_BITMAP_FREE bits, because
2546 			page_cur_tuple_insert() will have attempted page
2547 			reorganize before failing. */
2548 			ibuf_reset_free_bits(next_block);
2549 		}
2550 		return(NULL);
2551 	}
2552 
2553 	ibool	compressed;
2554 	dberr_t	err;
2555 	ulint	level = btr_page_get_level(next_page, mtr);
2556 
2557 	/* adjust cursor position */
2558 	*btr_cur_get_page_cur(cursor) = next_page_cursor;
2559 
2560 	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2561 	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2562 
2563 	/* We have to change the parent node pointer */
2564 
2565 	compressed = btr_cur_pessimistic_delete(
2566 		&err, TRUE, &next_father_cursor,
2567 		BTR_CREATE_FLAG, false, mtr);
2568 
2569 	ut_a(err == DB_SUCCESS);
2570 
2571 	if (!compressed) {
2572 		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2573 	}
2574 
2575 	dtuple_t*	node_ptr = dict_index_build_node_ptr(
2576 		cursor->index, rec, next_block->page.id.page_no(),
2577 		heap, level);
2578 
2579 	btr_insert_on_non_leaf_level(
2580 		flags, cursor->index, level + 1, node_ptr, mtr);
2581 
2582 	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2583 
2584 	if (is_leaf
2585 	    && !dict_index_is_clust(cursor->index)
2586 	    && !dict_table_is_temporary(cursor->index->table)) {
2587 		/* Update the free bits of the B-tree page in the
2588 		insert buffer bitmap. */
2589 
2590 		if (next_block->page.size.is_compressed()) {
2591 			ibuf_update_free_bits_zip(next_block, mtr);
2592 		} else {
2593 			ibuf_update_free_bits_if_full(
2594 				next_block, max_size,
2595 				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2596 		}
2597 	}
2598 
2599 	return(rec);
2600 }
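
/* This fast path applies only when the cursor sits on the last user
record of the page and a right sibling exists: the new record then
becomes the first record on that sibling, so instead of splitting we
insert there and replace the sibling's node pointer in the parent
(the old pointer is deleted and a new one, built from the inserted
record, is added one level up). */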
2601 
2602 /*************************************************************//**
2603 Splits an index page into halves and inserts the tuple. It is assumed
2604 that mtr holds an x-latch on the index tree. NOTE: the tree x-latch is
2605 released within this function! NOTE that the operation of this
2606 function must always succeed; we cannot reverse it. Therefore enough
2607 free disk space (2 pages) must be guaranteed to be available before
2608 this function is called.
2609 @return inserted record */
2610 rec_t*
2611 btr_page_split_and_insert(
2612 /*======================*/
2613 	ulint		flags,	/*!< in: undo logging and locking flags */
2614 	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
2615 				function returns, the cursor is positioned
2616 				on the predecessor of the inserted record */
2617 	ulint**		offsets,/*!< out: offsets on inserted record */
2618 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
2619 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
2620 	ulint		n_ext,	/*!< in: number of externally stored columns */
2621 	mtr_t*		mtr)	/*!< in: mtr */
2622 {
2623 	buf_block_t*	block;
2624 	page_t*		page;
2625 	page_zip_des_t*	page_zip;
2626 	ulint		page_no;
2627 	byte		direction;
2628 	ulint		hint_page_no;
2629 	buf_block_t*	new_block;
2630 	page_t*		new_page;
2631 	page_zip_des_t*	new_page_zip;
2632 	rec_t*		split_rec;
2633 	buf_block_t*	left_block;
2634 	buf_block_t*	right_block;
2635 	buf_block_t*	insert_block;
2636 	page_cur_t*	page_cursor;
2637 	rec_t*		first_rec;
2638 	byte*		buf = 0; /* remove warning */
2639 	rec_t*		move_limit;
2640 	ibool		insert_will_fit;
2641 	ibool		insert_left;
2642 	ulint		n_iterations = 0;
2643 	rec_t*		rec;
2644 	ulint		n_uniq;
2645 	dict_index_t*	index;
2646 
2647 	index = btr_cur_get_index(cursor);
2648 
2649 	if (dict_index_is_spatial(index)) {
2650 		/* Split rtree page and update parent */
2651 		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
2652 						 tuple, n_ext, mtr));
2653 	}
2654 
2655 	if (!*heap) {
2656 		*heap = mem_heap_create(1024);
2657 	}
2658 	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2659 func_start:
2660 	mem_heap_empty(*heap);
2661 	*offsets = NULL;
2662 
2663 	ut_ad(mtr_memo_contains_flagged(mtr,
2664 					dict_index_get_lock(cursor->index),
2665 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK)
2666 	      || dict_table_is_intrinsic(cursor->index->table));
2667 	ut_ad(!dict_index_is_online_ddl(cursor->index)
2668 	      || (flags & BTR_CREATE_FLAG)
2669 	      || dict_index_is_clust(cursor->index));
2670 	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
2671 				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)
2672 	      || dict_table_is_intrinsic(cursor->index->table));
2673 
2674 	block = btr_cur_get_block(cursor);
2675 	page = buf_block_get_frame(block);
2676 	page_zip = buf_block_get_page_zip(block);
2677 
2678 	ut_ad(mtr_is_block_fix(
2679 		mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
2680 	ut_ad(!page_is_empty(page));
2681 
2682 	/* try to insert to the next page if possible before split */
2683 	rec = btr_insert_into_right_sibling(
2684 		flags, cursor, offsets, *heap, tuple, n_ext, mtr);
2685 
2686 	if (rec != NULL) {
2687 		return(rec);
2688 	}
2689 
2690 	page_no = block->page.id.page_no();
2691 
2692 	/* 1. Decide the split record; split_rec == NULL means that the
2693 	tuple to be inserted should be the first record on the upper
2694 	half-page */
2695 	insert_left = FALSE;
2696 
2697 	if (n_iterations > 0) {
2698 		direction = FSP_UP;
2699 		hint_page_no = page_no + 1;
2700 		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);
2701 
2702 		if (split_rec == NULL) {
2703 			insert_left = btr_page_tuple_smaller(
2704 				cursor, tuple, offsets, n_uniq, heap);
2705 		}
2706 	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2707 		direction = FSP_UP;
2708 		hint_page_no = page_no + 1;
2709 
2710 	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
2711 		direction = FSP_DOWN;
2712 		hint_page_no = page_no - 1;
2713 		ut_ad(split_rec);
2714 	} else {
2715 		direction = FSP_UP;
2716 		hint_page_no = page_no + 1;
2717 
2718 		/* If there is only one record in the index page, we
2719 		cannot split the page in the middle by default. We need
2720 		to determine whether the new record will be inserted
2721 		to the left or to the right. */
2722 
2723 		if (page_get_n_recs(page) > 1) {
2724 			split_rec = page_get_middle_rec(page);
2725 		} else if (btr_page_tuple_smaller(cursor, tuple,
2726 						  offsets, n_uniq, heap)) {
2727 			split_rec = page_rec_get_next(
2728 				page_get_infimum_rec(page));
2729 		} else {
2730 			split_rec = NULL;
2731 		}
2732 	}
2733 
2734 	/* 2. Allocate a new page to the index */
2735 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2736 				   btr_page_get_level(page, mtr), mtr, mtr);
2737 
2738 	new_page = buf_block_get_frame(new_block);
2739 	new_page_zip = buf_block_get_page_zip(new_block);
2740 	btr_page_create(new_block, new_page_zip, cursor->index,
2741 			btr_page_get_level(page, mtr), mtr);
2742 
2743 	/* 3. Calculate the first record on the upper half-page, and the
2744 	first record (move_limit) on original page which ends up on the
2745 	upper half */
2746 
2747 	if (split_rec) {
2748 		first_rec = move_limit = split_rec;
2749 
2750 		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
2751 					   n_uniq, heap);
2752 
2753 		insert_left = cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;
2754 
2755 		if (!insert_left && new_page_zip && n_iterations > 0) {
2756 			/* If a compressed page has already been split,
2757 			avoid further splits by inserting the record
2758 			to an empty page. */
2759 			split_rec = NULL;
2760 			goto insert_empty;
2761 		}
2762 	} else if (insert_left) {
2763 		ut_a(n_iterations > 0);
2764 		first_rec = page_rec_get_next(page_get_infimum_rec(page));
2765 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2766 	} else {
2767 insert_empty:
2768 		ut_ad(!split_rec);
2769 		ut_ad(!insert_left);
2770 		buf = UT_NEW_ARRAY_NOKEY(
2771 			byte,
2772 			rec_get_converted_size(cursor->index, tuple, n_ext));
2773 
2774 		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
2775 						      tuple, n_ext);
2776 		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2777 	}
2778 
2779 	/* 4. Do first the modifications in the tree structure */
2780 
2781 	btr_attach_half_pages(flags, cursor->index, block,
2782 			      first_rec, new_block, direction, mtr);
2783 
2784 	/* If the split is made on the leaf level and the insert will fit
2785 	on the appropriate half-page, we may release the tree x-latch.
2786 	We can then move the records after releasing the tree latch,
2787 	thus reducing the tree latch contention. */
2788 
2789 	if (split_rec) {
2790 		insert_will_fit = !new_page_zip
2791 			&& btr_page_insert_fits(cursor, split_rec,
2792 						offsets, tuple, n_ext, heap);
2793 	} else {
2794 		if (!insert_left) {
2795 			UT_DELETE_ARRAY(buf);
2796 			buf = NULL;
2797 		}
2798 
2799 		insert_will_fit = !new_page_zip
2800 			&& btr_page_insert_fits(cursor, NULL,
2801 						offsets, tuple, n_ext, heap);
2802 	}
2803 
2804 	if (!srv_read_only_mode
2805 	    && !dict_table_is_intrinsic(cursor->index->table)
2806 	    && insert_will_fit
2807 	    && page_is_leaf(page)
2808 	    && !dict_index_is_online_ddl(cursor->index)) {
2809 
2810 		mtr->memo_release(
2811 			dict_index_get_lock(cursor->index),
2812 			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);
2813 
2814 		/* NOTE: We cannot release the root block latch here: it holds
2815 		the segment headers and has already been modified in most cases. */
2816 	}
2817 
2818 	/* 5. Move then the records to the new page */
2819 	if (direction == FSP_DOWN) {
2820 		/*		fputs("Split left\n", stderr); */
2821 
2822 		if (0
2823 #ifdef UNIV_ZIP_COPY
2824 		    || page_zip
2825 #endif /* UNIV_ZIP_COPY */
2826 		    || !page_move_rec_list_start(new_block, block, move_limit,
2827 						 cursor->index, mtr)) {
2828 			/* For some reason, compressing new_page failed,
2829 			even though it should contain fewer records than
2830 			the original page.  Copy the page byte for byte
2831 			and then delete the records from both pages
2832 			as appropriate.  Deleting will always succeed. */
2833 			ut_a(new_page_zip);
2834 
2835 			page_zip_copy_recs(new_page_zip, new_page,
2836 					   page_zip, page, cursor->index, mtr);
2837 			page_delete_rec_list_end(move_limit - page + new_page,
2838 						 new_block, cursor->index,
2839 						 ULINT_UNDEFINED,
2840 						 ULINT_UNDEFINED, mtr);
2841 
2842 			/* Update the lock table and possible hash index. */
2843 
2844 			if (!dict_table_is_locking_disabled(
2845 				cursor->index->table)) {
2846 				lock_move_rec_list_start(
2847 					new_block, block, move_limit,
2848 					new_page + PAGE_NEW_INFIMUM);
2849 			}
2850 
2851 			btr_search_move_or_delete_hash_entries(
2852 				new_block, block, cursor->index);
2853 
2854 			/* Delete the records from the source page. */
2855 
2856 			page_delete_rec_list_start(move_limit, block,
2857 						   cursor->index, mtr);
2858 		}
2859 
2860 		left_block = new_block;
2861 		right_block = block;
2862 
2863 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
2864 			lock_update_split_left(right_block, left_block);
2865 		}
2866 	} else {
2867 		/*		fputs("Split right\n", stderr); */
2868 
2869 		if (0
2870 #ifdef UNIV_ZIP_COPY
2871 		    || page_zip
2872 #endif /* UNIV_ZIP_COPY */
2873 		    || !page_move_rec_list_end(new_block, block, move_limit,
2874 					       cursor->index, mtr)) {
2875 			/* For some reason, compressing new_page failed,
2876 			even though it should contain fewer records than
2877 			the original page.  Copy the page byte for byte
2878 			and then delete the records from both pages
2879 			as appropriate.  Deleting will always succeed. */
2880 			ut_a(new_page_zip);
2881 
2882 			page_zip_copy_recs(new_page_zip, new_page,
2883 					   page_zip, page, cursor->index, mtr);
2884 			page_delete_rec_list_start(move_limit - page
2885 						   + new_page, new_block,
2886 						   cursor->index, mtr);
2887 
2888 			/* Update the lock table and possible hash index. */
2889 			if (!dict_table_is_locking_disabled(
2890 				cursor->index->table)) {
2891 				lock_move_rec_list_end(
2892 					new_block, block, move_limit);
2893 			}
2894 
2895 			ut_ad(!dict_index_is_spatial(index));
2896 
2897 			btr_search_move_or_delete_hash_entries(
2898 				new_block, block, cursor->index);
2899 
2900 			/* Delete the records from the source page. */
2901 
2902 			page_delete_rec_list_end(move_limit, block,
2903 						 cursor->index,
2904 						 ULINT_UNDEFINED,
2905 						 ULINT_UNDEFINED, mtr);
2906 		}
2907 
2908 		left_block = block;
2909 		right_block = new_block;
2910 
2911 		if (!dict_table_is_locking_disabled(cursor->index->table)) {
2912 			lock_update_split_right(right_block, left_block);
2913 		}
2914 	}
2915 
2916 #ifdef UNIV_ZIP_DEBUG
2917 	if (page_zip) {
2918 		ut_a(page_zip_validate(page_zip, page, cursor->index));
2919 		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
2920 	}
2921 #endif /* UNIV_ZIP_DEBUG */
2922 
2923 	/* At this point, split_rec, move_limit and first_rec may point
2924 	to garbage on the old page. */
2925 
2926 	/* 6. The split and the tree modification is now completed. Decide the
2927 	page where the tuple should be inserted */
2928 
2929 	if (insert_left) {
2930 		insert_block = left_block;
2931 	} else {
2932 		insert_block = right_block;
2933 	}
2934 
2935 	/* 7. Reposition the cursor for insert and try insertion */
2936 	page_cursor = btr_cur_get_page_cur(cursor);
2937 
2938 	page_cur_search(insert_block, cursor->index, tuple, page_cursor);
2939 
2940 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
2941 				    offsets, heap, n_ext, mtr);
2942 
2943 #ifdef UNIV_ZIP_DEBUG
2944 	{
2945 		page_t*		insert_page
2946 			= buf_block_get_frame(insert_block);
2947 
2948 		page_zip_des_t*	insert_page_zip
2949 			= buf_block_get_page_zip(insert_block);
2950 
2951 		ut_a(!insert_page_zip
2952 		     || page_zip_validate(insert_page_zip, insert_page,
2953 					  cursor->index));
2954 	}
2955 #endif /* UNIV_ZIP_DEBUG */
2956 
2957 	if (rec != NULL) {
2958 
2959 		goto func_exit;
2960 	}
2961 
2962 	/* 8. If insert did not fit, try page reorganization.
2963 	For compressed pages, page_cur_tuple_insert() will have
2964 	attempted this already. */
2965 
2966 	if (page_cur_get_page_zip(page_cursor)
2967 	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
2968 
2969 		goto insert_failed;
2970 	}
2971 
2972 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
2973 				    offsets, heap, n_ext, mtr);
2974 
2975 	if (rec == NULL) {
2976 		/* The insert did not fit on the page: loop back to the
2977 		start of the function for a new split */
2978 insert_failed:
2979 		/* We play safe and reset the free bits for new_page */
2980 		if (!dict_index_is_clust(cursor->index)
2981 		    && !dict_table_is_temporary(cursor->index->table)) {
2982 			ibuf_reset_free_bits(new_block);
2983 			ibuf_reset_free_bits(block);
2984 		}
2985 
2986 		n_iterations++;
2987 		ut_ad(n_iterations < 2
2988 		      || buf_block_get_page_zip(insert_block));
2989 		ut_ad(!insert_will_fit);
2990 
2991 		goto func_start;
2992 	}
2993 
2994 func_exit:
2995 	/* Insert fit on the page: update the free bits for the
2996 	left and right pages in the same mtr */
2997 
2998 	if (!dict_index_is_clust(cursor->index)
2999 	    && !dict_table_is_temporary(cursor->index->table)
3000 	    && page_is_leaf(page)) {
3001 
3002 		ibuf_update_free_bits_for_two_pages_low(
3003 			left_block, right_block, mtr);
3004 	}
3005 
3006 	MONITOR_INC(MONITOR_INDEX_SPLIT);
3007 
3008 	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
3009 	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
3010 
3011 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
3012 	return(rec);
3013 }
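
/* Typical call path (a sketch): btr_cur_pessimistic_insert() reaches
this function after btr_cur_optimistic_insert() has returned DB_FAIL,
except when the cursor is on the root, in which case
btr_root_raise_and_insert() runs first and re-enters here on the child
level. If the tuple still does not fit after the split, the function
loops back to func_start and splits again; the assertion on
n_iterations documents that this can only happen for compressed
pages. */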
3014 
3015 /** Removes a page from the level list of pages.
3016 @param[in]	space		space where removed
3017 @param[in]	page_size	page size
3018 @param[in,out]	page		page to remove
3019 @param[in]	index		index tree
3020 @param[in,out]	mtr		mini-transaction */
3021 # define btr_level_list_remove(space,page_size,page,index,mtr)		\
3022 	btr_level_list_remove_func(space,page_size,page,index,mtr)
3023 
3024 /** Removes a page from the level list of pages.
3025 @param[in]	space		space where removed
3026 @param[in]	page_size	page size
3027 @param[in,out]	page		page to remove
3028 @param[in]	index		index tree
3029 @param[in,out]	mtr		mini-transaction */
3030 static
3031 void
3032 btr_level_list_remove_func(
3033 	ulint			space,
3034 	const page_size_t&	page_size,
3035 	page_t*			page,
3036 	const dict_index_t*	index,
3037 	mtr_t*			mtr)
3038 {
3039 	ut_ad(page != NULL);
3040 	ut_ad(mtr != NULL);
3041 	ut_ad(mtr_is_page_fix(mtr, page, MTR_MEMO_PAGE_X_FIX, index->table));
3042 	ut_ad(space == page_get_space_id(page));
3043 	/* Get the previous and next page numbers of page */
3044 
3045 	const ulint	prev_page_no = btr_page_get_prev(page, mtr);
3046 	const ulint	next_page_no = btr_page_get_next(page, mtr);
3047 
3048 	/* Update page links of the level */
3049 
3050 	if (prev_page_no != FIL_NULL) {
3051 		buf_block_t*	prev_block
3052 			= btr_block_get(page_id_t(space, prev_page_no),
3053 					page_size, RW_X_LATCH, index, mtr);
3054 
3055 		page_t*		prev_page
3056 			= buf_block_get_frame(prev_block);
3057 #ifdef UNIV_BTR_DEBUG
3058 		ut_a(page_is_comp(prev_page) == page_is_comp(page));
3059 		ut_a(btr_page_get_next(prev_page, mtr)
3060 		     == page_get_page_no(page));
3061 #endif /* UNIV_BTR_DEBUG */
3062 
3063 		btr_page_set_next(prev_page,
3064 				  buf_block_get_page_zip(prev_block),
3065 				  next_page_no, mtr);
3066 	}
3067 
3068 	if (next_page_no != FIL_NULL) {
3069 		buf_block_t*	next_block
3070 			= btr_block_get(
3071 				page_id_t(space, next_page_no), page_size,
3072 				RW_X_LATCH, index, mtr);
3073 
3074 		page_t*		next_page
3075 			= buf_block_get_frame(next_block);
3076 #ifdef UNIV_BTR_DEBUG
3077 		ut_a(page_is_comp(next_page) == page_is_comp(page));
3078 		ut_a(btr_page_get_prev(next_page, mtr)
3079 		     == page_get_page_no(page));
3080 #endif /* UNIV_BTR_DEBUG */
3081 
3082 		btr_page_set_prev(next_page,
3083 				  buf_block_get_page_zip(next_block),
3084 				  prev_page_no, mtr);
3085 	}
3086 }
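
/* Effect of the unlink above, for a level list A <-> B <-> C where B
is the page being removed:

	before:	A.next = B, B.prev = A, B.next = C, C.prev = B
	after:	A.next = C, C.prev = A

B's own pointers are left untouched; the caller is expected to free or
reinitialize the page afterwards. */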
3087 
3088 /****************************************************************//**
3089 Writes the redo log record for setting an index record as the predefined
3090 minimum record. */
3091 UNIV_INLINE
3092 void
3093 btr_set_min_rec_mark_log(
3094 /*=====================*/
3095 	rec_t*		rec,	/*!< in: record */
3096 	mlog_id_t	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or
3097 				MLOG_REC_MIN_MARK */
3098 	mtr_t*		mtr)	/*!< in: mtr */
3099 {
3100 	mlog_write_initial_log_record(rec, type, mtr);
3101 
3102 	/* Write rec offset as a 2-byte ulint */
3103 	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
3104 }
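
/* Layout of the record written above (a sketch): the initial log
record written by mlog_write_initial_log_record() (type, space id,
page number) is followed by the 2-byte page offset of rec;
btr_parse_set_min_rec_mark() below reads back exactly those two
bytes. */
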
3105 #else /* !UNIV_HOTBACKUP */
3106 # define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
3107 #endif /* !UNIV_HOTBACKUP */
3108 
3109 /****************************************************************//**
3110 Parses the redo log record for setting an index record as the predefined
3111 minimum record.
3112 @return end of log record or NULL */
3113 byte*
3114 btr_parse_set_min_rec_mark(
3115 /*=======================*/
3116 	byte*	ptr,	/*!< in: buffer */
3117 	byte*	end_ptr,/*!< in: buffer end */
3118 	ulint	comp,	/*!< in: nonzero=compact page format */
3119 	page_t*	page,	/*!< in: page or NULL */
3120 	mtr_t*	mtr)	/*!< in: mtr or NULL */
3121 {
3122 	rec_t*	rec;
3123 
3124 	if (end_ptr < ptr + 2) {
3125 
3126 		return(NULL);
3127 	}
3128 
3129 	if (page) {
3130 		ut_a(!page_is_comp(page) == !comp);
3131 
3132 		rec = page + mach_read_from_2(ptr);
3133 
3134 		btr_set_min_rec_mark(rec, mtr);
3135 	}
3136 
3137 	return(ptr + 2);
3138 }
3139 
3140 /****************************************************************//**
3141 Sets a record as the predefined minimum record. */
3142 void
3143 btr_set_min_rec_mark(
3144 /*=================*/
3145 	rec_t*	rec,	/*!< in: record */
3146 	mtr_t*	mtr)	/*!< in: mtr */
3147 {
3148 	ulint	info_bits;
3149 
3150 	if (page_rec_is_comp(rec)) {
3151 		info_bits = rec_get_info_bits(rec, TRUE);
3152 
3153 		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3154 
3155 		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3156 	} else {
3157 		info_bits = rec_get_info_bits(rec, FALSE);
3158 
3159 		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3160 
3161 		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3162 	}
3163 }
3164 
3165 #ifndef UNIV_HOTBACKUP
3166 /*************************************************************//**
3167 Deletes on the upper level the node pointer to a page. */
3168 void
3169 btr_node_ptr_delete(
3170 /*================*/
3171 	dict_index_t*	index,	/*!< in: index tree */
3172 	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
3173 	mtr_t*		mtr)	/*!< in: mtr */
3174 {
3175 	btr_cur_t	cursor;
3176 	ibool		compressed;
3177 	dberr_t		err;
3178 
3179 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3180 
3181 	/* Delete node pointer on father page */
3182 	btr_page_get_father(index, block, mtr, &cursor);
3183 
3184 	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor,
3185 						BTR_CREATE_FLAG, false, mtr);
3186 	ut_a(err == DB_SUCCESS);
3187 
3188 	if (!compressed) {
3189 		btr_cur_compress_if_useful(&cursor, FALSE, mtr);
3190 	}
3191 }
3192 
3193 /*************************************************************//**
3194 If the page is the only one on its level, this function moves its records to the
3195 father page, thus reducing the tree height.
3196 @return father block */
3197 static
3198 buf_block_t*
3199 btr_lift_page_up(
3200 /*=============*/
3201 	dict_index_t*	index,	/*!< in: index tree */
3202 	buf_block_t*	block,	/*!< in: page which is the only one on its level;
3203 				must not be empty: use
3204 				btr_discard_only_page_on_level if the last
3205 				record from the page should be removed */
3206 	mtr_t*		mtr)	/*!< in: mtr */
3207 {
3208 	buf_block_t*	father_block;
3209 	page_t*		father_page;
3210 	ulint		page_level;
3211 	page_zip_des_t*	father_page_zip;
3212 	page_t*		page		= buf_block_get_frame(block);
3213 	ulint		root_page_no;
3214 	buf_block_t*	blocks[BTR_MAX_LEVELS];
3215 	ulint		n_blocks;	/*!< last used index in blocks[] */
3216 	ulint		i;
3217 	bool		lift_father_up;
3218 	buf_block_t*	block_orig	= block;
3219 
3220 	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
3221 	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3222 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3223 
3224 	page_level = btr_page_get_level(page, mtr);
3225 	root_page_no = dict_index_get_page(index);
3226 
3227 	{
3228 		btr_cur_t	cursor;
3229 		ulint*		offsets	= NULL;
3230 		mem_heap_t*	heap	= mem_heap_create(
3231 			sizeof(*offsets)
3232 			* (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
3233 		buf_block_t*	b;
3234 
3235 		if (dict_index_is_spatial(index)) {
3236 			offsets = rtr_page_get_father_block(
3237 				NULL, heap, index, block, mtr,
3238 				NULL, &cursor);
3239 		} else {
3240 			offsets = btr_page_get_father_block(offsets, heap,
3241 							    index, block,
3242 							    mtr, &cursor);
3243 		}
3244 		father_block = btr_cur_get_block(&cursor);
3245 		father_page_zip = buf_block_get_page_zip(father_block);
3246 		father_page = buf_block_get_frame(father_block);
3247 
3248 		n_blocks = 0;
3249 
3250 		/* Store all ancestor pages so we can reset their
3251 		levels later on.  We have to do all the searches on
3252 		the tree now because later on, after we've replaced
3253 		the first level, the tree is in an inconsistent state
3254 		and cannot be searched. */
3255 		for (b = father_block;
3256 		     b->page.id.page_no() != root_page_no; ) {
3257 			ut_a(n_blocks < BTR_MAX_LEVELS);
3258 
3259 			if (dict_index_is_spatial(index)) {
3260 				offsets = rtr_page_get_father_block(
3261 					NULL, heap, index, b, mtr,
3262 					NULL, &cursor);
3263 			} else {
3264 				offsets = btr_page_get_father_block(offsets,
3265 								    heap,
3266 								    index, b,
3267 								    mtr,
3268 								    &cursor);
3269 			}
3270 
3271 			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
3272 		}
3273 
3274 		lift_father_up = (n_blocks && page_level == 0);
3275 		if (lift_father_up) {
3276 			/* The father page should also be the only one on its
3277 			level (and not the root). We must lift up the father
3278 			page first, because the leaf page itself should be
3279 			lifted up only into the root page. Freeing a page
3280 			chooses the file segment based on page_level (== 0 or
3281 			!= 0); if page_level changed from != 0 to == 0, a later
3282 			free of the page would not find its allocation. */
3283 
3284 			block = father_block;
3285 			page = buf_block_get_frame(block);
3286 			page_level = btr_page_get_level(page, mtr);
3287 
3288 			ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
3289 			ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3290 			ut_ad(mtr_is_block_fix(
3291 				mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3292 
3293 			father_block = blocks[0];
3294 			father_page_zip = buf_block_get_page_zip(father_block);
3295 			father_page = buf_block_get_frame(father_block);
3296 		}
3297 
3298 		mem_heap_free(heap);
3299 	}
3300 
3301 	btr_search_drop_page_hash_index(block);
3302 
3303 	/* Make the father empty */
3304 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3305 	page_level++;
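	/* The father was re-created above at the lifted page's level; each
	ancestor stored in blocks[] is then relabeled one level lower in the
	loop further below, hence the increment. */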
3306 
3307 	/* Copy the records to the father page one by one. */
3308 	if (0
3309 #ifdef UNIV_ZIP_COPY
3310 	    || father_page_zip
3311 #endif /* UNIV_ZIP_COPY */
3312 	    || !page_copy_rec_list_end(father_block, block,
3313 				       page_get_infimum_rec(page),
3314 				       index, mtr)) {
3315 		const page_zip_des_t*	page_zip
3316 			= buf_block_get_page_zip(block);
3317 		ut_a(father_page_zip);
3318 		ut_a(page_zip);
3319 
3320 		/* Copy the page byte for byte. */
3321 		page_zip_copy_recs(father_page_zip, father_page,
3322 				   page_zip, page, index, mtr);
3323 
3324 		/* Update the lock table and possible hash index. */
3325 
3326 		if (!dict_table_is_locking_disabled(index->table)) {
3327 			lock_move_rec_list_end(father_block, block,
3328 					       page_get_infimum_rec(page));
3329 		}
3330 
3331 		/* Also update the predicate locks */
3332 		if (dict_index_is_spatial(index)) {
3333 			lock_prdt_rec_move(father_block, block);
3334 		}
3335 
3336 		btr_search_move_or_delete_hash_entries(father_block, block,
3337 						       index);
3338 	}
3339 
3340 	if (!dict_table_is_locking_disabled(index->table)) {
3341 		/* Free predicate page locks on the block */
3342 		if (dict_index_is_spatial(index)) {
3343 			lock_mutex_enter();
3344 			lock_prdt_page_free_from_discard(
3345 				block, lock_sys->prdt_page_hash);
3346 			lock_mutex_exit();
3347 		}
3348 		lock_update_copy_and_discard(father_block, block);
3349 	}
3350 
3351 	/* Go upward to root page, decrementing levels by one. */
3352 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3353 		page_t*		page	= buf_block_get_frame(blocks[i]);
3354 		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3355 
3356 		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);
3357 
3358 		btr_page_set_level(page, page_zip, page_level, mtr);
3359 #ifdef UNIV_ZIP_DEBUG
3360 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3361 #endif /* UNIV_ZIP_DEBUG */
3362 	}
3363 
3364 	if (dict_index_is_spatial(index)) {
3365 		rtr_check_discard_page(index, NULL, block);
3366 	}
3367 
3368 	/* Free the file page */
3369 	btr_page_free(index, block, mtr);
3370 
3371 	/* We play it safe and reset the free bits for the father */
3372 	if (!dict_index_is_clust(index)
3373 	    && !dict_table_is_temporary(index->table)) {
3374 		ibuf_reset_free_bits(father_block);
3375 	}
3376 	ut_ad(page_validate(father_page, index));
3377 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3378 
3379 	return(lift_father_up ? block_orig : father_block);
3380 }
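
/* An illustrative sketch of btr_lift_page_up() in its simplest case, a
root whose single child holds records a, b, c (names are arbitrary): the
child's records are copied into the emptied root and the child page is
freed, reducing the tree height by one.

	before:	[root]				after:	[root: a b c]
		   |
		[child: a b c]			(child page freed) */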
3381 
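/* An illustrative sketch of the left-merge case handled below for a
non-spatial index (record names are arbitrary): the records of the
cursor page P are moved after the records of its left brother L, the
node pointer to P is deleted on the father page, and P is freed.

	father:	[.. ptr(L) ptr(P) ..]	->	[.. ptr(L) ..]
	level:	[L: a b] <-> [P: c d]	->	[L: a b c d] */
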
3382 /*************************************************************//**
3383 Tries to merge the page first to the left immediate brother if such a
3384 brother exists, and the node pointers to the current page and to the brother
3385 reside on the same page. If the left brother does not satisfy these
3386 conditions, looks at the right brother. If the page is the only one on that
3387 level, lifts the records of the page to the father page, thus reducing the
3388 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3389 page. If cursor is on the leaf level, mtr must also hold x-latches to the
3390 brothers, if they exist.
3391 @return TRUE on success */
3392 ibool
3393 btr_compress(
3394 /*=========*/
3395 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3396 				or lift; the page must not be empty:
3397 				when deleting records, use btr_discard_page()
3398 				if the page would become empty */
3399 	ibool		adjust,	/*!< in: TRUE if should adjust the
3400 				cursor position even if compression occurs */
3401 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3402 {
3403 	dict_index_t*	index;
3404 	ulint		space;
3405 	ulint		left_page_no;
3406 	ulint		right_page_no;
3407 	buf_block_t*	merge_block;
3408 	page_t*		merge_page = NULL;
3409 	page_zip_des_t*	merge_page_zip;
3410 	ibool		is_left;
3411 	buf_block_t*	block;
3412 	page_t*		page;
3413 	btr_cur_t	father_cursor;
3414 	mem_heap_t*	heap;
3415 	ulint*		offsets;
3416 	ulint		nth_rec = 0; /* remove bogus warning */
3417 	bool		mbr_changed = false;
3418 #ifdef UNIV_DEBUG
3419 	bool		leftmost_child;
3420 #endif
3421 	DBUG_ENTER("btr_compress");
3422 
3423 	block = btr_cur_get_block(cursor);
3424 	page = btr_cur_get_page(cursor);
3425 	index = btr_cur_get_index(cursor);
3426 
3427 	btr_assert_not_corrupted(block, index);
3428 
3429 #ifdef UNIV_DEBUG
3430 	if (dict_index_is_spatial(index)) {
3431 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3432 						MTR_MEMO_X_LOCK));
3433 	} else {
3434 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3435 						MTR_MEMO_X_LOCK
3436 						| MTR_MEMO_SX_LOCK)
3437 		      || dict_table_is_intrinsic(index->table));
3438 	}
3439 #endif /* UNIV_DEBUG */
3440 
3441 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3442 	space = dict_index_get_space(index);
3443 
3444 	const page_size_t	page_size(dict_table_page_size(index->table));
3445 
3446 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3447 
3448 	left_page_no = btr_page_get_prev(page, mtr);
3449 	right_page_no = btr_page_get_next(page, mtr);
3450 
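	/* On a non-leaf level, the first user record of the leftmost page
	must carry the minimum record flag set by btr_set_min_rec_mark();
	the debug check below asserts this invariant. */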
3451 #ifdef UNIV_DEBUG
3452 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3453 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3454 			page_rec_get_next(page_get_infimum_rec(page)),
3455 			page_is_comp(page)));
3456 	}
3457 #endif /* UNIV_DEBUG */
3458 
3459 	heap = mem_heap_create(100);
3460 
3461 	if (dict_index_is_spatial(index)) {
3462 		offsets = rtr_page_get_father_block(
3463 			NULL, heap, index, block, mtr, cursor, &father_cursor);
3464 		ut_ad(cursor->page_cur.block->page.id.page_no()
3465 		      == block->page.id.page_no());
3466 		rec_t*  my_rec = father_cursor.page_cur.rec;
3467 
3468 		ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3469 
3470 		if (page_no != block->page.id.page_no()) {
3471 			ib::info() << "father positioned on page "
3472 				<< page_no << " instead of "
3473 				<< block->page.id.page_no();
3474 			offsets = btr_page_get_father_block(
3475 				NULL, heap, index, block, mtr, &father_cursor);
3476 		}
3477 	} else {
3478 		offsets = btr_page_get_father_block(
3479 			NULL, heap, index, block, mtr, &father_cursor);
3480 	}
3481 
3482 	if (adjust) {
3483 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3484 		ut_ad(nth_rec > 0);
3485 	}
3486 
3487 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3488 		/* The page is the only one on the level, lift the records
3489 		to the father */
3490 
3491 		merge_block = btr_lift_page_up(index, block, mtr);
3492 		goto func_exit;
3493 	}
3494 
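	/* leftmost_child (debug only) records whether this page is the
	leftmost child of its father; if it is not, the node pointer check
	at the end of this function cannot be run on merge_block. */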
3495 	ut_d(leftmost_child =
3496 		left_page_no != FIL_NULL
3497 		&& (page_rec_get_next(
3498 			page_get_infimum_rec(
3499 				btr_cur_get_page(&father_cursor)))
3500 		    == btr_cur_get_rec(&father_cursor)));
3501 
3502 	/* Decide the page to which we try to merge and which will inherit
3503 	the locks */
3504 
3505 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3506 					  &merge_block, mtr);
3507 
3508 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
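	/* The retry label below is taken when a spatial-index merge to the
	left fails because the two pages do not share a parent page; we then
	fall through to try the right sibling instead. */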
3509 retry:
3510 	if (!is_left
3511 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3512 				       mtr)) {
3513 		if (!merge_block) {
3514 			merge_page = NULL;
3515 		}
3516 		goto err_exit;
3517 	}
3518 
3519 	merge_page = buf_block_get_frame(merge_block);
3520 
3521 #ifdef UNIV_BTR_DEBUG
3522 	if (is_left) {
3523 		ut_a(btr_page_get_next(merge_page, mtr)
3524 		     == block->page.id.page_no());
3525 	} else {
3526 		ut_a(btr_page_get_prev(merge_page, mtr)
3527 		     == block->page.id.page_no());
3528 	}
3529 #endif /* UNIV_BTR_DEBUG */
3530 
3531 #ifdef UNIV_GIS_DEBUG
3532 	if (dict_index_is_spatial(index)) {
3533 		if (is_left) {
3534 			fprintf(stderr, "GIS_DIAG: merge left  %ld to %ld \n",
3535 				(long) block->page.id.page_no(), left_page_no);
3536 		} else {
3537 			fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3538 				(long) block->page.id.page_no(), right_page_no);
3539 		}
3540 	}
3541 #endif /* UNIV_GIS_DEBUG */
3542 
3543 	ut_ad(page_validate(merge_page, index));
3544 
3545 	merge_page_zip = buf_block_get_page_zip(merge_block);
3546 #ifdef UNIV_ZIP_DEBUG
3547 	if (merge_page_zip) {
3548 		const page_zip_des_t*	page_zip
3549 			= buf_block_get_page_zip(block);
3550 		ut_a(page_zip);
3551 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3552 		ut_a(page_zip_validate(page_zip, page, index));
3553 	}
3554 #endif /* UNIV_ZIP_DEBUG */
3555 
3556 	/* Move records to the merge page */
3557 	if (is_left) {
3558 		btr_cur_t	cursor2;
3559 		rtr_mbr_t	new_mbr;
3560 		ulint*		offsets2 = NULL;
3561 
3562 		/* For rtree, we need to update father's mbr. */
3563 		if (dict_index_is_spatial(index)) {
3564 			/* We only support merging pages that have the same
3565 			parent page */
3566 			if (!rtr_check_same_block(
3567 				index, &cursor2,
3568 				btr_cur_get_block(&father_cursor),
3569 				merge_block, heap)) {
3570 				is_left = false;
3571 				goto retry;
3572 			}
3573 
3574 			/* Set rtr_info for cursor2, since it is
3575 			necessary in recursive page merge. */
3576 			cursor2.rtr_info = cursor->rtr_info;
3577 			cursor2.tree_height = cursor->tree_height;
3578 
3579 			offsets2 = rec_get_offsets(
3580 				btr_cur_get_rec(&cursor2), index,
3581 				NULL, ULINT_UNDEFINED, &heap);
3582 
3583 			/* Check if parent entry needs to be updated */
3584 			mbr_changed = rtr_merge_mbr_changed(
3585 				&cursor2, &father_cursor,
3586 				offsets2, offsets, &new_mbr,
3587 				merge_block, block, index);
3588 		}
3589 
3590 		rec_t*	orig_pred = page_copy_rec_list_start(
3591 			merge_block, block, page_get_supremum_rec(page),
3592 			index, mtr);
3593 
3594 		if (!orig_pred) {
3595 			goto err_exit;
3596 		}
3597 
3598 		btr_search_drop_page_hash_index(block);
3599 
3600 		/* Remove the page from the level list */
3601 		btr_level_list_remove(space, page_size, page, index, mtr);
3602 
3603 		if (dict_index_is_spatial(index)) {
3604 			rec_t*  my_rec = father_cursor.page_cur.rec;
3605 
3606 			ulint page_no = btr_node_ptr_get_child_page_no(
3607 						my_rec, offsets);
3608 
3609 			if (page_no != block->page.id.page_no()) {
3610 
3611 				ib::fatal() << "father positioned on "
3612 					<< page_no << " instead of "
3613 					<< block->page.id.page_no();
3614 
3615 				ut_ad(0);
3616 			}
3617 
3618 			if (mbr_changed) {
3619 #ifdef UNIV_DEBUG
3620 				bool	success = rtr_update_mbr_field(
3621 					&cursor2, offsets2, &father_cursor,
3622 					merge_page, &new_mbr, NULL, mtr);
3623 
3624 				ut_ad(success);
3625 #else
3626 				rtr_update_mbr_field(
3627 					&cursor2, offsets2, &father_cursor,
3628 					merge_page, &new_mbr, NULL, mtr);
3629 #endif
3630 			} else {
3631 				rtr_node_ptr_delete(
3632 					index, &father_cursor, block, mtr);
3633 			}
3634 
3635 			/* No GAP locks need to be worried about here */
3636 			lock_mutex_enter();
3637 			lock_prdt_page_free_from_discard(
3638 				block, lock_sys->prdt_page_hash);
3639 			lock_rec_free_all_from_discard_page(block);
3640 			lock_mutex_exit();
3641 		} else {
3642 			btr_node_ptr_delete(index, block, mtr);
3643 			if (!dict_table_is_locking_disabled(index->table)) {
3644 				lock_update_merge_left(
3645 					merge_block, orig_pred, block);
3646 			}
3647 		}
3648 
3649 		if (adjust) {
3650 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3651 		}
3652 	} else {
3653 		rec_t*		orig_succ;
3654 		ibool		compressed;
3655 		dberr_t		err;
3656 		btr_cur_t	cursor2;
3657 					/* father cursor pointing to node ptr
3658 					of the right sibling */
3659 #ifdef UNIV_BTR_DEBUG
3660 		byte		fil_page_prev[4];
3661 #endif /* UNIV_BTR_DEBUG */
3662 
3663 		if (dict_index_is_spatial(index)) {
3664 			cursor2.rtr_info = NULL;
3665 
3666 			/* For a spatial index, we disallow merging blocks
3667 			with different parents, since the merge would need
3668 			to update the entry (MBR and primary key) in the
3669 			parent of the block being merged */
3670 			if (!rtr_check_same_block(
3671 				index, &cursor2,
3672 				btr_cur_get_block(&father_cursor),
3673 				merge_block, heap)) {
3674 				goto err_exit;
3675 			}
3676 
3677 			/* Set rtr_info for cursor2, since it is
3678 			necessary in recursive page merge. */
3679 			cursor2.rtr_info = cursor->rtr_info;
3680 			cursor2.tree_height = cursor->tree_height;
3681 		} else {
3682 			btr_page_get_father(index, merge_block, mtr, &cursor2);
3683 		}
3684 
3685 		if (merge_page_zip && left_page_no == FIL_NULL) {
3686 
3687 			/* The function page_zip_compress(), which will be
3688 			invoked by page_copy_rec_list_end() below,
3689 			requires that FIL_PAGE_PREV be FIL_NULL.
3690 			Clear the field, but prepare to restore it. */
3691 #ifdef UNIV_BTR_DEBUG
3692 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3693 #endif /* UNIV_BTR_DEBUG */
3694 #if FIL_NULL != 0xffffffff
3695 # error "FIL_NULL != 0xffffffff"
3696 #endif
3697 			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3698 		}
3699 
3700 		orig_succ = page_copy_rec_list_end(merge_block, block,
3701 						   page_get_infimum_rec(page),
3702 						   cursor->index, mtr);
3703 
3704 		if (!orig_succ) {
3705 			ut_a(merge_page_zip);
3706 #ifdef UNIV_BTR_DEBUG
3707 			if (left_page_no == FIL_NULL) {
3708 				/* FIL_PAGE_PREV was restored from
3709 				merge_page_zip. */
3710 				ut_a(!memcmp(fil_page_prev,
3711 					     merge_page + FIL_PAGE_PREV, 4));
3712 			}
3713 #endif /* UNIV_BTR_DEBUG */
3714 			goto err_exit;
3715 		}
3716 
3717 		btr_search_drop_page_hash_index(block);
3718 
3719 #ifdef UNIV_BTR_DEBUG
3720 		if (merge_page_zip && left_page_no == FIL_NULL) {
3721 
3722 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3723 			failure in btr_level_list_remove(), which will set
3724 			the field again to FIL_NULL.  Even though this makes
3725 			merge_page and merge_page_zip inconsistent for a
3726 			split second, it is harmless, because the pages
3727 			are X-latched. */
3728 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3729 		}
3730 #endif /* UNIV_BTR_DEBUG */
3731 
3732 		/* Remove the page from the level list */
3733 		btr_level_list_remove(space, page_size, page, index, mtr);
3734 
3735 		ut_ad(btr_node_ptr_get_child_page_no(
3736 			btr_cur_get_rec(&father_cursor), offsets)
3737 			== block->page.id.page_no());
3738 
3739 		/* Replace the address of the old child node (= page) with the
3740 		address of the merge page to the right */
3741 		btr_node_ptr_set_child_page_no(
3742 			btr_cur_get_rec(&father_cursor),
3743 			btr_cur_get_page_zip(&father_cursor),
3744 			offsets, right_page_no, mtr);
3745 
3746 #ifdef UNIV_DEBUG
3747 		if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3748 			ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3749 				page_rec_get_next(page_get_infimum_rec(
3750 					buf_block_get_frame(merge_block))),
3751 				page_is_comp(page)));
3752 		}
3753 #endif /* UNIV_DEBUG */
3754 
3755 		/* For rtree, we need to update father's mbr. */
3756 		if (dict_index_is_spatial(index)) {
3757 			ulint*	offsets2;
3758 			ulint	rec_info;
3759 
3760 			offsets2 = rec_get_offsets(
3761 				btr_cur_get_rec(&cursor2),
3762 				index, NULL, ULINT_UNDEFINED, &heap);
3763 
3764 			ut_ad(btr_node_ptr_get_child_page_no(
3765 				btr_cur_get_rec(&cursor2), offsets2)
3766 				== right_page_no);
3767 
3768 			rec_info = rec_get_info_bits(
3769 				btr_cur_get_rec(&father_cursor),
3770 				rec_offs_comp(offsets));
3771 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
3772 				/* When the father node ptr is the minimum
3773 				rec, we keep it and delete the node ptr of
3774 				the merge page. */
3775 				rtr_merge_and_update_mbr(&father_cursor,
3776 							 &cursor2,
3777 							 offsets, offsets2,
3778 							 merge_page,
3779 							 merge_block,
3780 							 block, index, mtr);
3781 			} else {
3782 				/* Otherwise, we keep the node ptr of the
3783 				merge page and delete the father node ptr.
3784 				This preserves the rec order in the upper
3785 				level. */
3786 				rtr_merge_and_update_mbr(&cursor2,
3787 							 &father_cursor,
3788 							 offsets2, offsets,
3789 							 merge_page,
3790 							 merge_block,
3791 							 block, index, mtr);
3792 			}
3793 			lock_mutex_enter();
3794 			lock_prdt_page_free_from_discard(
3795 				block, lock_sys->prdt_page_hash);
3796 			lock_rec_free_all_from_discard_page(block);
3797 			lock_mutex_exit();
3798 		} else {
3799 
3800 			compressed = btr_cur_pessimistic_delete(&err, TRUE,
3801 								&cursor2,
3802 								BTR_CREATE_FLAG,
3803 								false, mtr);
3804 			ut_a(err == DB_SUCCESS);
3805 
3806 			if (!compressed) {
3807 				btr_cur_compress_if_useful(&cursor2,
3808 							   FALSE,
3809 							   mtr);
3810 			}
3811 
3812 			if (!dict_table_is_locking_disabled(index->table)) {
3813 				lock_update_merge_right(
3814 					merge_block, orig_succ, block);
3815 			}
3816 		}
3817 	}
3818 
3819 	if (!dict_index_is_clust(index)
3820 	    && !dict_table_is_temporary(index->table)
3821 	    && page_is_leaf(merge_page)) {
3822 		/* Update the free bits of the B-tree page in the
3823 		insert buffer bitmap.  This has to be done in a
3824 		separate mini-transaction that is committed before the
3825 		main mini-transaction.  We cannot update the insert
3826 		buffer bitmap in this mini-transaction, because
3827 		btr_compress() can be invoked recursively without
3828 		committing the mini-transaction in between.  Since
3829 		insert buffer bitmap pages have a lower rank than
3830 		B-tree pages, we must not access other pages in the
3831 		same mini-transaction after accessing an insert buffer
3832 		bitmap page. */
3833 
3834 		/* The free bits in the insert buffer bitmap must
3835 		never exceed the free space on a page.  It is safe to
3836 		decrement or reset the bits in the bitmap in a
3837 		mini-transaction that is committed before the
3838 		mini-transaction that affects the free space. */
3839 
3840 		/* It is unsafe to increment the bits in a separately
3841 		committed mini-transaction, because in crash recovery,
3842 		the free bits could momentarily be set too high. */
3843 
3844 		if (page_size.is_compressed()) {
3845 			/* Because the free bits may be incremented
3846 			and we cannot update the insert buffer bitmap
3847 			in the same mini-transaction, the only safe
3848 			thing we can do here is the pessimistic
3849 			approach: reset the free bits. */
3850 			ibuf_reset_free_bits(merge_block);
3851 		} else {
3852 			/* On uncompressed pages, the free bits will
3853 			never increase here.  Thus, it is safe to
3854 			write the bits accurately in a separate
3855 			mini-transaction. */
3856 			ibuf_update_free_bits_if_full(merge_block,
3857 						      UNIV_PAGE_SIZE,
3858 						      ULINT_UNDEFINED);
3859 		}
3860 	}
3861 
3862 	ut_ad(page_validate(merge_page, index));
3863 #ifdef UNIV_ZIP_DEBUG
3864 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
3865 						  index));
3866 #endif /* UNIV_ZIP_DEBUG */
3867 
3868 	if (dict_index_is_spatial(index)) {
3869 #ifdef UNIV_GIS_DEBUG
3870 		fprintf(stderr, "GIS_DIAG: compressed away  %ld\n",
3871 			(long) block->page.id.page_no());
3872 		fprintf(stderr, "GIS_DIAG: merged to %ld\n",
3873 			(long) merge_block->page.id.page_no());
3874 #endif
3875 
3876 		rtr_check_discard_page(index, NULL, block);
3877 	}
3878 
3879 	/* Free the file page */
3880 	btr_page_free(index, block, mtr);
3881 
3882 	/* btr_check_node_ptr() needs the parent block latched.
3883 	If merge_block's parent block is not the same,
3884 	we cannot use btr_check_node_ptr() */
3885 	ut_ad(leftmost_child
3886 	      || btr_check_node_ptr(index, merge_block, mtr));
3887 func_exit:
3888 	mem_heap_free(heap);
3889 
3890 	if (adjust) {
3891 		ut_ad(nth_rec > 0);
3892 		btr_cur_position(
3893 			index,
3894 			page_rec_get_nth(merge_block->frame, nth_rec),
3895 			merge_block, cursor);
3896 	}
3897 
3898 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
3899 
3900 	DBUG_RETURN(TRUE);
3901 
3902 err_exit:
3903 	/* We play it safe and reset the free bits. */
3904 	if (page_size.is_compressed()
3905 	    && merge_page
3906 	    && page_is_leaf(merge_page)
3907 	    && !dict_index_is_clust(index)) {
3908 
3909 		ibuf_reset_free_bits(merge_block);
3910 	}
3911 
3912 	mem_heap_free(heap);
3913 	DBUG_RETURN(FALSE);
3914 }
3915 
3916 /*************************************************************//**
3917 Discards a page that is the only page on its level.  This will empty
3918 the whole B-tree, leaving just an empty root page.  This function
3919 should never be reached, because btr_compress(), which is invoked in
3920 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
3921 static
3922 void
3923 btr_discard_only_page_on_level(
3924 /*===========================*/
3925 	dict_index_t*	index,	/*!< in: index tree */
3926 	buf_block_t*	block,	/*!< in: page which is the only one on its level */
3927 	mtr_t*		mtr)	/*!< in: mtr */
3928 {
3929 	ulint		page_level = 0;
3930 	trx_id_t	max_trx_id;
3931 
3932 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
3933 	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
3934 
3935 	while (block->page.id.page_no() != dict_index_get_page(index)) {
3936 		btr_cur_t	cursor;
3937 		buf_block_t*	father;
3938 		const page_t*	page	= buf_block_get_frame(block);
3939 
3940 		ut_a(page_get_n_recs(page) == 1);
3941 		ut_a(page_level == btr_page_get_level(page, mtr));
3942 		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
3943 		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
3944 
3945 		ut_ad(mtr_is_block_fix(
3946 			mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3947 		btr_search_drop_page_hash_index(block);
3948 
3949 		if (dict_index_is_spatial(index)) {
3950 			/* Check for any concurrent search referencing this page */
3951 			rtr_check_discard_page(index, NULL, block);
3952 			rtr_page_get_father(index, block, mtr, NULL, &cursor);
3953 		} else {
3954 			btr_page_get_father(index, block, mtr, &cursor);
3955 		}
3956 		father = btr_cur_get_block(&cursor);
3957 
3958 		if (!dict_table_is_locking_disabled(index->table)) {
3959 			lock_update_discard(
3960 				father, PAGE_HEAP_NO_SUPREMUM, block);
3961 		}
3962 
3963 		/* Free the file page */
3964 		btr_page_free(index, block, mtr);
3965 
3966 		block = father;
3967 		page_level++;
3968 	}
3969 
3970 	/* block is the root page, which must be empty, except
3971 	for the node pointer to the (now discarded) block(s). */
3972 
3973 #ifdef UNIV_BTR_DEBUG
3974 	if (!dict_index_is_ibuf(index)) {
3975 		const page_t*	root	= buf_block_get_frame(block);
3976 		const ulint	space	= dict_index_get_space(index);
3977 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
3978 					    + root, space));
3979 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
3980 					    + root, space));
3981 	}
3982 #endif /* UNIV_BTR_DEBUG */
3983 
3984 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
3985 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3986 
3987 	if (!dict_index_is_clust(index)
3988 	    && !dict_table_is_temporary(index->table)) {
3989 		/* We play it safe and reset the free bits for the root */
3990 		ibuf_reset_free_bits(block);
3991 
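		/* The root has just become a leaf page of this secondary
		index; restore the PAGE_MAX_TRX_ID that was saved from the
		discarded leaf so that readers still see a valid value. */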
3992 		ut_a(max_trx_id);
3993 		page_set_max_trx_id(block,
3994 				    buf_block_get_page_zip(block),
3995 				    max_trx_id, mtr);
3996 	}
3997 }
3998 
3999 /*************************************************************//**
4000 Discards a page from a B-tree. This is used to remove the last record from
4001 a B-tree page: the whole page must be removed at the same time. This cannot
4002 be used for the root page, which is allowed to be empty. */
4003 void
4004 btr_discard_page(
4005 /*=============*/
4006 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
4007 				the root page */
4008 	mtr_t*		mtr)	/*!< in: mtr */
4009 {
4010 	dict_index_t*	index;
4011 	ulint		left_page_no;
4012 	ulint		right_page_no;
4013 	buf_block_t*	merge_block;
4014 	page_t*		merge_page;
4015 	buf_block_t*	block;
4016 	page_t*		page;
4017 	rec_t*		node_ptr;
4018 #ifdef UNIV_DEBUG
4019 	btr_cur_t	parent_cursor;
4020 	bool		parent_is_different = false;
4021 #endif
4022 
4023 	block = btr_cur_get_block(cursor);
4024 	index = btr_cur_get_index(cursor);
4025 
4026 	ut_ad(dict_index_get_page(index) != block->page.id.page_no());
4027 
4028 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
4029 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK)
4030 	      || dict_table_is_intrinsic(index->table));
4031 
4032 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
4033 
4034 	const ulint	space = dict_index_get_space(index);
4035 
4036 	MONITOR_INC(MONITOR_INDEX_DISCARD);
4037 
4038 #ifdef UNIV_DEBUG
4039 	if (dict_index_is_spatial(index)) {
4040 		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
4041 	} else {
4042 		btr_page_get_father(index, block, mtr, &parent_cursor);
4043 	}
4044 #endif
4045 
4046 	/* Decide the page which will inherit the locks */
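	/* The sibling chosen below is the page that the emptied page is
	logically merged into: prefer the left brother, else the right
	brother; if neither exists, this is the only page on its level and
	btr_discard_only_page_on_level() handles it. */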
4047 
4048 	left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
4049 	right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
4050 
4051 	const page_size_t	page_size(dict_table_page_size(index->table));
4052 
4053 	if (left_page_no != FIL_NULL) {
4054 		merge_block = btr_block_get(
4055 			page_id_t(space, left_page_no), page_size,
4056 			RW_X_LATCH, index, mtr);
4057 
4058 		merge_page = buf_block_get_frame(merge_block);
4059 #ifdef UNIV_BTR_DEBUG
4060 		ut_a(btr_page_get_next(merge_page, mtr)
4061 		     == block->page.id.page_no());
4062 #endif /* UNIV_BTR_DEBUG */
4063 		ut_d(parent_is_different =
4064 			(page_rec_get_next(
4065 				page_get_infimum_rec(
4066 					btr_cur_get_page(
4067 						&parent_cursor)))
4068 			 == btr_cur_get_rec(&parent_cursor)));
4069 	} else if (right_page_no != FIL_NULL) {
4070 		merge_block = btr_block_get(
4071 			page_id_t(space, right_page_no), page_size,
4072 			RW_X_LATCH, index, mtr);
4073 
4074 		merge_page = buf_block_get_frame(merge_block);
4075 #ifdef UNIV_BTR_DEBUG
4076 		ut_a(btr_page_get_prev(merge_page, mtr)
4077 		     == block->page.id.page_no());
4078 #endif /* UNIV_BTR_DEBUG */
4079 		ut_d(parent_is_different = page_rec_is_supremum(
4080 			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
4081 	} else {
4082 		btr_discard_only_page_on_level(index, block, mtr);
4083 
4084 		return;
4085 	}
4086 
4087 	page = buf_block_get_frame(block);
4088 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
4089 	btr_search_drop_page_hash_index(block);
4090 
4091 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
4092 
4093 		/* We have to mark the leftmost node pointer on the right
4094 		side page as the predefined minimum record */
4095 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
4096 
4097 		ut_ad(page_rec_is_user_rec(node_ptr));
4098 
4099 		/* This will make page_zip_validate() fail on merge_page
4100 		until btr_level_list_remove() completes.  This is harmless,
4101 		because everything will take place within a single
4102 		mini-transaction and because writing to the redo log
4103 		is an atomic operation (performed by mtr_commit()). */
4104 		btr_set_min_rec_mark(node_ptr, mtr);
4105 	}
4106 
4107 	if (dict_index_is_spatial(index)) {
4108 		btr_cur_t	father_cursor;
4109 
4110 		/* Since rtr_node_ptr_delete() does not itself fetch the
4111 		father node ptr, we need to get the father node ptr first
4112 		and then delete it. */
4113 		rtr_page_get_father(index, block, mtr, cursor, &father_cursor);
4114 		rtr_node_ptr_delete(index, &father_cursor, block, mtr);
4115 	} else {
4116 		btr_node_ptr_delete(index, block, mtr);
4117 	}
4118 
4119 	/* Remove the page from the level list */
4120 	btr_level_list_remove(space, page_size, page, index, mtr);
4121 #ifdef UNIV_ZIP_DEBUG
4122 	{
4123 		page_zip_des_t*	merge_page_zip
4124 			= buf_block_get_page_zip(merge_block);
4125 		ut_a(!merge_page_zip
4126 		     || page_zip_validate(merge_page_zip, merge_page, index));
4127 	}
4128 #endif /* UNIV_ZIP_DEBUG */
4129 
4130 	if (!dict_table_is_locking_disabled(index->table)) {
4131 		if (left_page_no != FIL_NULL) {
4132 			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4133 					    block);
4134 		} else {
4135 			lock_update_discard(merge_block,
4136 					    lock_get_min_heap_no(merge_block),
4137 					    block);
4138 		}
4139 	}
4140 
4141 	if (dict_index_is_spatial(index)) {
4142 		rtr_check_discard_page(index, cursor, block);
4143 	}
4144 
4145 	/* Free the file page */
4146 	btr_page_free(index, block, mtr);
4147 
4148 	/* btr_check_node_ptr() needs the parent block latched.
4149 	If merge_block's parent block is not the same,
4150 	we cannot use btr_check_node_ptr() */
4151 	ut_ad(parent_is_different
4152 	      || btr_check_node_ptr(index, merge_block, mtr));
4153 }
4154 
4155 #ifdef UNIV_BTR_PRINT
4156 /*************************************************************//**
4157 Prints size info of a B-tree. */
4158 void
4159 btr_print_size(
4160 /*===========*/
4161 	dict_index_t*	index)	/*!< in: index tree */
4162 {
4163 	page_t*		root;
4164 	fseg_header_t*	seg;
4165 	mtr_t		mtr;
4166 
4167 	if (dict_index_is_ibuf(index)) {
4168 		fputs("Sorry, cannot print info of an ibuf tree:"
4169 		      " use ibuf functions\n", stderr);
4170 
4171 		return;
4172 	}
4173 
4174 	mtr_start(&mtr);
4175 
4176 	root = btr_root_get(index, &mtr);
4177 
4178 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
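	/* PAGE_BTR_SEG_TOP on the root page is the header of the file
	segment that holds the non-leaf pages; PAGE_BTR_SEG_LEAF, used
	further below, is the header of the segment for the leaf pages. */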
4179 
4180 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4181 	fseg_print(seg, &mtr);
4182 
4183 	if (!dict_index_is_ibuf(index)) {
4184 
4185 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4186 
4187 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4188 		fseg_print(seg, &mtr);
4189 	}
4190 
4191 	mtr_commit(&mtr);
4192 }
4193 
4194 /************************************************************//**
4195 Prints recursively index tree pages. */
4196 static
4197 void
4198 btr_print_recursive(
4199 /*================*/
4200 	dict_index_t*	index,	/*!< in: index tree */
4201 	buf_block_t*	block,	/*!< in: index page */
4202 	ulint		width,	/*!< in: print this many entries from start
4203 				and end */
4204 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4205 	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
4206 	mtr_t*		mtr)	/*!< in: mtr */
4207 {
4208 	const page_t*	page	= buf_block_get_frame(block);
4209 	page_cur_t	cursor;
4210 	ulint		n_recs;
4211 	ulint		i	= 0;
4212 	mtr_t		mtr2;
4213 
4214 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_SX_FIX, index->table));
4215 
4216 	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page, mtr)
4217 		<< " page " << block->page.id;
4218 
4219 	page_print(block, index, width, width);
4220 
4221 	n_recs = page_get_n_recs(page);
4222 
4223 	page_cur_set_before_first(block, &cursor);
4224 	page_cur_move_to_next(&cursor);
4225 
4226 	while (!page_cur_is_after_last(&cursor)) {
4227 
4228 		if (page_is_leaf(page)) {
4229 
4230 			/* If this is the leaf level, do nothing */
4231 
4232 		} else if ((i <= width) || (i >= n_recs - width)) {
4233 
4234 			const rec_t*	node_ptr;
4235 
4236 			mtr_start(&mtr2);
4237 
4238 			node_ptr = page_cur_get_rec(&cursor);
4239 
4240 			*offsets = rec_get_offsets(node_ptr, index, *offsets,
4241 						   ULINT_UNDEFINED, heap);
4242 			btr_print_recursive(index,
4243 					    btr_node_ptr_get_child(node_ptr,
4244 								   index,
4245 								   *offsets,
4246 								   &mtr2),
4247 					    width, heap, offsets, &mtr2);
4248 			mtr_commit(&mtr2);
4249 		}
4250 
4251 		page_cur_move_to_next(&cursor);
4252 		i++;
4253 	}
4254 }
4255 
4256 /**************************************************************//**
4257 Prints directories and other info of all nodes in the tree. */
4258 void
4259 btr_print_index(
4260 /*============*/
4261 	dict_index_t*	index,	/*!< in: index */
4262 	ulint		width)	/*!< in: print this many entries from start
4263 				and end */
4264 {
4265 	mtr_t		mtr;
4266 	buf_block_t*	root;
4267 	mem_heap_t*	heap	= NULL;
4268 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4269 	ulint*		offsets	= offsets_;
4270 	rec_offs_init(offsets_);
4271 
4272 	fputs("--------------------------\n"
4273 	      "INDEX TREE PRINT\n", stderr);
4274 
4275 	mtr_start(&mtr);
4276 
4277 	root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4278 
4279 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4280 	if (heap) {
4281 		mem_heap_free(heap);
4282 	}
4283 
4284 	mtr_commit(&mtr);
4285 
4286 	ut_ad(btr_validate_index(index, 0, false));
4287 }
4288 #endif /* UNIV_BTR_PRINT */
4289 
4290 #ifdef UNIV_DEBUG
4291 /************************************************************//**
4292 Checks that the node pointer to a page is appropriate.
4293 @return TRUE */
4294 ibool
4295 btr_check_node_ptr(
4296 /*===============*/
4297 	dict_index_t*	index,	/*!< in: index tree */
4298 	buf_block_t*	block,	/*!< in: index page */
4299 	mtr_t*		mtr)	/*!< in: mtr */
4300 {
4301 	mem_heap_t*	heap;
4302 	dtuple_t*	tuple;
4303 	ulint*		offsets;
4304 	btr_cur_t	cursor;
4305 	page_t*		page = buf_block_get_frame(block);
4306 
4307 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
4308 
4309 	if (dict_index_get_page(index) == block->page.id.page_no()) {
4310 
4311 		return(TRUE);
4312 	}
4313 
4314 	heap = mem_heap_create(256);
4315 
4316 	if (dict_index_is_spatial(index)) {
4317 		offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4318 						    NULL, &cursor);
4319 	} else {
4320 		offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4321 						    &cursor);
4322 	}
4323 
4324 	if (page_is_leaf(page)) {
4325 
4326 		goto func_exit;
4327 	}
4328 
4329 	tuple = dict_index_build_node_ptr(
4330 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4331 		btr_page_get_level(page, mtr));
4332 
4333 	/* For a spatial index, the MBR in the parent rec could differ
4334 	from that of the first rec of the child; their relationship
4335 	should be a "WITHIN" relationship */
4336 	if (dict_index_is_spatial(index)) {
4337 		ut_a(!cmp_dtuple_rec_with_gis(
4338 			tuple, btr_cur_get_rec(&cursor),
4339 			offsets, PAGE_CUR_WITHIN));
4340 	} else {
4341 		ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4342 	}
4343 func_exit:
4344 	mem_heap_free(heap);
4345 
4346 	return(TRUE);
4347 }
4348 #endif /* UNIV_DEBUG */
4349 
4350 /************************************************************//**
4351 Display identification information for a record. */
4352 static
4353 void
4354 btr_index_rec_validate_report(
4355 /*==========================*/
4356 	const page_t*		page,	/*!< in: index page */
4357 	const rec_t*		rec,	/*!< in: index record */
4358 	const dict_index_t*	index)	/*!< in: index */
4359 {
4360 	ib::info() << "Record in index " << index->name
4361 		<< " of table " << index->table->name
4362 		<< ", page " << page_id_t(page_get_space_id(page),
4363 					  page_get_page_no(page))
4364 		<< ", at offset " << page_offset(rec);
4365 }
4366 
4367 /************************************************************//**
4368 Checks the size and number of fields in a record based on the definition of
4369 the index.
4370 @return TRUE if ok */
4371 ibool
4372 btr_index_rec_validate(
4373 /*===================*/
4374 	const rec_t*		rec,		/*!< in: index record */
4375 	const dict_index_t*	index,		/*!< in: index */
4376 	ibool			dump_on_error)	/*!< in: TRUE if the function
4377 						should print hex dump of record
4378 						and page on error */
4379 {
4380 	ulint		len;
4381 	ulint		n;
4382 	ulint		i;
4383 	const page_t*	page;
4384 	mem_heap_t*	heap	= NULL;
4385 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4386 	ulint*		offsets	= offsets_;
4387 	rec_offs_init(offsets_);
4388 
4389 	page = page_align(rec);
4390 
4391 	if (dict_index_is_ibuf(index)) {
4392 		/* The insert buffer index tree can contain records from any
4393 		other index: we cannot check the number of fields or
4394 		their length */
4395 
4396 		return(TRUE);
4397 	}
4398 
4399 #ifdef VIRTUAL_INDEX_DEBUG
4400 	if (dict_index_has_virtual(index)) {
4401 		fprintf(stderr, "index name is %s\n", index->name());
4402 	}
4403 #endif
4404 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4405 		btr_index_rec_validate_report(page, rec, index);
4406 
4407 		ib::error() << "Compact flag=" << !!page_is_comp(page)
4408 			<< ", should be " << dict_table_is_comp(index->table);
4409 
4410 		return(FALSE);
4411 	}
4412 
4413 	n = dict_index_get_n_fields(index);
4414 
4415 	if (!page_is_comp(page)
4416 	    && (rec_get_n_fields_old(rec) != n
4417 		/* a record from an older SYS_INDEXES table
4418 		(missing the merge_threshold column) is acceptable. */
4419 		&& !(index->id == DICT_INDEXES_ID
4420 		     && rec_get_n_fields_old(rec) == n - 1))) {
4421 		btr_index_rec_validate_report(page, rec, index);
4422 
4423 		ib::error() << "Has " << rec_get_n_fields_old(rec)
4424 			<< " fields, should have " << n;
4425 
4426 		if (dump_on_error) {
4427 			fputs("InnoDB: corrupt record ", stderr);
4428 			rec_print_old(stderr, rec);
4429 			putc('\n', stderr);
4430 		}
4431 		return(FALSE);
4432 	}
4433 
4434 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
4435 
4436 	for (i = 0; i < n; i++) {
4437 		dict_field_t*	field = dict_index_get_nth_field(index, i);
4438 		ulint		fixed_size = dict_col_get_fixed_size(
4439 						dict_field_get_col(field),
4440 						page_is_comp(page));
4441 
4442 		rec_get_nth_field_offs(offsets, i, &len);
4443 
4444 		/* Note that if fixed_size != 0, it equals the
4445 		length of a fixed-size column in the clustered index,
4446 		except for DATA_POINT, whose length would be MBR_LEN
4447 		when it is indexed in an R-tree. We adjust for that here.
4448 		A prefix index of the column has a fixed, but different,
4449 		length.  When fixed_size == 0, prefix_len is the maximum
4450 		length of the prefix index column. */
4451 
4452 		if (dict_field_get_col(field)->mtype == DATA_POINT) {
4453 			ut_ad(fixed_size == DATA_POINT_LEN);
4454 			if (dict_index_is_spatial(index)) {
4455 				/* For DATA_POINT data in an R-tree index,
4456 				the fixed_len is the length of the MBR of
4457 				the point. But if it is a primary key stored
4458 				on the R-tree as the PK pointer, the length
4459 				shall be DATA_POINT_LEN as well. */
4460 				ut_ad((field->fixed_len == DATA_MBR_LEN
4461 				       && i == 0)
4462 				      || (field->fixed_len == DATA_POINT_LEN
4463 					  && i != 0));
4464 				fixed_size = field->fixed_len;
4465 			}
4466 		}
4467 
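		/* Reject the record if a fixed-size, non-prefix column has
		a length other than its fixed size (NULL excepted), or if a
		prefix column is longer than its prefix length. */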
4468 		if ((field->prefix_len == 0
4469 		     && len != UNIV_SQL_NULL && fixed_size
4470 		     && len != fixed_size)
4471 		    || (field->prefix_len > 0
4472 			&& len != UNIV_SQL_NULL
4473 			&& len
4474 			> field->prefix_len)) {
4475 
4476 			btr_index_rec_validate_report(page, rec, index);
4477 
4478 			ib::error	error;
4479 
4480 			error << "Field " << i << " len is " << len
4481 				<< ", should be " << fixed_size;
4482 
4483 			if (dump_on_error) {
4484 				error << "; ";
4485 				rec_print(error.m_oss, rec,
4486 					  rec_get_info_bits(
4487 						  rec, rec_offs_comp(offsets)),
4488 					  offsets);
4489 			}
4490 			if (heap) {
4491 				mem_heap_free(heap);
4492 			}
4493 			return(FALSE);
4494 		}
4495 	}
4496 
4497 #ifdef VIRTUAL_INDEX_DEBUG
4498 	if (dict_index_has_virtual(index)) {
4499 		rec_print_new(stderr, rec, offsets);
4500 	}
4501 #endif
4502 
4503 	if (heap) {
4504 		mem_heap_free(heap);
4505 	}
4506 	return(TRUE);
4507 }
4508 
4509 /************************************************************//**
4510 Checks the size and number of fields in records based on the definition of
4511 the index.
4512 @return TRUE if ok */
4513 static
4514 ibool
4515 btr_index_page_validate(
4516 /*====================*/
4517 	buf_block_t*	block,	/*!< in: index page */
4518 	dict_index_t*	index)	/*!< in: index */
4519 {
4520 	page_cur_t	cur;
4521 	ibool		ret	= TRUE;
4522 #ifndef NDEBUG
4523 	ulint		nth	= 1;
4524 #endif /* !NDEBUG */
4525 
4526 	page_cur_set_before_first(block, &cur);
4527 
4528 	/* Directory slot 0 should only contain the infimum record. */
4529 	DBUG_EXECUTE_IF("check_table_rec_next",
4530 			ut_a(page_rec_get_nth_const(
4531 				     page_cur_get_page(&cur), 0)
4532 			     == cur.rec);
4533 			ut_a(page_dir_slot_get_n_owned(
4534 				     page_dir_get_nth_slot(
4535 					     page_cur_get_page(&cur), 0))
4536 			     == 1););
4537 
4538 	page_cur_move_to_next(&cur);
4539 
4540 	for (;;) {
4541 		if (page_cur_is_after_last(&cur)) {
4542 
4543 			break;
4544 		}
4545 
4546 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4547 
4548 			return(FALSE);
4549 		}
4550 
4551 		/* Verify that page_rec_get_nth_const() is correctly
4552 		retrieving each record. */
4553 		DBUG_EXECUTE_IF("check_table_rec_next",
4554 				ut_a(cur.rec == page_rec_get_nth_const(
4555 					     page_cur_get_page(&cur),
4556 					     page_rec_get_n_recs_before(
4557 						     cur.rec)));
4558 				ut_a(nth++ == page_rec_get_n_recs_before(
4559 					     cur.rec)););
4560 
4561 		page_cur_move_to_next(&cur);
4562 	}
4563 
4564 	return(ret);
4565 }
4566 
4567 /************************************************************//**
4568 Report an error on one page of an index tree. */
4569 static
4570 void
4571 btr_validate_report1(
4572 /*=================*/
4573 	dict_index_t*		index,	/*!< in: index */
4574 	ulint			level,	/*!< in: B-tree level */
4575 	const buf_block_t*	block)	/*!< in: index page */
4576 {
4577 	ib::error	error;
4578 	error << "In page " << block->page.id.page_no()
4579 		<< " of index " << index->name
4580 		<< " of table " << index->table->name;
4581 
4582 	if (level > 0) {
4583 		error << ", index tree level " << level;
4584 	}
4585 }
4586 
4587 /************************************************************//**
4588 Report an error on two pages of an index tree. */
4589 static
4590 void
4591 btr_validate_report2(
4592 /*=================*/
4593 	const dict_index_t*	index,	/*!< in: index */
4594 	ulint			level,	/*!< in: B-tree level */
4595 	const buf_block_t*	block1,	/*!< in: first index page */
4596 	const buf_block_t*	block2)	/*!< in: second index page */
4597 {
4598 	ib::error	error;
4599 	error << "In pages " << block1->page.id
4600 		<< " and " << block2->page.id << " of index " << index->name
4601 		<< " of table " << index->table->name;
4602 
4603 	if (level > 0) {
4604 		error << ", index tree level " << level;
4605 	}
4606 }
4607 
4608 /************************************************************//**
4609 Validates index tree level.
4610 @return TRUE if ok */
4611 static
4612 bool
4613 btr_validate_level(
4614 /*===============*/
4615 	dict_index_t*	index,	/*!< in: index tree */
4616 	const trx_t*	trx,	/*!< in: transaction or NULL */
4617 	ulint		level,	/*!< in: level number */
4618 	bool		lockout)/*!< in: true if X-latch index is intended */
4619 {
4620 	buf_block_t*	block;
4621 	page_t*		page;
4622 	buf_block_t*	right_block = 0; /* remove warning */
4623 	page_t*		right_page = 0; /* remove warning */
4624 	page_t*		father_page;
4625 	btr_cur_t	node_cur;
4626 	btr_cur_t	right_node_cur;
4627 	rec_t*		rec;
4628 	ulint		right_page_no;
4629 	ulint		left_page_no;
4630 	page_cur_t	cursor;
4631 	dtuple_t*	node_ptr_tuple;
4632 	bool		ret	= true;
4633 	mtr_t		mtr;
4634 	mem_heap_t*	heap	= mem_heap_create(256);
4635 	fseg_header_t*	seg;
4636 	ulint*		offsets	= NULL;
4637 	ulint*		offsets2= NULL;
4638 #ifdef UNIV_ZIP_DEBUG
4639 	page_zip_des_t*	page_zip;
4640 #endif /* UNIV_ZIP_DEBUG */
4641 	ulint		savepoint = 0;
4642 	ulint		savepoint2 = 0;
4643 	ulint		parent_page_no = FIL_NULL;
4644 	ulint		parent_right_page_no = FIL_NULL;
4645 	bool		rightmost_child = false;
4646 
4647 	mtr_start(&mtr);
4648 
4649 	if (!srv_read_only_mode) {
4650 		if (lockout) {
4651 			mtr_x_lock(dict_index_get_lock(index), &mtr);
4652 		} else {
4653 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
4654 		}
4655 	}
4656 
4657 	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4658 	page = buf_block_get_frame(block);
4659 	seg = page + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4660 
4661 #ifdef UNIV_DEBUG
4662 	if (dict_index_is_spatial(index)) {
4663 		fprintf(stderr, "Root page no: %lu\n",
4664 			(ulong) page_get_page_no(page));
4665 	}
4666 #endif
4667 
4668 	const fil_space_t*	space	= fil_space_get(index->space);
4669 	const page_size_t	table_page_size(
4670 		dict_table_page_size(index->table));
4671 	const page_size_t	space_page_size(space->flags);
4672 
4673 	if (!table_page_size.equals_to(space_page_size)) {
4674 
4675 		ib::warn() << "Flags mismatch: table=" << index->table->flags
4676 			<< ", tablespace=" << space->flags;
4677 
4678 		mtr_commit(&mtr);
4679 
4680 		return(false);
4681 	}
4682 
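	/* Descend from the root along the leftmost node pointers until the
	requested level is reached; for an R-tree, the leftmost page on a
	level is found by walking sibling links backwards instead. */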
4683 	while (level != btr_page_get_level(page, &mtr)) {
4684 		const rec_t*	node_ptr;
4685 
4686 		if (fseg_page_is_free(seg,
4687 				      block->page.id.space(),
4688 				      block->page.id.page_no())) {
4689 
4690 			btr_validate_report1(index, level, block);
4691 
4692 			ib::warn() << "Page is free";
4693 
4694 			ret = false;
4695 		}
4696 
4697 		ut_a(index->space == block->page.id.space());
4698 		ut_a(index->space == page_get_space_id(page));
4699 #ifdef UNIV_ZIP_DEBUG
4700 		page_zip = buf_block_get_page_zip(block);
4701 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4702 #endif /* UNIV_ZIP_DEBUG */
4703 		ut_a(!page_is_leaf(page));
4704 
4705 		page_cur_set_before_first(block, &cursor);
4706 		page_cur_move_to_next(&cursor);
4707 
4708 		node_ptr = page_cur_get_rec(&cursor);
4709 		offsets = rec_get_offsets(node_ptr, index, offsets,
4710 					  ULINT_UNDEFINED, &heap);
4711 
4712 		savepoint2 = mtr_set_savepoint(&mtr);
4713 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4714 		page = buf_block_get_frame(block);
4715 
4716 		/* For an R-tree, since the record order might not match
4717 		the order of the linked index pages in the lower level,
4718 		we need to traverse backwards to get the first page rec
4719 		in this level. This is only used for index validation;
4720 		a spatial index does not use such a scan for any of its
4721 		DML or query operations. */
4722 		if (dict_index_is_spatial(index)) {
4723 			left_page_no = btr_page_get_prev(page, &mtr);
4724 
4725 			while (left_page_no != FIL_NULL) {
4726 				page_id_t	left_page_id(
4727 					index->space, left_page_no);
4728 				/* To obey the latch order of tree blocks,
4729 				we release the current block once to
4730 				obtain the latch on its left sibling. */
4731 				mtr_release_block_at_savepoint(
4732 					&mtr, savepoint2, block);
4733 
4734 				savepoint2 = mtr_set_savepoint(&mtr);
4735 				block = btr_block_get(
4736 					left_page_id,
4737 					table_page_size,
4738 					RW_SX_LATCH, index, &mtr);
4739 				page = buf_block_get_frame(block);
4740 				left_page_no = btr_page_get_prev(page, &mtr);
4741 			}
4742 		}
4743 	}
4744 
4745 	/* Now we are on the desired level. Loop through the pages on that
4746 	level. */
4747 
4748 	if (level == 0) {
4749 		/* Leaf pages are managed in their own file segment. */
4750 		seg -= PAGE_BTR_SEG_TOP - PAGE_BTR_SEG_LEAF;
4751 	}
4752 
4753 loop:
4754 	mem_heap_empty(heap);
4755 	offsets = offsets2 = NULL;
4756 	if (!srv_read_only_mode) {
4757 		if (lockout) {
4758 			mtr_x_lock(dict_index_get_lock(index), &mtr);
4759 		} else {
4760 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
4761 		}
4762 	}
4763 
4764 #ifdef UNIV_ZIP_DEBUG
4765 	page_zip = buf_block_get_page_zip(block);
4766 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4767 #endif /* UNIV_ZIP_DEBUG */
4768 
4769 	ut_a(block->page.id.space() == index->space);
4770 
4771 	if (fseg_page_is_free(seg,
4772 			      block->page.id.space(),
4773 			      block->page.id.page_no())) {
4774 
4775 		btr_validate_report1(index, level, block);
4776 
4777 		ib::warn() << "Page is marked as free";
4778 		ret = false;
4779 
4780 	} else if (btr_page_get_index_id(page) != index->id) {
4781 
4782 		ib::error() << "Page index id " << btr_page_get_index_id(page)
4783 			<< " != data dictionary index id " << index->id;
4784 
4785 		ret = false;
4786 
4787 	} else if (!page_validate(page, index)) {
4788 
4789 		btr_validate_report1(index, level, block);
4790 		ret = false;
4791 
4792 	} else if (level == 0 && !btr_index_page_validate(block, index)) {
4793 
4794 		/* We are on level 0. Check that the records have the right
4795 		number of fields, and field lengths are right. */
4796 
4797 		ret = false;
4798 	}
4799 
4800 	ut_a(btr_page_get_level(page, &mtr) == level);
4801 
4802 	right_page_no = btr_page_get_next(page, &mtr);
4803 	left_page_no = btr_page_get_prev(page, &mtr);
4804 
4805 	ut_a(!page_is_empty(page)
4806 	     || (level == 0
4807 		 && page_get_page_no(page) == dict_index_get_page(index)));
4808 
4809 	if (right_page_no != FIL_NULL) {
4810 		const rec_t*	right_rec;
4811 		savepoint = mtr_set_savepoint(&mtr);
4812 
4813 		right_block = btr_block_get(
4814 			page_id_t(index->space, right_page_no),
4815 			table_page_size,
4816 			RW_SX_LATCH, index, &mtr);
4817 
4818 		right_page = buf_block_get_frame(right_block);
4819 
4820 		if (btr_page_get_prev(right_page, &mtr)
4821 		    != page_get_page_no(page)) {
4822 
4823 			btr_validate_report2(index, level, block, right_block);
4824 			fputs("InnoDB: broken FIL_PAGE_NEXT"
4825 			      " or FIL_PAGE_PREV links\n", stderr);
4826 
4827 			ret = false;
4828 		}
4829 
4830 		if (page_is_comp(right_page) != page_is_comp(page)) {
4831 			btr_validate_report2(index, level, block, right_block);
4832 			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4833 
4834 			ret = false;
4835 
4836 			goto node_ptr_fails;
4837 		}
4838 
4839 		rec = page_rec_get_prev(page_get_supremum_rec(page));
4840 		right_rec = page_rec_get_next(page_get_infimum_rec(
4841 						      right_page));
4842 		offsets = rec_get_offsets(rec, index,
4843 					  offsets, ULINT_UNDEFINED, &heap);
4844 		offsets2 = rec_get_offsets(right_rec, index,
4845 					   offsets2, ULINT_UNDEFINED, &heap);
4846 
4847 		/* For a spatial index, we cannot guarantee the key ordering
4848 		across pages, so skip the record compare verification for now.
4849 		It will be enhanced in a special R-tree validation scheme. */
4850 		if (!dict_index_is_spatial(index)
4851 		    && cmp_rec_rec(rec, right_rec,
4852 				   offsets, offsets2, index, false) >= 0) {
4853 
4854 			btr_validate_report2(index, level, block, right_block);
4855 
4856 			fputs("InnoDB: records in wrong order"
4857 			      " on adjacent pages\n", stderr);
4858 
4859 			fputs("InnoDB: record ", stderr);
4860 			rec = page_rec_get_prev(page_get_supremum_rec(page));
4861 			rec_print(stderr, rec, index);
4862 			putc('\n', stderr);
4863 			fputs("InnoDB: record ", stderr);
4864 			rec = page_rec_get_next(
4865 				page_get_infimum_rec(right_page));
4866 			rec_print(stderr, rec, index);
4867 			putc('\n', stderr);
4868 
4869 			ret = false;
4870 		}
4871 	}
4872 
4873 	if (level > 0 && left_page_no == FIL_NULL) {
4874 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
4875 			     page_rec_get_next(page_get_infimum_rec(page)),
4876 			     page_is_comp(page)));
4877 	}
4878 
4879 	/* Similarly, skip the father node check for spatial indexes for now,
4880 	for a couple of reasons:
4881 	1) As mentioned, there is no ordering relationship between records
4882 	in the parent level and linked pages in the child level.
4883 	2) Searching for the parent from the root is very costly for an R-tree.
4884 	We will add a special validation mechanism for R-trees later (WL #7520) */
	if (!dict_index_is_spatial(index)
	    && block->page.id.page_no() != dict_index_get_page(index)) {

		/* Check father node pointers */
		rec_t*	node_ptr;

		btr_cur_position(
			index, page_rec_get_next(page_get_infimum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr_for_validate(
			offsets, heap, &node_cur, &mtr);

		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		parent_page_no = page_get_page_no(father_page);
		parent_right_page_no = btr_page_get_next(father_page, &mtr);
		rightmost_child = page_rec_is_supremum(
					page_rec_get_next(node_ptr));

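		/* Re-position on the last user record of the page: both the
		first and the last record must map to the same father node
		pointer. */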
		btr_cur_position(
			index,
			page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);

		offsets = btr_page_get_father_node_ptr_for_validate(
				offsets, heap, &node_cur, &mtr);

		if (node_ptr != btr_cur_get_rec(&node_cur)
		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
				     != block->page.id.page_no()) {

			btr_validate_report1(index, level, block);

			fputs("InnoDB: node pointer to the page is wrong\n",
			      stderr);

			fputs("InnoDB: node ptr ", stderr);
			rec_print(stderr, node_ptr, index);

			rec = btr_cur_get_rec(&node_cur);
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o %lu\n",
				(ulong) btr_node_ptr_get_child_page_no(
					rec, offsets));

			fputs("InnoDB: record on page ", stderr);
			rec_print_new(stderr, rec, offsets);
			putc('\n', stderr);
			ret = false;

			goto node_ptr_fails;
		}

		if (!page_is_leaf(page)) {
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page, &mtr));

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
					   offsets)) {
				const rec_t* first_rec = page_rec_get_next(
					page_get_infimum_rec(page));

				btr_validate_report1(index, level, block);

				ib::error() << "Node ptrs differ on levels > 0";

				fputs("InnoDB: node ptr ", stderr);
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
				ret = false;

				goto node_ptr_fails;
			}
		}

		if (left_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
		}

		if (right_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
		} else {
			const rec_t*	right_node_ptr;

			right_node_ptr = page_rec_get_next(node_ptr);

			if (!lockout && rightmost_child) {

				/* To obey the latch order of tree blocks,
				release right_block temporarily so that the
				latch on the uncle block can be acquired
				first. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint, right_block);

				btr_block_get(
					page_id_t(index->space,
						  parent_right_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);

				right_block = btr_block_get(
					page_id_t(index->space,
						  right_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}

			btr_cur_position(
				index, page_rec_get_next(
					page_get_infimum_rec(
						buf_block_get_frame(
							right_block))),
				right_block, &right_node_cur);

			offsets = btr_page_get_father_node_ptr_for_validate(
					offsets, heap, &right_node_cur, &mtr);

			if (right_node_ptr
			    != page_get_supremum_rec(father_page)) {

				if (btr_cur_get_rec(&right_node_cur)
				    != right_node_ptr) {
					ret = false;
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			} else {
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);

				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
					    page_get_infimum_rec(
						    right_father_page))) {
					ret = false;
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}

				if (page_get_page_no(right_father_page)
				    != btr_page_get_next(father_page, &mtr)) {

					ret = false;
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			}
		}
	}

node_ptr_fails:
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
	mtr_commit(&mtr);

	if (trx_is_interrupted(trx)) {
		/* On interrupt, return the current status. */
	} else if (right_page_no != FIL_NULL) {

		mtr_start(&mtr);

		if (!lockout) {
			if (rightmost_child) {
				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(
							index->space,
							parent_right_page_no),
						table_page_size,
						RW_SX_LATCH, index, &mtr);
				}
			} else if (parent_page_no != FIL_NULL) {
				btr_block_get(
					page_id_t(index->space,
						  parent_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}
		}

		block = btr_block_get(
			page_id_t(index->space, right_page_no),
			table_page_size,
			RW_SX_LATCH, index, &mtr);

		page = buf_block_get_frame(block);

		goto loop;
	}

	mem_heap_free(heap);

	return(ret);
}

/**************************************************************//**
Do an index level validation of a spatial index tree.
@return	true if no error found */
bool
btr_validate_spatial_index(
/*=======================*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx)	/*!< in: transaction or NULL */
{

	mtr_t	mtr;
	bool	ok = true;

	mtr_start(&mtr);

	mtr_x_lock(dict_index_get_lock(index), &mtr);

	page_t*	root = btr_root_get(index, &mtr);
	ulint	n = btr_page_get_level(root, &mtr);

#ifdef UNIV_RTR_DEBUG
	fprintf(stderr, "R-tree level is %lu\n", n);
#endif /* UNIV_RTR_DEBUG */

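	/* Validate each level separately, from the root level n down to
	the leaf level 0. */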
	for (ulint i = 0; i <= n; ++i) {
#ifdef UNIV_RTR_DEBUG
		fprintf(stderr, "Level %lu:\n", n - i);
#endif /* UNIV_RTR_DEBUG */

		if (!btr_validate_level(index, trx, n - i, true)) {
			ok = false;
			break;
		}
	}

	mtr_commit(&mtr);

	return(ok);
}

/**************************************************************//**
Checks the consistency of an index tree.
@return	DB_SUCCESS if ok, error code if not */
dberr_t
btr_validate_index(
/*===============*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx,	/*!< in: transaction or NULL */
	bool		lockout)/*!< in: true if X-latch index is intended */
{
	dberr_t err = DB_SUCCESS;

	/* Full-text indexes are implemented by auxiliary tables,
	not by the B-tree. */
	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
		return(err);
	}

	if (dict_index_is_spatial(index)) {
		if (!btr_validate_spatial_index(index, trx)) {
			err = DB_ERROR;
		}
		/* An R-tree is validated by the dedicated routine above;
		the generic level-by-level validation below does not
		apply to it. */
		return(err);
	}

	mtr_t		mtr;

	mtr_start(&mtr);

	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock(dict_index_get_lock(index), &mtr);
		} else {
			mtr_sx_lock(dict_index_get_lock(index), &mtr);
		}
	}

	page_t*	root = btr_root_get(index, &mtr);

	if (root == NULL && !index->is_readable()) {
		err = DB_DECRYPTION_FAILED;
		mtr_commit(&mtr);
		return err;
	}

	SRV_CORRUPT_TABLE_CHECK(root,
	{
		mtr_commit(&mtr);
		return DB_CORRUPTION;
	});

	ulint	n = btr_page_get_level(root, &mtr);

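	/* Walk the tree one level at a time, from the root level n down
	to the leaf level 0, stopping at the first corrupted level. */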
	for (ulint i = 0; i <= n; ++i) {

		if (!btr_validate_level(index, trx, n - i, lockout)) {
			err = DB_CORRUPTION;
			break;
		}
	}

	mtr_commit(&mtr);

	return(err);
}
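
/* A minimal usage sketch (hypothetical caller; assumes "index" is a
valid dict_index_t* and "trx" the requesting transaction, e.g. during
CHECK TABLE):

	if (btr_validate_index(index, trx, false) != DB_SUCCESS) {
		ib::error() << "Index " << index->name
			<< " of table " << index->table->name
			<< " failed validation";
	}
*/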

/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return	true if possible to merge. */
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr)		/*!< in: mini-transaction */
{
	dict_index_t*	index;
	page_t*		page;
	ulint		n_recs;
	ulint		data_size;
	ulint		max_ins_size_reorg;
	ulint		max_ins_size;
	buf_block_t*	mblock;
	page_t*		mpage;
	DBUG_ENTER("btr_can_merge_with_page");

	if (page_no == FIL_NULL) {
		*merge_block = NULL;
		DBUG_RETURN(false);
	}

	index = btr_cur_get_index(cursor);
	page = btr_cur_get_page(cursor);

	const page_id_t		page_id(dict_index_get_space(index), page_no);
	const page_size_t	page_size(dict_table_page_size(index->table));

	mblock = btr_block_get(page_id, page_size, RW_X_LATCH, index, mtr);
	mpage = buf_block_get_frame(mblock);

	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);

	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		mpage, n_recs);

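	/* If the records of the cursor page would not fit in the sibling
	even after reorganizing it, the pages cannot be merged. */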
	if (data_size > max_ins_size_reorg) {
		goto error;
	}

	/* If compression padding tells us that merging would result in
	a page that is too full, i.e. one that is likely to fail
	compression, then do not merge the pages. */
	if (page_size.is_compressed() && page_is_leaf(mpage)
	    && (page_get_data_size(mpage) + data_size
		>= dict_index_zip_pad_optimal_page_size(index))) {

		goto error;
	}
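	/* First check whether the records fit into the contiguous free
	space currently available on the sibling; only reorganize it if
	they do not. */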
	max_ins_size = page_get_max_insert_size(mpage, n_recs);

	if (data_size > max_ins_size) {

		/* We have to reorganize mpage */

		if (!btr_page_reorganize_block(
			    false, page_zip_level, mblock, index, mtr)) {

			goto error;
		}

		max_ins_size = page_get_max_insert_size(mpage, n_recs);

		ut_ad(page_validate(mpage, index));
		ut_ad(max_ins_size == max_ins_size_reorg);

		if (data_size > max_ins_size) {

			/* Add fault tolerance, though this should
			never happen */

			goto error;
		}
	}

	*merge_block = mblock;
	DBUG_RETURN(true);

error:
	*merge_block = NULL;
	DBUG_RETURN(false);
}

#endif /* !UNIV_HOTBACKUP */
