/*****************************************************************************

Copyright (c) 1994, 2019, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation.  The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file btr/btr0btr.cc
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/

#include "btr0btr.h"

#ifdef UNIV_NONINL
#include "btr0btr.ic"
#endif

#include "fsp0sysspace.h"
#include "page0page.h"
#include "page0zip.h"
#include "gis0rtree.h"

#ifndef UNIV_HOTBACKUP
#include "btr0cur.h"
#include "btr0sea.h"
#include "btr0pcur.h"
#include "rem0cmp.h"
#include "lock0lock.h"
#include "ibuf0ibuf.h"
#include "trx0trx.h"
#include "srv0mon.h"
#include "gis0geo.h"
#include "ut0new.h"
#include "dict0boot.h"

/**************************************************************//**
Checks if the page in the cursor can be merged with the given page.
If necessary, re-organizes the merge_page.
@return	true if possible to merge. */
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr);		/*!< in: mini-transaction */

#endif /* UNIV_HOTBACKUP */

/**************************************************************//**
Report that an index page is corrupted. */
void
btr_corruption_report(
/*==================*/
	const buf_block_t*	block,	/*!< in: corrupted block */
	const dict_index_t*	index)	/*!< in: index tree */
{
	ib::error()
		<< "Flag mismatch in page " << block->page.id
		<< " index " << index->name
		<< " of table " << index->table->name;
}

#ifndef UNIV_HOTBACKUP
/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search; those pages are only
buffer-fixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the newly allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we can store node pointers or index records
which are >= P in alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
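
/* As an illustration of the node pointer format described above, the
following stand-alone sketch (an assumption for exposition only; it is
not InnoDB code and uses none of its types) models the last field of a
node pointer record: the child page number stored as a big-endian
4-byte integer, the way mach_write_to_4()/mach_read_from_4() store it
after the key prefix P:

	#include <stdint.h>

	enum { NODE_PTR_FIELD_LEN = 4 };	// cf. REC_NODE_PTR_SIZE

	// Write the child page number into the last field, big-endian.
	static inline void
	node_ptr_write(uint8_t* field, uint32_t child_page_no)
	{
		field[0] = (uint8_t) (child_page_no >> 24);
		field[1] = (uint8_t) (child_page_no >> 16);
		field[2] = (uint8_t) (child_page_no >> 8);
		field[3] = (uint8_t) (child_page_no);
	}

	// Read it back.
	static inline uint32_t
	node_ptr_read(const uint8_t* field)
	{
		return((uint32_t) field[0] << 24 | (uint32_t) field[1] << 16
		       | (uint32_t) field[2] << 8 | (uint32_t) field[3]);
	}

A search for a key K at a non-leaf level then follows the child of the
last node pointer whose prefix is <= K (PAGE_CUR_LE), repeating on the
child page until a leaf is reached. */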

#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
@return TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(offset >= FIL_PAGE_DATA);
	ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */

/**************************************************************//**
Gets the root node of a tree and x- or s-latches it.
@return root page, x- or s-latched */
buf_block_t*
btr_root_block_get(
/*===============*/
	const dict_index_t*	index,	/*!< in: index tree */
	ulint			mode,	/*!< in: either RW_S_LATCH
					or RW_X_LATCH */
	mtr_t*			mtr)	/*!< in: mtr */
{
	const ulint		space = dict_index_get_space(index);
	const page_id_t		page_id(space, dict_index_get_page(index));
	const page_size_t	page_size(dict_table_page_size(index->table));

	buf_block_t*	block = btr_block_get(page_id, page_size, mode,
					      index, mtr);

	btr_assert_not_corrupted(block, index);
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root = buf_block_get_frame(block);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
#endif /* UNIV_BTR_DEBUG */

	return(block);
}

/**************************************************************//**
Gets the root node of a tree and sx-latches it for segment access.
@return root page, sx-latched */
page_t*
btr_root_get(
/*=========*/
	const dict_index_t*	index,	/*!< in: index tree */
	mtr_t*			mtr)	/*!< in: mtr */
{
	/* Intended to be used for segment list access.
	An SX latch does not block other threads from reading user data,
	but it does block them from accessing the segment lists. */
	return(buf_block_get_frame(btr_root_block_get(index, RW_SX_LATCH,
						      mtr)));
}

/**************************************************************//**
Gets the height of the B-tree (the level of the root, when the leaf
level is assumed to be 0). The caller must hold an S or X latch on
the index.
@return tree height (level of the root) */
ulint
btr_height_get(
/*===========*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	ulint		height;
	buf_block_t*	root_block;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_S_LOCK
					   | MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(index->table));

	/* S latches the page */
	root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

	height = btr_page_get_level(buf_block_get_frame(root_block), mtr);

	/* Release the S latch on the root page. */
	mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

	ut_d(sync_check_unlock(&root_block->lock));

	return(height);
}

/**************************************************************//**
Checks a file segment header within a B-tree root page and updates
the segment header space id.
@return TRUE if valid */
static
bool
btr_root_fseg_adjust_on_import(
/*===========================*/
	fseg_header_t*	seg_header,	/*!< in/out: segment header */
	page_zip_des_t*	page_zip,	/*!< in/out: compressed page,
					or NULL */
	ulint		space,		/*!< in: tablespace identifier */
	mtr_t*		mtr)		/*!< in/out: mini-transaction */
{
	ulint	offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

	if (offset < FIL_PAGE_DATA
	    || offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) {

		return(FALSE);

	} else if (page_zip) {
		mach_write_to_4(seg_header + FSEG_HDR_SPACE, space);
		page_zip_write_header(page_zip, seg_header + FSEG_HDR_SPACE,
				      4, mtr);
	} else {
		mlog_write_ulint(seg_header + FSEG_HDR_SPACE,
				 space, MLOG_4BYTES, mtr);
	}

	return(TRUE);
}

/**************************************************************//**
Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
@return error code, or DB_SUCCESS */
dberr_t
btr_root_adjust_on_import(
/*======================*/
	const dict_index_t*	index)	/*!< in: index tree */
{
	dberr_t			err;
	mtr_t			mtr;
	page_t*			page;
	buf_block_t*		block;
	page_zip_des_t*		page_zip;
	dict_table_t*		table = index->table;
	const ulint		space_id = dict_index_get_space(index);
	const page_id_t		page_id(space_id, dict_index_get_page(index));
	const page_size_t	page_size(dict_table_page_size(table));

	DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
			return(DB_CORRUPTION););

	mtr_start(&mtr);

	mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

	block = btr_block_get(page_id, page_size, RW_X_LATCH, index, &mtr);

	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	if (!page_is_root(page)) {

		err = DB_CORRUPTION;

	} else if (dict_index_is_clust(index)) {
		bool	page_is_compact_format;

		page_is_compact_format = page_is_comp(page) > 0;

		/* Check if the page format and table format agree. */
		if (page_is_compact_format != dict_table_is_comp(table)) {
			err = DB_CORRUPTION;
		} else {
			/* Check that the table flags and the tablespace
			flags match. */
			ulint	flags = dict_tf_to_fsp_flags(
				table->flags,
				false,
				dict_table_is_encrypted(table));
			ulint	fsp_flags = fil_space_get_flags(table->space);

			err = fsp_flags_are_equal(flags, fsp_flags)
			      ? DB_SUCCESS : DB_CORRUPTION;
		}
	} else {
		err = DB_SUCCESS;
	}

	/* Check and adjust the file segment headers, if all OK so far. */
	if (err == DB_SUCCESS
	    && (!btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
			+ page, page_zip, space_id, &mtr)
		|| !btr_root_fseg_adjust_on_import(
			FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
			+ page, page_zip, space_id, &mtr))) {

		err = DB_CORRUPTION;
	}

	mtr_commit(&mtr);

	return(err);
}

/**************************************************************//**
Creates a new index page (not the root, and also not
used in page reorganization).  @see btr_page_empty(). */
void
btr_page_create(
/*============*/
	buf_block_t*	block,	/*!< in/out: page to be created */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		page = buf_block_get_frame(block);

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

	if (page_zip) {
		page_create_zip(block, index, level, 0, NULL, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table),
			    dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, level, mtr);
	}

	/* For Spatial Index, initialize the Split Sequence Number */
	if (dict_index_is_spatial(index)) {
		page_set_ssn_id(block, page_zip, 0, mtr);
	}

	btr_page_set_index_id(page, page_zip, index->id, mtr);
}

/**************************************************************//**
Allocates a new file page to be used in an ibuf tree. Takes the page from
the free list of the tree, which must contain pages!
@return new allocated block, x-latched */
static
buf_block_t*
btr_page_alloc_for_ibuf(
/*====================*/
	dict_index_t*	index,	/*!< in: index tree */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fil_addr_t	node_addr;
	page_t*		root;
	page_t*		new_page;
	buf_block_t*	new_block;

	root = btr_root_get(index, mtr);

	node_addr = flst_get_first(root + PAGE_HEADER
				   + PAGE_BTR_IBUF_FREE_LIST, mtr);
	ut_a(node_addr.page != FIL_NULL);

	new_block = buf_page_get(
		page_id_t(dict_index_get_space(index), node_addr.page),
		dict_table_page_size(index->table),
		RW_X_LATCH, mtr);

	new_page = buf_block_get_frame(new_block);
	buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

	flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		    new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
		    mtr);
	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));

	return(new_block);
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
buf_block_t*
btr_page_alloc_low(
/*===============*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mtr or another
					mini-transaction in which the
					page should be initialized.
					If init_mtr!=mtr, but the page
					is already X-latched in mtr, do
					not initialize the page. */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	root = btr_root_get(index, mtr);

	if (level == 0) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

	/* Parameter TRUE below states that the caller has made the
	reservation for free extents, and thus we know that a page can
	be allocated: */

	return(fseg_alloc_free_page_general(
		       seg_header, hint_page_no, file_direction,
		       TRUE, mtr, init_mtr));
}

/**************************************************************//**
Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
buf_block_t*
btr_page_alloc(
/*===========*/
	dict_index_t*	index,		/*!< in: index */
	ulint		hint_page_no,	/*!< in: hint of a good page */
	byte		file_direction,	/*!< in: direction where a possible
					page split is made */
	ulint		level,		/*!< in: level where the page is placed
					in the tree */
	mtr_t*		mtr,		/*!< in/out: mini-transaction
					for the allocation */
	mtr_t*		init_mtr)	/*!< in/out: mini-transaction
					for x-latching and initializing
					the page */
{
	buf_block_t*	new_block;

	if (dict_index_is_ibuf(index)) {

		return(btr_page_alloc_for_ibuf(index, mtr));
	}

	new_block = btr_page_alloc_low(
		index, hint_page_no, file_direction, level, mtr, init_mtr);

	if (new_block) {
		buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
	}

	return(new_block);
}
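
/* Usage sketch (an assumption for illustration, not a call site in this
file): because btr_page_alloc() requires that the caller has already
reserved free extents, a typical caller brackets the allocation like
this:

	ulint		n_reserved = 0;
	buf_block_t*	new_block = NULL;

	if (fsp_reserve_free_extents(&n_reserved,
				     dict_index_get_space(index),
				     2, FSP_NORMAL, mtr)) {
		new_block = btr_page_alloc(index, hint_page_no, FSP_UP,
					   level, mtr, mtr);
		fil_space_release_free_extents(
			dict_index_get_space(index), n_reserved);
	}

The reservation must be released even if the allocation itself
returns NULL. */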

/**************************************************************//**
Gets the number of pages in a B-tree.
@return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint
btr_get_size(
/*=========*/
	dict_index_t*	index,	/*!< in: index */
	ulint		flag,	/*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
	mtr_t*		mtr)	/*!< in/out: mini-transaction where index
				is s-latched */
{
	fseg_header_t*	seg_header;
	page_t*		root;
	ulint		n;
	ulint		dummy;

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains(mtr, dict_index_get_lock(index),
				   MTR_MEMO_S_LOCK)
	      || dict_table_is_intrinsic(index->table));

	if (index->page == FIL_NULL
	    || dict_index_is_online_ddl(index)
	    || !index->is_committed()) {
		return(ULINT_UNDEFINED);
	}

	root = btr_root_get(index, mtr);

	if (flag == BTR_N_LEAF_PAGES) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		fseg_n_reserved_pages(seg_header, &n, mtr);

	} else if (flag == BTR_TOTAL_SIZE) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

		n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

		n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
	} else {
		ut_error;
	}

	return(n);
}
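
/* Usage sketch (an assumption for illustration, not a call site in this
file): the s-latch requirement above means a caller typically wraps the
call in its own mini-transaction, e.g. to estimate the on-disk size of
an index in pages:

	mtr_t	mtr;

	mtr_start(&mtr);
	mtr_s_lock(dict_index_get_lock(index), &mtr);

	ulint	size_pages = btr_get_size(index, BTR_TOTAL_SIZE, &mtr);

	mtr_commit(&mtr);

	if (size_pages == ULINT_UNDEFINED) {
		// index->page == FIL_NULL, online DDL is in progress,
		// or the index is not committed: no reliable size.
	}
*/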

/**************************************************************//**
Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static
void
btr_page_free_for_ibuf(
/*===================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*		root;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	root = btr_root_get(index, mtr);

	flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
		       buf_block_get_frame(block)
		       + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);

	ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
			    mtr));
}

/**************************************************************//**
Frees a file page used in an index tree. Can also be used for (BLOB)
external storage pages. */
void
btr_page_free_low(
/*==============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	ulint		level,	/*!< in: page level (ULINT_UNDEFINED=BLOB) */
	mtr_t*		mtr)	/*!< in: mtr */
{
	fseg_header_t*	seg_header;
	page_t*		root;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	/* The page becomes invalid for optimistic searches: increment the
	frame modify clock */

	buf_block_modify_clock_inc(block);

	if (dict_index_is_ibuf(index)) {

		btr_page_free_for_ibuf(index, block, mtr);

		return;
	}

	root = btr_root_get(index, mtr);

	if (level == 0 || level == ULINT_UNDEFINED) {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
	} else {
		seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
	}

#ifdef UNIV_GIS_DEBUG
	if (dict_index_is_spatial(index)) {
		fprintf(stderr, "GIS_DIAG: Freed  %ld\n",
			(long) block->page.id.page_no());
	}
#endif

	fseg_free_page(seg_header,
		       block->page.id.space(),
		       block->page.id.page_no(),
		       level != ULINT_UNDEFINED, mtr);

	/* The page was marked free in the allocation bitmap, but it
	should remain buffer-fixed until mtr_commit(mtr) or until it
	is explicitly freed from the mini-transaction. */
	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	/* TODO: Discard any operations on the page from the redo log
	and remove the block from the flush list and the buffer pool.
	This would free up buffer pool earlier and reduce writes to
	both the tablespace and the redo log. */
}

/**************************************************************//**
Frees a file page used in an index tree. NOTE: this cannot be used for
external storage (BLOB) pages, because the page must contain info on
its level. */
void
btr_page_free(
/*==========*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: block to be freed, x-latched */
	mtr_t*		mtr)	/*!< in: mtr */
{
	const page_t*	page	= buf_block_get_frame(block);
	ulint		level	= btr_page_get_level(page, mtr);

	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(level != ULINT_UNDEFINED);
	btr_page_free_low(index, block, level, mtr);
}

/**************************************************************//**
Sets the child node file address in a node pointer. */
UNIV_INLINE
void
btr_node_ptr_set_child_page_no(
/*===========================*/
	rec_t*		rec,	/*!< in: node pointer record */
	page_zip_des_t*	page_zip,/*!< in/out: compressed page whose uncompressed
				part will be updated, or NULL */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		page_no,/*!< in: child node address */
	mtr_t*		mtr)	/*!< in: mtr */
{
	byte*	field;
	ulint	len;

	ut_ad(rec_offs_validate(rec, NULL, offsets));
	ut_ad(!page_is_leaf(page_align(rec)));
	ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

	/* The child address is in the last field */
	field = rec_get_nth_field(rec, offsets,
				  rec_offs_n_fields(offsets) - 1, &len);

	ut_ad(len == REC_NODE_PTR_SIZE);

	if (page_zip) {
		page_zip_write_node_ptr(page_zip, rec,
					rec_offs_data_size(offsets),
					page_no, mtr);
	} else {
		mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
	}
}

/************************************************************//**
Returns the child page of a node pointer and sx-latches it.
@return child page, sx-latched */
UNIV_INTERN
buf_block_t*
btr_node_ptr_get_child(
/*===================*/
	const rec_t*	node_ptr,/*!< in: node pointer */
	dict_index_t*	index,	/*!< in: index */
	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ut_ad(rec_offs_validate(node_ptr, index, offsets));

	const page_id_t	page_id(
		page_get_space_id(page_align(node_ptr)),
		btr_node_ptr_get_child_page_no(node_ptr, offsets));

	return(btr_block_get(page_id, dict_table_page_size(index->table),
			     RW_SX_LATCH, index, mtr));
}

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an sx-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
ulint*
btr_page_get_father_node_ptr_func(
/*==============================*/
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
				out: cursor on node pointer record,
				its page x-latched */
	ulint		latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
				or BTR_CONT_SEARCH_TREE */
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dtuple_t*	tuple;
	rec_t*		user_rec;
	rec_t*		node_ptr;
	ulint		level;
	ulint		page_no;
	dict_index_t*	index;

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
	      || latch_mode == BTR_CONT_SEARCH_TREE);

	page_no = btr_cur_get_block(cursor)->page.id.page_no();
	index = btr_cur_get_index(cursor);
	ut_ad(!dict_index_is_spatial(index));

	ut_ad(srv_read_only_mode
	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					   MTR_MEMO_X_LOCK
					   | MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(index->table));

	ut_ad(dict_index_get_page(index) != page_no);

	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);

	user_rec = btr_cur_get_rec(cursor);
	ut_a(page_rec_is_user_rec(user_rec));

	tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
	if (dict_table_is_intrinsic(index->table)) {
		btr_cur_search_to_nth_level_with_no_latch(
			index, level + 1, tuple, PAGE_CUR_LE, cursor,
			file, line, mtr);
	} else {
		btr_cur_search_to_nth_level(
			index, level + 1, tuple,
			PAGE_CUR_LE, latch_mode, cursor, 0,
			file, line, mtr);
	}

	node_ptr = btr_cur_get_rec(cursor);
	ut_ad(!page_rec_is_comp(node_ptr)
	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
	offsets = rec_get_offsets(node_ptr, index, offsets,
				  ULINT_UNDEFINED, &heap);

	if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
		rec_t*	print_rec;

		ib::error()
			<< "Corruption of an index tree: table "
			<< index->table->name
			<< " index " << index->name
			<< ", father ptr page no "
			<< btr_node_ptr_get_child_page_no(node_ptr, offsets)
			<< ", child page no " << page_no;

		print_rec = page_rec_get_next(
			page_get_infimum_rec(page_align(user_rec)));
		offsets = rec_get_offsets(print_rec, index,
					  offsets, ULINT_UNDEFINED, &heap);
		page_rec_print(print_rec, offsets);
		offsets = rec_get_offsets(node_ptr, index, offsets,
					  ULINT_UNDEFINED, &heap);
		page_rec_print(node_ptr, offsets);

		ib::fatal()
			<< "You should dump + drop + reimport the table to"
			<< " fix the corruption. If the crash happens at"
			<< " database startup, " << FORCE_RECOVERY_MSG
			<< " Then dump + drop + reimport.";
	}

	return(offsets);
}

#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)

#define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)

/************************************************************//**
Returns the upper level node pointer to a page. It is assumed that mtr holds
an x-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static
ulint*
btr_page_get_father_block(
/*======================*/
	ulint*		offsets,/*!< in: work area for the return value */
	mem_heap_t*	heap,	/*!< in: memory heap to use */
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	rec_t*	rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);
	return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/************************************************************//**
Seeks to the upper level node pointer to a page.
It is assumed that mtr holds an x-latch on the tree. */
static
void
btr_page_get_father(
/*================*/
	dict_index_t*	index,	/*!< in: b-tree index */
	buf_block_t*	block,	/*!< in: child page in the index */
	mtr_t*		mtr,	/*!< in: mtr */
	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
				its page x-latched */
{
	mem_heap_t*	heap;
	rec_t*		rec
		= page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
								 block)));
	btr_cur_position(index, rec, block, cursor);

	heap = mem_heap_create(100);
	btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
	mem_heap_free(heap);
}

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static
void
btr_free_root(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	fseg_header_t*	header;

	ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr->is_named_space(block->page.id.space()));

	btr_search_drop_page_hash_index(block);

	header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	while (!fseg_free_step(header, true, mtr)) {
		/* Free the entire segment in small steps. */
	}
}

/** PAGE_INDEX_ID value for freed index B-trees */
static const index_id_t	BTR_FREED_INDEX_ID = 0;

/** Invalidate an index root page so that btr_free_root_check()
will not find it.
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static
void
btr_free_root_invalidate(
	buf_block_t*	block,
	mtr_t*		mtr)
{
	ut_ad(page_is_root(block->frame));

	btr_page_set_index_id(
		buf_block_get_frame(block),
		buf_block_get_page_zip(block),
		BTR_FREED_INDEX_ID, mtr);
}

/** Prepare to free a B-tree.
@param[in]	page_id		page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result))
buf_block_t*
btr_free_root_check(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	ut_ad(page_id.space() != srv_tmp_space.space_id());
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, mtr);
	buf_block_dbg_add_level(block, SYNC_TREE_NODE);

	if (fil_page_index_page_check(block->frame)
	    && index_id == btr_page_get_index_id(block->frame)) {
		/* This should be a root page.
		It should not be possible to reassign the same
		index_id for some other index in the tablespace. */
		ut_ad(page_is_root(block->frame));
	} else {
		block = NULL;
	}

	return(block);
}

/** Create the root node for a new index tree.
@param[in]	type			type of the index
@param[in]	space			space where created
@param[in]	page_size		page size
@param[in]	index_id		index id
@param[in]	index			index, or NULL when applying TRUNCATE
log record during recovery
@param[in]	btr_redo_create_info	used for applying TRUNCATE log
record during recovery
@param[in]	mtr			mini-transaction handle
@return page number of the created root, FIL_NULL if did not succeed */
ulint
btr_create(
	ulint			type,
	ulint			space,
	const page_size_t&	page_size,
	index_id_t		index_id,
	dict_index_t*		index,
	const btr_create_t*	btr_redo_create_info,
	mtr_t*			mtr)
{
	ulint			page_no;
	buf_block_t*		block;
	buf_frame_t*		frame;
	page_t*			page;
	page_zip_des_t*		page_zip;

	ut_ad(mtr->is_named_space(space));
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (type & DICT_IBUF) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, 0,
			IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		if (ibuf_hdr_block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(ibuf_hdr_block->page.id.page_no()
		      == IBUF_HEADER_PAGE_NO);
		/* Then allocate the next page of the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);
		ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);
	} else {
		block = fseg_create(space, 0,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
	}

	if (block == NULL) {

		return(FIL_NULL);
	}

	page_no = block->page.id.page_no();
	frame = buf_block_get_frame(block);

	if (type & DICT_IBUF) {
		/* It is an insert buffer tree: initialize the free list */
		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

		flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
	} else {
		/* It is a non-ibuf tree: create a file segment for leaf
		pages */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space, page_no,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
			/* Not enough space for the new segment; free the
			root segment before returning. */
			btr_free_root(block, mtr);
			if (!dict_table_is_temporary(index->table)) {
				btr_free_root_invalidate(block, mtr);
			}

			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (page_zip) {
		if (index != NULL) {
			page = page_create_zip(block, index, 0, 0, NULL, mtr);
		} else {
			/* Create a compressed index page when applying
			TRUNCATE log record during recovery */
			ut_ad(btr_redo_create_info != NULL);

			redo_page_compress_t	page_comp_info;

			page_comp_info.type = type;

			page_comp_info.index_id = index_id;

			page_comp_info.n_fields =
				btr_redo_create_info->n_fields;

			page_comp_info.field_len =
				btr_redo_create_info->field_len;

			page_comp_info.fields = btr_redo_create_info->fields;

			page_comp_info.trx_id_pos =
				btr_redo_create_info->trx_id_pos;

			page = page_create_zip(block, NULL, 0, 0,
					       &page_comp_info, mtr);
		}
	} else {
		if (index != NULL) {
			page = page_create(block, mtr,
					   dict_table_is_comp(index->table),
					   dict_index_is_spatial(index));
		} else {
			ut_ad(btr_redo_create_info != NULL);
			page = page_create(
				block, mtr, btr_redo_create_info->format_flags,
				type == DICT_SPATIAL);
		}
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	btr_page_set_next(page, page_zip, FIL_NULL, mtr);
	btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

	/* We reset the free bits for the page to allow creation of several
	trees in the same mtr, otherwise the latch on a bitmap page would
	prevent it because of the latching order.

	index will be NULL if we are recreating the table during recovery
	on behalf of TRUNCATE.

	Note: Insert Buffering is disabled for temporary tables given that
	most temporary tables are smaller in size and short-lived. */
	if (!(type & DICT_CLUSTERED)
	    && (index == NULL || !dict_table_is_temporary(index->table))) {

		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	return(page_no);
}

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out]	block		root page
@param[in]	log_mode	mtr logging mode */
static
void
btr_free_but_not_root(
	buf_block_t*	block,
	mtr_log_t	log_mode)
{
	ibool	finished;
	mtr_t	mtr;

	ut_ad(page_is_root(block->frame));
leaf_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space(block->page.id.space());

	page_t*	root = block->frame;

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
				    + root, block->page.id.space()));
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	/* NOTE: page hash indexes are dropped when a page is freed inside
	fsp0fsp. */

	finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
				  true, &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto leaf_loop;
	}
top_loop:
	mtr_start(&mtr);
	mtr_set_log_mode(&mtr, log_mode);
	mtr.set_named_space(block->page.id.space());

	root = block->frame;

#ifdef UNIV_BTR_DEBUG
	ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
				    + root, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

	finished = fseg_free_step_not_header(
		root + PAGE_HEADER + PAGE_BTR_SEG_TOP, true, &mtr);
	mtr_commit(&mtr);

	if (!finished) {

		goto top_loop;
	}
}

/** Free a persistent index tree if it exists.
@param[in]	page_id		root page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction */
void
btr_free_if_exists(
	const page_id_t&	page_id,
	const page_size_t&	page_size,
	index_id_t		index_id,
	mtr_t*			mtr)
{
	buf_block_t* root = btr_free_root_check(
		page_id, page_size, index_id, mtr);

	if (root == NULL) {
		return;
	}

	btr_free_but_not_root(root, mtr->get_log_mode());
	mtr->set_named_space(page_id.space());
	btr_free_root(root, mtr);
	btr_free_root_invalidate(root, mtr);
}

/** Free an index tree in a temporary tablespace or during TRUNCATE TABLE.
@param[in]	page_id		root page id
@param[in]	page_size	page size */
void
btr_free(
	const page_id_t&	page_id,
	const page_size_t&	page_size)
{
	mtr_t		mtr;
	mtr.start();
	mtr.set_log_mode(MTR_LOG_NO_REDO);

	buf_block_t*	block = buf_page_get(
		page_id, page_size, RW_X_LATCH, &mtr);

	ut_ad(page_is_root(block->frame));

	btr_free_but_not_root(block, MTR_LOG_NO_REDO);
	btr_free_root(block, &mtr);
	mtr.commit();
}
#endif /* !UNIV_HOTBACKUP */

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize_low(
/*====================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block		= page_cur_get_block(cursor);
#ifndef UNIV_HOTBACKUP
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
#endif /* !UNIV_HOTBACKUP */
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	bool		success		= false;
	ulint		pos;
	bool		log_compressed;
	bool		is_spatial;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	btr_assert_not_corrupted(block, index);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

	/* Turn logging off */
	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

#ifndef UNIV_HOTBACKUP
	temp_block = buf_block_alloc(buf_pool);
#else /* !UNIV_HOTBACKUP */
	ut_ad(block == back_block1);
	temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
	temp_page = temp_block->frame;

	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);

	/* This function can be called by log redo with a "dummy" index.
	In that case we trust the original page's type more than the
	index. */
	is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
		      || dict_index_is_spatial(index));

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

#ifndef UNIV_HOTBACKUP
	if (!recovery) {
		btr_search_drop_page_hash_index(block);
	}
#endif /* !UNIV_HOTBACKUP */

	/* Save the cursor position. */
	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	/* Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it is not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !dict_table_is_temporary(index->table)) {
		/* Copy max trx id to recreated page */
		trx_id_t	max_trx_id = page_get_max_trx_id(temp_page);
		page_set_max_trx_id(block, NULL, max_trx_id, mtr);
		/* In crash recovery, dict_index_is_sec_or_ibuf() always
		holds, even for clustered indexes.  max_trx_id is
		unused in clustered index pages. */
		ut_ad(max_trx_id != 0 || recovery);
	}

	/* If innodb_log_compressed_pages is ON, page reorganize should log
	the compressed page image. */
	log_compressed = page_zip && page_zip_log_pages;

	if (log_compressed) {
		mtr_set_log_mode(mtr, log_mode);
	}

	if (page_zip
	    && !page_zip_compress(page_zip, page, index, z_level, NULL, mtr)) {

		/* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
			     UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

#ifndef UNIV_HOTBACKUP
	/* No locks are acquired for intrinsic tables. */
	if (!recovery && !dict_table_is_locking_disabled(index->table)) {
		/* Update the record lock bitmaps */
		lock_move_reorganize_page(block, temp_block);
	}
#endif /* !UNIV_HOTBACKUP */

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
		ib::error()
			<< "Page old data size " << data_size1
			<< " new data size " << data_size2
			<< ", page old max ins size " << max_ins_size1
			<< " new max ins size " << max_ins_size2;

		ib::error() << BUG_REPORT_MSG;
		ut_ad(0);
	} else {
		success = true;
	}

	/* Restore the cursor position. */
	if (pos > 0) {
		cursor->rec = page_rec_get_nth(page, pos);
	} else {
		ut_ad(cursor->rec == page_get_infimum_rec(page));
	}

func_exit:
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
#ifndef UNIV_HOTBACKUP
	buf_block_free(temp_block);
#endif /* !UNIV_HOTBACKUP */

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

#ifndef UNIV_HOTBACKUP
	if (success) {
		mlog_id_t	type;
		byte*		log_ptr;

		/* Write the log record */
		if (page_zip) {
			ut_ad(page_is_comp(page));
			type = MLOG_ZIP_PAGE_REORGANIZE;
		} else if (page_is_comp(page)) {
			type = MLOG_COMP_PAGE_REORGANIZE;
		} else {
			type = MLOG_PAGE_REORGANIZE;
		}

		log_ptr = log_compressed
			? NULL
			: mlog_open_and_write_index(
				mtr, page, index, type,
				page_zip ? 1 : 0);

		/* For compressed pages write the compression level. */
		if (log_ptr && page_zip) {
			mach_write_to_1(log_ptr, z_level);
			mlog_close(mtr, log_ptr + 1);
		}

		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
	}
#endif /* !UNIV_HOTBACKUP */

	return(success);
}
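
/* Usage sketch (an assumption for illustration, not a call site in this
file): per the IMPORTANT note above, a caller that reorganizes a
compressed leaf page of a secondary index must keep IBUF_BITMAP_FREE
consistent before the mini-transaction commits, e.g.:

	if (btr_page_reorganize(cursor, index, mtr)) {
		if (page_zip
		    && !dict_index_is_clust(index)
		    && page_is_leaf(page)) {
			// The compressed page layout changed; reset the
			// insert buffer free bits before mtr_commit().
			ibuf_reset_free_bits(block);
		}
	}
*/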

/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
static MY_ATTRIBUTE((nonnull))
bool
btr_page_reorganize_block(
/*======================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	buf_block_t*	block,	/*!< in/out: B-tree page */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	page_cur_t	cur;
	page_cur_set_before_first(block, &cur);

	return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
}

#ifndef UNIV_HOTBACKUP
/*************************************************************//**
Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize(
/*================*/
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	return(btr_page_reorganize_low(false, page_zip_level,
				       cursor, index, mtr));
}
#endif /* !UNIV_HOTBACKUP */

/***********************************************************//**
Parses a redo log record of reorganizing a page.
@return end of log record or NULL */
byte*
btr_parse_page_reorganize(
/*======================*/
	byte*		ptr,	/*!< in: buffer */
	byte*		end_ptr,/*!< in: buffer end */
	dict_index_t*	index,	/*!< in: record descriptor */
	bool		compressed,/*!< in: true if compressed page */
	buf_block_t*	block,	/*!< in: page to be reorganized, or NULL */
	mtr_t*		mtr)	/*!< in: mtr or NULL */
{
	ulint	level;

	ut_ad(ptr != NULL);
	ut_ad(end_ptr != NULL);
	ut_ad(index != NULL);

	/* If dealing with a compressed page the record has the
	compression level used during original compression written in
	one byte. Otherwise the record is empty. */
	if (compressed) {
		if (ptr == end_ptr) {
			return(NULL);
		}

		level = mach_read_from_1(ptr);

		ut_a(level <= 9);
		++ptr;
	} else {
		level = page_zip_level;
	}

	if (block != NULL) {
		btr_page_reorganize_block(true, level, block, index, mtr);
	}

	return(ptr);
}

#ifndef UNIV_HOTBACKUP
/*************************************************************//**
Empties an index page.  @see btr_page_create(). */
static
void
btr_page_empty(
/*===========*/
	buf_block_t*	block,	/*!< in: page to be emptied */
	page_zip_des_t*	page_zip,/*!< out: compressed page, or NULL */
	dict_index_t*	index,	/*!< in: index of the page */
	ulint		level,	/*!< in: the B-tree level of the page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	page_t*	page = buf_block_get_frame(block);

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	ut_ad(page_zip == buf_block_get_page_zip(block));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	btr_search_drop_page_hash_index(block);

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	if (page_zip) {
		page_create_zip(block, index, level, 0, NULL, mtr);
	} else {
		page_create(block, mtr, dict_table_is_comp(index->table),
			    dict_index_is_spatial(index));
		btr_page_set_level(page, NULL, level, mtr);
	}
}

/*************************************************************//**
Makes the tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed;
we cannot reverse it: therefore enough free disk space must be
guaranteed to be available before this function is called.
@return inserted record */
rec_t*
btr_root_raise_and_insert(
/*======================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
				on the root page; when the function returns,
				the cursor is positioned on the predecessor
				of the inserted record */
	ulint**		offsets,/*!< out: offsets on inserted record */
	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index;
	page_t*		root;
	page_t*		new_page;
	ulint		new_page_no;
	rec_t*		rec;
	dtuple_t*	node_ptr;
	ulint		level;
	rec_t*		node_ptr_rec;
	page_cur_t*	page_cursor;
	page_zip_des_t*	root_page_zip;
	page_zip_des_t*	new_page_zip;
	buf_block_t*	root_block;
	buf_block_t*	new_block;

	root = btr_cur_get_page(cursor);
	root_block = btr_cur_get_block(cursor);
	root_page_zip = buf_block_get_page_zip(root_block);
	ut_ad(!page_is_empty(root));
	index = btr_cur_get_index(cursor);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
#endif /* UNIV_ZIP_DEBUG */
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		ulint	space = dict_index_get_space(index);

		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}

	ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					MTR_MEMO_X_LOCK
					| MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(index->table));
	ut_ad(mtr_is_block_fix(
		mtr, root_block, MTR_MEMO_PAGE_X_FIX, index->table));

	/* Allocate a new page to the tree. Root splitting is done by first
	moving the root records to the new page, emptying the root, putting
	a node pointer to the new page, and then splitting the new page. */
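
	/* Illustration (hypothetical records r1..r3; the tree grows by
	one level):

		before:   root: [ r1 r2 r3 ]
		after:    root: [ ptr(new_page) ]
		                      |
		          new_page: [ r1 r2 r3 ]

	The tuple is then inserted by splitting new_page below. */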

	level = btr_page_get_level(root, mtr);

	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);

	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
	ut_a(!new_page_zip == !root_page_zip);
	ut_a(!new_page_zip
	     || page_zip_get_size(new_page_zip)
	     == page_zip_get_size(root_page_zip));

	btr_page_create(new_block, new_page_zip, index, level, mtr);

	/* Set the next node and previous node fields of the new page */
	btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);

	/* Copy the records from root to the new page one by one. */

	if (0
#ifdef UNIV_ZIP_COPY
	    || new_page_zip
#endif /* UNIV_ZIP_COPY */
	    || !page_copy_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root),
				       index, mtr)) {
		ut_a(new_page_zip);

		/* Copy the page byte for byte. */
		page_zip_copy_recs(new_page_zip, new_page,
				   root_page_zip, root, index, mtr);

		/* Update the lock table and possible hash index. */

		if (!dict_table_is_locking_disabled(index->table)) {
			lock_move_rec_list_end(new_block, root_block,
					       page_get_infimum_rec(root));
		}

		/* Move any existing predicate locks */
		if (dict_index_is_spatial(index)) {
			lock_prdt_rec_move(new_block, root_block);
		}

		btr_search_move_or_delete_hash_entries(new_block, root_block,
						       index);
	}

	/* If this is a pessimistic insert which is actually done to
	perform a pessimistic update, then we have stored the lock
	information of the record to be inserted on the infimum of the
	root page: we cannot discard the lock structs on the root page */

	if (!dict_table_is_locking_disabled(index->table)) {
		lock_update_root_raise(new_block, root_block);
	}

	/* Create a memory heap where the node pointer is stored */
	if (!*heap) {
		*heap = mem_heap_create(1000);
	}

	rec = page_rec_get_next(page_get_infimum_rec(new_page));
	new_page_no = new_block->page.id.page_no();

	/* Build the node pointer (= node key and page address) for the
	child */
	if (dict_index_is_spatial(index)) {
		rtr_mbr_t		new_mbr;

		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
		node_ptr = rtr_index_build_node_ptr(
			index, &new_mbr, rec, new_page_no, *heap, level);
	} else {
		node_ptr = dict_index_build_node_ptr(
			index, rec, new_page_no, *heap, level);
	}
	/* The node pointer must be marked as the predefined minimum record,
	as there is no lower alphabetical limit to records in the leftmost
	node of a level: */
	dtuple_set_info_bits(node_ptr,
			     dtuple_get_info_bits(node_ptr)
			     | REC_INFO_MIN_REC_FLAG);

	/* Rebuild the root page to get free space */
	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);

	/* Set the next node and previous node fields, although
	they should already have been set.  The previous node field
	must be FIL_NULL if root_page_zip != NULL, because the
	REC_INFO_MIN_REC_FLAG (of the first user record) will be
	set if and only if btr_page_get_prev() == FIL_NULL. */
	btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
	btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);

	page_cursor = btr_cur_get_page_cur(cursor);

	/* Insert the node pointer into the root */

	page_cur_set_before_first(root_block, page_cursor);

	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
					     index, offsets, heap, 0, mtr);

	/* The root page should only contain the node pointer
	to new_page at this point.  Thus, the data should fit. */
	ut_a(node_ptr_rec);

	/* We play it safe and reset the free bits for the new page */

	if (!dict_index_is_clust(index)
	    && !dict_table_is_temporary(index->table)) {
		ibuf_reset_free_bits(new_block);
	}

	/* Reposition the cursor to the child node */
	page_cur_search(new_block, index, tuple, page_cursor);

	/* Split the child and insert the tuple */
	if (dict_index_is_spatial(index)) {
		/* Split the rtree page and insert the tuple */
		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	} else {
		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	}
}

/*************************************************************//**
Decides if the page should be split at the convergence point of inserts
converging to the left.
@return TRUE if split recommended */
ibool
btr_page_get_split_rec_to_left(
/*===========================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
				the first record on the upper half page,
				or NULL if the tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;
	rec_t*	infimum;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	if (page_header_get_ptr(page, PAGE_LAST_INSERT)
	    == page_rec_get_next(insert_point)) {

		infimum = page_get_infimum_rec(page);

		/* If the convergence is in the middle of a page, include
		also the record immediately before the new insert in the
		upper page. Otherwise, we could repeatedly move lots of
		records smaller than the convergence point from page to
		page. */
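
		/* Example (illustrative keys): on a page (10, 20, 30),
		inserting 25 and then 24 puts the insert point for 24 on
		record 20, whose successor 25 is PAGE_LAST_INSERT.
		Choosing *split_rec = 20 leaves record 10 on the lower
		page, so the descending inserts that follow do not move
		it again. */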

		if (infimum != insert_point
		    && page_rec_get_next(infimum) != insert_point) {

			*split_rec = insert_point;
		} else {
			*split_rec = page_rec_get_next(insert_point);
		}

		return(TRUE);
	}

	return(FALSE);
}

/*************************************************************//**
Decides if the page should be split at the convergence point of inserts
converging to the right.
@return TRUE if split recommended */
ibool
btr_page_get_split_rec_to_right(
/*============================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert */
	rec_t**		split_rec) /*!< out: if split recommended,
				the first record on the upper half page,
				or NULL if the tuple to be inserted should
				be first */
{
	page_t*	page;
	rec_t*	insert_point;

	page = btr_cur_get_page(cursor);
	insert_point = btr_cur_get_rec(cursor);

	/* We use eager heuristics: if the new insert would be right after
	the previous insert on the same page, we assume that there is a
	pattern of sequential inserts here. */

	if (page_header_get_ptr(page, PAGE_LAST_INSERT) == insert_point) {

		rec_t*	next_rec;

		next_rec = page_rec_get_next(insert_point);

		if (page_rec_is_supremum(next_rec)) {
split_at_new:
			/* Split at the new record to insert */
			*split_rec = NULL;
		} else {
			rec_t*	next_next_rec = page_rec_get_next(next_rec);
			if (page_rec_is_supremum(next_next_rec)) {

				goto split_at_new;
			}

			/* If there are >= 2 user records up from the insert
			point, split all but 1 off. We want to keep one because
			then sequential inserts can use the adaptive hash
			index, as they can do the necessary checks of the right
			search position just by looking at the records on this
			page. */
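
			/* Example (illustrative keys): on a page
			(10, 20, 30, 40) where 20 was the previous
			insert and is the insert point, next_next_rec
			is 40: records from 40 onward are split off,
			and 30 stays on this page for the adaptive
			hash index checks. */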

			*split_rec = next_next_rec;
		}

		return(TRUE);
	}

	return(FALSE);
}

/*************************************************************//**
Calculates a split record such that the tuple will certainly fit on
its half-page when the split is performed. We assume in this function
only that the cursor page has at least one user record.
@return split record, or NULL if tuple will be the first record on
the lower or upper half-page (determined by btr_page_tuple_smaller()) */
static
rec_t*
btr_page_get_split_rec(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext)	/*!< in: number of externally stored columns */
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	ulint		total_space;
	ulint		incl_data;
	rec_t*		ins_rec;
	rec_t*		rec;
	rec_t*		next_rec;
	ulint		n;
	mem_heap_t*	heap;
	ulint*		offsets;

	page = btr_cur_get_page(cursor);

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	page_zip = btr_cur_get_page_zip(cursor);
	if (page_zip) {
		/* Estimate the free space of an empty compressed page. */
		ulint	free_space_zip = page_zip_empty_size(
			cursor->index->n_fields,
			page_zip_get_size(page_zip));

		if (free_space > (ulint) free_space_zip) {
			free_space = (ulint) free_space_zip;
		}
	}

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;
	ut_ad(total_n_recs >= 2);
	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);

	n = 0;
	incl_data = 0;
	ins_rec = btr_cur_get_rec(cursor);
	rec = page_get_infimum_rec(page);

	heap = NULL;
	offsets = NULL;

	/* We start by including records for the left half. Once the space
	they reserve exceeds half of total_space, the included records are
	put on the left page if they fit there and something is left over
	for the right page; otherwise the last included record becomes the
	first record on the right half-page */
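
	/* Worked example (illustrative figures): if total_space is 16000
	bytes and each included record reserves roughly 400 bytes with
	its directory overhead, the loop below stops after 20 inclusions,
	when about half of total_space has been accounted for; the record
	reached at that point is the split record candidate, subject to
	the free_space check after the loop. */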

	do {
		/* Decide the next record to include */
		if (rec == ins_rec) {
			rec = NULL;	/* NULL denotes that tuple is
					now included */
		} else if (rec == NULL) {
			rec = page_rec_get_next(ins_rec);
		} else {
			rec = page_rec_get_next(rec);
		}

		if (rec == NULL) {
			/* Include tuple */
			incl_data += insert_size;
		} else {
			offsets = rec_get_offsets(rec, cursor->index,
						  offsets, ULINT_UNDEFINED,
						  &heap);
			incl_data += rec_offs_size(offsets);
		}

		n++;
	} while (incl_data + page_dir_calc_reserved_space(n)
		 < total_space / 2);

	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
		/* The next record will be the first on
		the right half page if it is not the
		supremum record of the page */

		if (rec == ins_rec) {
			rec = NULL;

			goto func_exit;
		} else if (rec == NULL) {
			next_rec = page_rec_get_next(ins_rec);
		} else {
			next_rec = page_rec_get_next(rec);
		}
		ut_ad(next_rec);
		if (!page_rec_is_supremum(next_rec)) {
			rec = next_rec;
		}
	}

func_exit:
	if (heap) {
		mem_heap_free(heap);
	}
	return(rec);
}

/*************************************************************//**
Returns true if the insert fits on the appropriate half-page with the
chosen split_rec.
@return true if fits */
static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
bool
btr_page_insert_fits(
/*=================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert
				should be made */
	const rec_t*	split_rec,/*!< in: suggestion for the first record
				on the upper half-page, or NULL if the
				tuple to be inserted should be first */
	ulint**		offsets,/*!< in: rec_get_offsets(
				split_rec, cursor->index); out: garbage */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mem_heap_t**	heap)	/*!< in: temporary memory heap */
{
	page_t*		page;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	const rec_t*	rec;
	const rec_t*	end_rec;

	page = btr_cur_get_page(cursor);

	ut_ad(!split_rec
	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
	ut_ad(!split_rec
	      || rec_offs_validate(split_rec, cursor->index, *offsets));

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = page_get_n_recs(page) + 1;

	/* We determine which records (from rec to end_rec, not including
	end_rec) will end up on the other half page from tuple when it is
	inserted. */
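
	/* Three cases: with no split_rec the tuple becomes the first
	record on the upper half, so the records from the page start up
	to and including the insert position move away from it; if the
	tuple sorts at or above split_rec, the records below split_rec
	move away; otherwise the records from split_rec upward do. */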

	if (split_rec == NULL) {
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {

		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = split_rec;
	} else {
		rec = split_rec;
		end_rec = page_get_supremum_rec(page);
	}

	if (total_data + page_dir_calc_reserved_space(total_n_recs)
	    <= free_space) {

		/* Ok, there will be enough available space on the
		half page where the tuple is inserted */

		return(true);
	}

	while (rec != end_rec) {
		/* In this loop we calculate the amount of reserved
		space after rec is removed from the page. */

		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
					   ULINT_UNDEFINED, heap);

		total_data -= rec_offs_size(*offsets);
		total_n_recs--;

		if (total_data + page_dir_calc_reserved_space(total_n_recs)
		    <= free_space) {

			/* Ok, there will be enough available space on the
			half page where the tuple is inserted */

			return(true);
		}

		rec = page_rec_get_next_const(rec);
	}

	return(false);
}

/*******************************************************//**
Inserts a data tuple into a tree on a non-leaf level. It is assumed
that mtr holds an x-latch on the tree. */
void
btr_insert_on_non_leaf_level_func(
/*==============================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: level, must be > 0 */
	dtuple_t*	tuple,	/*!< in: the record to be inserted */
	const char*	file,	/*!< in: file name */
	ulint		line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	big_rec_t*	dummy_big_rec;
	btr_cur_t	cursor;
	dberr_t		err;
	rec_t*		rec;
	mem_heap_t*	heap = NULL;
	ulint           offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*          offsets         = offsets_;
	rec_offs_init(offsets_);
	rtr_info_t	rtr_info;

	ut_ad(level > 0);

	if (!dict_index_is_spatial(index)) {
		if (dict_table_is_intrinsic(index->table)) {
			btr_cur_search_to_nth_level_with_no_latch(
				index, level, tuple, PAGE_CUR_LE, &cursor,
				__FILE__, __LINE__, mtr);
		} else {
			btr_cur_search_to_nth_level(
				index, level, tuple, PAGE_CUR_LE,
				BTR_CONT_MODIFY_TREE,
				&cursor, 0, file, line, mtr);
		}
	} else {
		/* For a spatial index, initialize structures to track
		its parents etc. */
		rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);

		rtr_info_update_btr(&cursor, &rtr_info);

		btr_cur_search_to_nth_level(index, level, tuple,
					    PAGE_CUR_RTREE_INSERT,
					    BTR_CONT_MODIFY_TREE,
					    &cursor, 0, file, line, mtr);
	}

	ut_ad(cursor.flag == BTR_CUR_BINARY);

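	/* A node-pointer insert is internal tree maintenance, not a
	user-visible change: as the flag names suggest, record locking
	and undo logging are suppressed and the system fields are kept
	unchanged. */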
	err = btr_cur_optimistic_insert(
		flags
		| BTR_NO_LOCKING_FLAG
		| BTR_KEEP_SYS_FLAG
		| BTR_NO_UNDO_LOG_FLAG,
		&cursor, &offsets, &heap,
		tuple, &rec, &dummy_big_rec, 0, NULL, mtr);

	if (err == DB_FAIL) {
		err = btr_cur_pessimistic_insert(flags
						 | BTR_NO_LOCKING_FLAG
						 | BTR_KEEP_SYS_FLAG
						 | BTR_NO_UNDO_LOG_FLAG,
						 &cursor, &offsets, &heap,
						 tuple, &rec,
						 &dummy_big_rec, 0, NULL, mtr);
		ut_a(err == DB_SUCCESS);
	}

	if (heap != NULL) {
		mem_heap_free(heap);
	}

	if (dict_index_is_spatial(index)) {
		ut_ad(cursor.rtr_info);

		rtr_clean_rtr_info(&rtr_info, true);
	}
}

/**************************************************************//**
Attaches the halves of an index page on the appropriate level in an
index tree. */
static MY_ATTRIBUTE((nonnull))
void
btr_attach_half_pages(
/*==================*/
	ulint		flags,		/*!< in: undo logging and
					locking flags */
	dict_index_t*	index,		/*!< in: the index tree */
	buf_block_t*	block,		/*!< in/out: page to be split */
	const rec_t*	split_rec,	/*!< in: first record on upper
					half page */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
	page_t*		page		= buf_block_get_frame(block);
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
	page_zip_des_t*	lower_page_zip;
	page_zip_des_t*	upper_page_zip;
	dtuple_t*	node_ptr_upper;
	mem_heap_t*	heap;
	buf_block_t*	prev_block = NULL;
	buf_block_t*	next_block = NULL;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
	ut_ad(mtr_is_block_fix(
		mtr, new_block, MTR_MEMO_PAGE_X_FIX, index->table));

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on the split direction, decide the upper and lower pages */
	if (direction == FSP_DOWN) {

		btr_cur_t	cursor;
		ulint*		offsets;

		lower_page = buf_block_get_frame(new_block);
		lower_page_no = new_block->page.id.page_no();
		lower_page_zip = buf_block_get_page_zip(new_block);
		upper_page = buf_block_get_frame(block);
		upper_page_no = block->page.id.page_no();
		upper_page_zip = buf_block_get_page_zip(block);

		/* Look up the index for the node pointer to page */
		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);

		/* Replace the address of the old child node (= page) with the
		address of the new lower half */

		btr_node_ptr_set_child_page_no(
			btr_cur_get_rec(&cursor),
			btr_cur_get_page_zip(&cursor),
			offsets, lower_page_no, mtr);
		mem_heap_empty(heap);
	} else {
		lower_page = buf_block_get_frame(block);
		lower_page_no = block->page.id.page_no();
		lower_page_zip = buf_block_get_page_zip(block);
		upper_page = buf_block_get_frame(new_block);
		upper_page_no = new_block->page.id.page_no();
		upper_page_zip = buf_block_get_page_zip(new_block);
	}

	/* Get the previous and next pages of page */
	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);

	const ulint	space = block->page.id.space();

	/* For consistency, both blocks should be locked before the change */
	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
		prev_block = btr_block_get(
			page_id_t(space, prev_page_no), block->page.size,
			RW_X_LATCH, index, mtr);
	}
	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
		next_block = btr_block_get(
			page_id_t(space, next_page_no), block->page.size,
			RW_X_LATCH, index, mtr);
	}

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));

	/* Build the node pointer (= node key and page address) for the upper
	half */

	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
						   upper_page_no, heap, level);

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */
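
	/* (In the worst case the recursion reaches a full root, which is
	then handled by btr_root_raise_and_insert(), adding one level to
	the tree.) */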

	btr_insert_on_non_leaf_level(flags, index, level + 1,
				     node_ptr_upper, mtr);

	/* Free the memory heap */
	mem_heap_free(heap);

	/* Update page links of the level */

	if (prev_block) {
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  lower_page_no, mtr);
	}

	if (next_block) {
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  upper_page_no, mtr);
	}

	if (direction == FSP_DOWN) {
		/* lower_page is new */
		btr_page_set_prev(lower_page, lower_page_zip,
				  prev_page_no, mtr);
	} else {
		ut_ad(btr_page_get_prev(lower_page, mtr) == prev_page_no);
	}

	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);

	if (direction != FSP_DOWN) {
		/* upper_page is new */
		btr_page_set_next(upper_page, upper_page_zip,
				  next_page_no, mtr);
	} else {
		ut_ad(btr_page_get_next(upper_page, mtr) == next_page_no);
	}
}

/*************************************************************//**
Determine if a tuple is smaller than any record on the page.
@return TRUE if smaller */
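/* (btr_page_split_and_insert() uses this to decide whether the tuple
belongs on the lower or the upper half-page when no split record has
been chosen.) */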
static MY_ATTRIBUTE((nonnull, warn_unused_result))
bool
btr_page_tuple_smaller(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: b-tree cursor */
	const dtuple_t*	tuple,	/*!< in: tuple to consider */
	ulint**		offsets,/*!< in/out: temporary storage */
	ulint		n_uniq,	/*!< in: number of unique fields
				in the index page records */
	mem_heap_t**	heap)	/*!< in/out: heap for offsets */
{
	buf_block_t*	block;
	const rec_t*	first_rec;
	page_cur_t	pcur;

	/* Read the first user record in the page. */
	block = btr_cur_get_block(cursor);
	page_cur_set_before_first(block, &pcur);
	page_cur_move_to_next(&pcur);
	first_rec = page_cur_get_rec(&pcur);

	*offsets = rec_get_offsets(
		first_rec, cursor->index, *offsets,
		n_uniq, heap);

	return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
}

/** Insert the tuple into the right sibling page, if the cursor is at the end
of a page.
@param[in]	flags	undo logging and locking flags
@param[in,out]	cursor	cursor at which to insert; when the function succeeds,
			the cursor is positioned before the insert point.
@param[out]	offsets	offsets on inserted record
@param[in,out]	heap	memory heap for allocating offsets
@param[in]	tuple	tuple to insert
@param[in]	n_ext	number of externally stored columns
@param[in,out]	mtr	mini-transaction
@return	inserted record (first record on the right sibling page);
	the cursor will be positioned on the page infimum
@retval	NULL if the operation was not performed */
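/* (Applies when the cursor is on the last user record of a page, e.g.
with ascending keys; if the right sibling can absorb the tuple, the
split is avoided entirely.) */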
static
rec_t*
btr_insert_into_right_sibling(
	ulint		flags,
	btr_cur_t*	cursor,
	ulint**		offsets,
	mem_heap_t*	heap,
	const dtuple_t*	tuple,
	ulint		n_ext,
	mtr_t*		mtr)
{
	buf_block_t*	block = btr_cur_get_block(cursor);
	page_t*		page = buf_block_get_frame(block);
	ulint		next_page_no = btr_page_get_next(page, mtr);

	ut_ad(dict_table_is_intrinsic(cursor->index->table)
	      || mtr_memo_contains_flagged(
			mtr, dict_index_get_lock(cursor->index),
			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
	ut_ad(mtr_is_block_fix(
		mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
	ut_ad(heap);

	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
			page_rec_get_next(btr_cur_get_rec(cursor)))) {

		return(NULL);
	}

	page_cur_t	next_page_cursor;
	buf_block_t*	next_block;
	page_t*		next_page;
	btr_cur_t	next_father_cursor;
	rec_t*		rec = NULL;
	ulint		max_size;

	const ulint	space = block->page.id.space();

	next_block = btr_block_get(
		page_id_t(space, next_page_no), block->page.size,
		RW_X_LATCH, cursor->index, mtr);
	next_page = buf_block_get_frame(next_block);

	bool	is_leaf = page_is_leaf(next_page);

	btr_page_get_father(
		cursor->index, next_block, mtr, &next_father_cursor);

	page_cur_search(
		next_block, cursor->index, tuple, PAGE_CUR_LE,
		&next_page_cursor);

	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);

	/* Extend the gap lock to the next page */
	if (!dict_table_is_locking_disabled(cursor->index->table)) {
		lock_update_split_left(next_block, block);
	}

	rec = page_cur_tuple_insert(
		&next_page_cursor, tuple, cursor->index, offsets, &heap,
		n_ext, mtr);

	if (rec == NULL) {
		if (is_leaf
		    && next_block->page.size.is_compressed()
		    && !dict_index_is_clust(cursor->index)
		    && !dict_table_is_temporary(cursor->index->table)) {
			/* Reset the IBUF_BITMAP_FREE bits, because
			page_cur_tuple_insert() will have attempted page
			reorganize before failing. */
			ibuf_reset_free_bits(next_block);
		}
		return(NULL);
	}

	ibool	compressed;
	dberr_t	err;
	ulint	level = btr_page_get_level(next_page, mtr);

	/* Adjust the cursor position */
	*btr_cur_get_page_cur(cursor) = next_page_cursor;

	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);

	/* We have to change the parent node pointer */

	compressed = btr_cur_pessimistic_delete(
		&err, TRUE, &next_father_cursor,
		BTR_CREATE_FLAG, false, mtr);

	ut_a(err == DB_SUCCESS);

	if (!compressed) {
		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
	}

	dtuple_t*	node_ptr = dict_index_build_node_ptr(
		cursor->index, rec, next_block->page.id.page_no(),
		heap, level);

	btr_insert_on_non_leaf_level(
		flags, cursor->index, level + 1, node_ptr, mtr);

	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));

	if (is_leaf
	    && !dict_index_is_clust(cursor->index)
	    && !dict_table_is_temporary(cursor->index->table)) {
		/* Update the free bits of the B-tree page in the
		insert buffer bitmap. */

		if (next_block->page.size.is_compressed()) {
			ibuf_update_free_bits_zip(next_block, mtr);
		} else {
			ibuf_update_free_bits_if_full(
				next_block, max_size,
				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
		}
	}

	return(rec);
}

/*************************************************************//**
Splits an index page in halves and inserts the tuple. It is assumed
that mtr holds an x-latch on the index tree. NOTE: the tree x-latch is
released within this function! NOTE that the operation of this
function must always succeed; we cannot reverse it: therefore enough
free disk space (2 pages) must be guaranteed to be available before
this function is called.
@return inserted record */
rec_t*
btr_page_split_and_insert(
/*======================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	ulint**		offsets,/*!< out: offsets on inserted record */
	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	block;
	page_t*		page;
	page_zip_des_t*	page_zip;
	ulint		page_no;
	byte		direction;
	ulint		hint_page_no;
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	rec_t*		split_rec;
	buf_block_t*	left_block;
	buf_block_t*	right_block;
	buf_block_t*	insert_block;
	page_cur_t*	page_cursor;
	rec_t*		first_rec;
	byte*		buf = 0; /* remove warning */
	rec_t*		move_limit;
	ibool		insert_will_fit;
	ibool		insert_left;
	ulint		n_iterations = 0;
	rec_t*		rec;
	ulint		n_uniq;
	dict_index_t*	index;

	index = btr_cur_get_index(cursor);

	if (dict_index_is_spatial(index)) {
		/* Split the rtree page and update the parent */
		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	}

	if (!*heap) {
		*heap = mem_heap_create(1024);
	}
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
func_start:
	mem_heap_empty(*heap);
	*offsets = NULL;

	ut_ad(mtr_memo_contains_flagged(mtr,
					dict_index_get_lock(cursor->index),
					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK)
	      || dict_table_is_intrinsic(cursor->index->table));
	ut_ad(!dict_index_is_online_ddl(cursor->index)
	      || (flags & BTR_CREATE_FLAG)
	      || dict_index_is_clust(cursor->index));
	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX)
	      || dict_table_is_intrinsic(cursor->index->table));

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	ut_ad(mtr_is_block_fix(
		mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
	ut_ad(!page_is_empty(page));

	/* Try to insert into the next page, if possible, before splitting */
	rec = btr_insert_into_right_sibling(
		flags, cursor, offsets, *heap, tuple, n_ext, mtr);

	if (rec != NULL) {
		return(rec);
	}

	page_no = block->page.id.page_no();

	/* 1. Decide the split record; split_rec == NULL means that the
	tuple to be inserted should be the first record on the upper
	half-page */
	insert_left = FALSE;

	if (n_iterations > 0) {
		direction = FSP_UP;
		hint_page_no = page_no + 1;
		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);

		if (split_rec == NULL) {
			insert_left = btr_page_tuple_smaller(
				cursor, tuple, offsets, n_uniq, heap);
		}
	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
		direction = FSP_UP;
		hint_page_no = page_no + 1;

	} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
		direction = FSP_DOWN;
		hint_page_no = page_no - 1;
		ut_ad(split_rec);
	} else {
		direction = FSP_UP;
		hint_page_no = page_no + 1;

		/* If there is only one record in the index page, we
		can't split the node in the middle by default. We need
		to determine whether the new record will be inserted
		to the left or right. */

		if (page_get_n_recs(page) > 1) {
			split_rec = page_get_middle_rec(page);
		} else if (btr_page_tuple_smaller(cursor, tuple,
						  offsets, n_uniq, heap)) {
			split_rec = page_rec_get_next(
				page_get_infimum_rec(page));
		} else {
			split_rec = NULL;
		}
	}

	/* 2. Allocate a new page to the index */
	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
				   btr_page_get_level(page, mtr), mtr, mtr);

	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
	btr_page_create(new_block, new_page_zip, cursor->index,
			btr_page_get_level(page, mtr), mtr);

	/* 3. Calculate the first record on the upper half-page, and the
	first record (move_limit) on the original page which ends up on the
	upper half */

	if (split_rec) {
		first_rec = move_limit = split_rec;

		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
					   n_uniq, heap);

		insert_left = cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;

		if (!insert_left && new_page_zip && n_iterations > 0) {
			/* If a compressed page has already been split,
			avoid further splits by inserting the record
			to an empty page. */
			split_rec = NULL;
			goto insert_empty;
		}
	} else if (insert_left) {
		ut_a(n_iterations > 0);
		first_rec = page_rec_get_next(page_get_infimum_rec(page));
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	} else {
insert_empty:
		ut_ad(!split_rec);
		ut_ad(!insert_left);
		buf = UT_NEW_ARRAY_NOKEY(
			byte,
			rec_get_converted_size(cursor->index, tuple, n_ext));

		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
						      tuple, n_ext);
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	}

	/* 4. Do first the modifications in the tree structure */

	btr_attach_half_pages(flags, cursor->index, block,
			      first_rec, new_block, direction, mtr);

	/* If the split is made on the leaf level and the insert will fit
	on the appropriate half-page, we may release the tree x-latch.
	We can then move the records after releasing the tree latch,
	thus reducing the tree latch contention. */
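
	/* (From here on, only the two half-pages and the ibuf bitmap are
	modified, and both half-pages stay X-latched in this
	mini-transaction, so the tree latch is not needed for the
	remaining record moves.) */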

	if (split_rec) {
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, split_rec,
						offsets, tuple, n_ext, heap);
	} else {
		if (!insert_left) {
			UT_DELETE_ARRAY(buf);
			buf = NULL;
		}

		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, NULL,
						offsets, tuple, n_ext, heap);
	}

	if (!srv_read_only_mode
	    && !dict_table_is_intrinsic(cursor->index->table)
	    && insert_will_fit
	    && page_is_leaf(page)
	    && !dict_index_is_online_ddl(cursor->index)) {

		mtr->memo_release(
			dict_index_get_lock(cursor->index),
			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);

		/* NOTE: We cannot release the root block latch here,
		because it contains the segment header and has already
		been modified in most cases. */
	}

	/* 5. Move then the records to the new page */
	if (direction == FSP_DOWN) {
		/*		fputs("Split left\n", stderr); */

		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || !page_move_rec_list_start(new_block, block, move_limit,
						 cursor->index, mtr)) {
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
			page_delete_rec_list_end(move_limit - page + new_page,
						 new_block, cursor->index,
						 ULINT_UNDEFINED,
						 ULINT_UNDEFINED, mtr);

			/* Update the lock table and possible hash index. */

			if (!dict_table_is_locking_disabled(
				cursor->index->table)) {
				lock_move_rec_list_start(
					new_block, block, move_limit,
					new_page + PAGE_NEW_INFIMUM);
			}

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

			page_delete_rec_list_start(move_limit, block,
						   cursor->index, mtr);
		}

		left_block = new_block;
		right_block = block;

		if (!dict_table_is_locking_disabled(cursor->index->table)) {
			lock_update_split_left(right_block, left_block);
		}
	} else {
		/*		fputs("Split right\n", stderr); */

		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || !page_move_rec_list_end(new_block, block, move_limit,
					       cursor->index, mtr)) {
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
			page_delete_rec_list_start(move_limit - page
						   + new_page, new_block,
						   cursor->index, mtr);

			/* Update the lock table and possible hash index. */
			if (!dict_table_is_locking_disabled(
				cursor->index->table)) {
				lock_move_rec_list_end(
					new_block, block, move_limit);
			}

			ut_ad(!dict_index_is_spatial(index));

			btr_search_move_or_delete_hash_entries(
				new_block, block, cursor->index);

			/* Delete the records from the source page. */

			page_delete_rec_list_end(move_limit, block,
						 cursor->index,
						 ULINT_UNDEFINED,
						 ULINT_UNDEFINED, mtr);
		}

		left_block = block;
		right_block = new_block;

		if (!dict_table_is_locking_disabled(cursor->index->table)) {
			lock_update_split_right(right_block, left_block);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	if (page_zip) {
		ut_a(page_zip_validate(page_zip, page, cursor->index));
		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
	}
#endif /* UNIV_ZIP_DEBUG */

	/* At this point, split_rec, move_limit and first_rec may point
	to garbage on the old page. */

	/* 6. The split and the tree modification are now completed. Decide
	the page where the tuple should be inserted */

	if (insert_left) {
		insert_block = left_block;
	} else {
		insert_block = right_block;
	}

	/* 7. Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

	page_cur_search(insert_block, cursor->index, tuple, page_cursor);

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
				    offsets, heap, n_ext, mtr);

#ifdef UNIV_ZIP_DEBUG
	{
		page_t*		insert_page
			= buf_block_get_frame(insert_block);

		page_zip_des_t*	insert_page_zip
			= buf_block_get_page_zip(insert_block);

		ut_a(!insert_page_zip
		     || page_zip_validate(insert_page_zip, insert_page,
					  cursor->index));
	}
#endif /* UNIV_ZIP_DEBUG */

	if (rec != NULL) {

		goto func_exit;
	}

	/* 8. If the insert did not fit, try page reorganization.
	For compressed pages, page_cur_tuple_insert() will have
	attempted this already. */

	if (page_cur_get_page_zip(page_cursor)
	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {

		goto insert_failed;
	}

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
				    offsets, heap, n_ext, mtr);

	if (rec == NULL) {
		/* The insert did not fit on the page: loop back to the
		start of the function for a new split */
insert_failed:
		/* We play it safe and reset the free bits for new_page */
		if (!dict_index_is_clust(cursor->index)
		    && !dict_table_is_temporary(cursor->index->table)) {
			ibuf_reset_free_bits(new_block);
			ibuf_reset_free_bits(block);
		}

		n_iterations++;
		ut_ad(n_iterations < 2
		      || buf_block_get_page_zip(insert_block));
		ut_ad(!insert_will_fit);

		goto func_start;
	}

func_exit:
	/* The insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

	if (!dict_index_is_clust(cursor->index)
	    && !dict_table_is_temporary(cursor->index->table)
	    && page_is_leaf(page)) {

		ibuf_update_free_bits_for_two_pages_low(
			left_block, right_block, mtr);
	}

	MONITOR_INC(MONITOR_INDEX_SPLIT);

	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));

	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
	return(rec);
}

/** Removes a page from the level list of pages.
@param[in]	space		space where removed
@param[in]	page_size	page size
@param[in,out]	page		page to remove
@param[in]	index		index tree
@param[in,out]	mtr		mini-transaction */
# define btr_level_list_remove(space,page_size,page,index,mtr)		\
	btr_level_list_remove_func(space,page_size,page,index,mtr)

/** Removes a page from the level list of pages.
@param[in]	space		space where removed
@param[in]	page_size	page size
@param[in,out]	page		page to remove
@param[in]	index		index tree
@param[in,out]	mtr		mini-transaction */
static
void
btr_level_list_remove_func(
	ulint			space,
	const page_size_t&	page_size,
	page_t*			page,
	const dict_index_t*	index,
	mtr_t*			mtr)
{
	ut_ad(page != NULL);
	ut_ad(mtr != NULL);
	ut_ad(mtr_is_page_fix(mtr, page, MTR_MEMO_PAGE_X_FIX, index->table));
	ut_ad(space == page_get_space_id(page));
	/* Get the previous and next page numbers of page */

	const ulint	prev_page_no = btr_page_get_prev(page, mtr);
	const ulint	next_page_no = btr_page_get_next(page, mtr);

	/* Update page links of the level */

	if (prev_page_no != FIL_NULL) {
		buf_block_t*	prev_block
			= btr_block_get(page_id_t(space, prev_page_no),
					page_size, RW_X_LATCH, index, mtr);

		page_t*		prev_page
			= buf_block_get_frame(prev_block);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_page) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(prev_page,
				  buf_block_get_page_zip(prev_block),
				  next_page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		buf_block_t*	next_block
			= btr_block_get(
				page_id_t(space, next_page_no), page_size,
				RW_X_LATCH, index, mtr);

		page_t*		next_page
			= buf_block_get_frame(next_block);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_page) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_page, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(next_page,
				  buf_block_get_page_zip(next_block),
				  prev_page_no, mtr);
	}
}

/****************************************************************//**
Writes the redo log record for setting an index record as the predefined
minimum record. */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
	rec_t*		rec,	/*!< in: record */
	mlog_id_t	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or
				MLOG_REC_MIN_MARK */
	mtr_t*		mtr)	/*!< in: mtr */
{
	mlog_write_initial_log_record(rec, type, mtr);

	/* Write rec offset as a 2-byte ulint */
	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
}
#else /* !UNIV_HOTBACKUP */
# define btr_set_min_rec_mark_log(rec,comp,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */

/****************************************************************//**
Parses the redo log record for setting an index record as the predefined
minimum record.
@return end of log record or NULL */
byte*
btr_parse_set_min_rec_mark(
/*=======================*/
	byte*	ptr,	/*!< in: buffer */
	byte*	end_ptr,/*!< in: buffer end */
	ulint	comp,	/*!< in: nonzero=compact page format */
	page_t*	page,	/*!< in: page or NULL */
	mtr_t*	mtr)	/*!< in: mtr or NULL */
{
	rec_t*	rec;

	if (end_ptr < ptr + 2) {

		return(NULL);
	}

	if (page) {
		ut_a(!page_is_comp(page) == !comp);

		rec = page + mach_read_from_2(ptr);

		btr_set_min_rec_mark(rec, mtr);
	}

	return(ptr + 2);
}

/****************************************************************//**
Sets a record as the predefined minimum record. */
void
btr_set_min_rec_mark(
/*=================*/
	rec_t*	rec,	/*!< in: record */
	mtr_t*	mtr)	/*!< in: mtr */
{
	ulint	info_bits;

	if (page_rec_is_comp(rec)) {
		info_bits = rec_get_info_bits(rec, TRUE);

		rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);

		btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
	} else {
		info_bits = rec_get_info_bits(rec, FALSE);

		rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);

		btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
	}
}

#ifndef UNIV_HOTBACKUP
/*************************************************************//**
Deletes on the upper level the node pointer to a page. */
void
btr_node_ptr_delete(
/*================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
	mtr_t*		mtr)	/*!< in: mtr */
{
	btr_cur_t	cursor;
	ibool		compressed;
	dberr_t		err;

	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

	/* Delete the node pointer on the father page */
	btr_page_get_father(index, block, mtr, &cursor);

	compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor,
						BTR_CREATE_FLAG, false, mtr);
	ut_a(err == DB_SUCCESS);

	if (!compressed) {
		btr_cur_compress_if_useful(&cursor, FALSE, mtr);
	}
}

/*************************************************************//**
If the page is the only one on its level, this function moves its records
to the father page, thus reducing the tree height.
@return father block */
static
buf_block_t*
btr_lift_page_up(
/*=============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only one on its
				level; must not be empty: use
				btr_discard_only_page_on_level if the last
				record from the page should be removed */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	father_block;
	page_t*		father_page;
	ulint		page_level;
	page_zip_des_t*	father_page_zip;
	page_t*		page		= buf_block_get_frame(block);
	ulint		root_page_no;
	buf_block_t*	blocks[BTR_MAX_LEVELS];
	ulint		n_blocks;	/*!< last used index in blocks[] */
	ulint		i;
	bool		lift_father_up;
	buf_block_t*	block_orig	= block;

	ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
	ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

	page_level = btr_page_get_level(page, mtr);
	root_page_no = dict_index_get_page(index);

	{
		btr_cur_t	cursor;
		ulint*		offsets	= NULL;
		mem_heap_t*	heap	= mem_heap_create(
			sizeof(*offsets)
			* (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
		buf_block_t*	b;

		if (dict_index_is_spatial(index)) {
			offsets = rtr_page_get_father_block(
				NULL, heap, index, block, mtr,
				NULL, &cursor);
		} else {
			offsets = btr_page_get_father_block(offsets, heap,
							    index, block,
							    mtr, &cursor);
		}
		father_block = btr_cur_get_block(&cursor);
		father_page_zip = buf_block_get_page_zip(father_block);
		father_page = buf_block_get_frame(father_block);

		n_blocks = 0;

		/* Store all ancestor pages so we can reset their
		levels later on.  We have to do all the searches on
		the tree now because later on, after we've replaced
		the first level, the tree is in an inconsistent state
		and can not be searched. */
		for (b = father_block;
		     b->page.id.page_no() != root_page_no; ) {
			ut_a(n_blocks < BTR_MAX_LEVELS);

			if (dict_index_is_spatial(index)) {
				offsets = rtr_page_get_father_block(
					NULL, heap, index, b, mtr,
					NULL, &cursor);
			} else {
				offsets = btr_page_get_father_block(offsets,
								    heap,
								    index, b,
								    mtr,
								    &cursor);
			}

			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
		}

		lift_father_up = (n_blocks && page_level == 0);
		if (lift_father_up) {
			/* The father page should also be the only page on
			its level (and it is not the root). Lift up the
			father page first: a leaf page should be lifted up
			only into the root page. The segment from which a
			page is freed is chosen based on page_level (== 0
			or != 0); if page_level changed from != 0 to == 0,
			the later freeing of the page would not find the
			page allocation to be freed. */
3186 
3187 			block = father_block;
3188 			page = buf_block_get_frame(block);
3189 			page_level = btr_page_get_level(page, mtr);
3190 
3191 			ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
3192 			ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
3193 			ut_ad(mtr_is_block_fix(
3194 				mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3195 
3196 			father_block = blocks[0];
3197 			father_page_zip = buf_block_get_page_zip(father_block);
3198 			father_page = buf_block_get_frame(father_block);
3199 		}
3200 
3201 		mem_heap_free(heap);
3202 	}
3203 
3204 	btr_search_drop_page_hash_index(block);
3205 
3206 	/* Make the father empty */
3207 	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
3208 	page_level++;
3209 
3210 	/* Copy the records to the father page one by one. */
3211 	if (0
3212 #ifdef UNIV_ZIP_COPY
3213 	    || father_page_zip
3214 #endif /* UNIV_ZIP_COPY */
3215 	    || !page_copy_rec_list_end(father_block, block,
3216 				       page_get_infimum_rec(page),
3217 				       index, mtr)) {
3218 		const page_zip_des_t*	page_zip
3219 			= buf_block_get_page_zip(block);
3220 		ut_a(father_page_zip);
3221 		ut_a(page_zip);
3222 
3223 		/* Copy the page byte for byte. */
3224 		page_zip_copy_recs(father_page_zip, father_page,
3225 				   page_zip, page, index, mtr);
3226 
3227 		/* Update the lock table and possible hash index. */
3228 
3229 		if (!dict_table_is_locking_disabled(index->table)) {
3230 			lock_move_rec_list_end(father_block, block,
3231 					       page_get_infimum_rec(page));
3232 		}
3233 
3234 		/* Also update the predicate locks */
3235 		if (dict_index_is_spatial(index)) {
3236 			lock_prdt_rec_move(father_block, block);
3237 		}
3238 
3239 		btr_search_move_or_delete_hash_entries(father_block, block,
3240 						       index);
3241 	}
3242 
3243 	if (!dict_table_is_locking_disabled(index->table)) {
3244 		/* Free predicate page locks on the block */
3245 		if (dict_index_is_spatial(index)) {
3246 			lock_mutex_enter();
3247 			lock_prdt_page_free_from_discard(
3248 				block, lock_sys->prdt_page_hash);
3249 			lock_mutex_exit();
3250 		}
3251 		lock_update_copy_and_discard(father_block, block);
3252 	}
3253 
3254 	/* Go upward to root page, decrementing levels by one. */
3255 	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
3256 		page_t*		page	= buf_block_get_frame(blocks[i]);
3257 		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);
3258 
3259 		ut_ad(btr_page_get_level(page, mtr) == page_level + 1);
3260 
3261 		btr_page_set_level(page, page_zip, page_level, mtr);
3262 #ifdef UNIV_ZIP_DEBUG
3263 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
3264 #endif /* UNIV_ZIP_DEBUG */
3265 	}
3266 
3267 	if (dict_index_is_spatial(index)) {
3268 		rtr_check_discard_page(index, NULL, block);
3269 	}
3270 
3271 	/* Free the file page */
3272 	btr_page_free(index, block, mtr);
3273 
3274 	/* We play it safe and reset the free bits for the father */
3275 	if (!dict_index_is_clust(index)
3276 	    && !dict_table_is_temporary(index->table)) {
3277 		ibuf_reset_free_bits(father_block);
3278 	}
3279 	ut_ad(page_validate(father_page, index));
3280 	ut_ad(btr_check_node_ptr(index, father_block, mtr));
3281 
3282 	return(lift_father_up ? block_orig : father_block);
3283 }
3284 
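/* Editorial sketch (an illustrative reading of btr_compress() below,
not original text), under the latching assumptions stated in its
comment:

	if (no left and no right sibling)       -> btr_lift_page_up()
	else if (can merge with left sibling)   -> copy records left
	else if (can merge with right sibling)  -> copy records right
	else                                    -> give up (err_exit)

After a successful merge the emptied page is unlinked from the level
list, its node pointer is deleted from the father, and the page is
freed. */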
3285 /*************************************************************//**
3286 Tries to merge the page first to its immediate left brother, if such a
3287 brother exists and the node pointers to the current page and to the brother
3288 reside on the same page. If the left brother does not satisfy these
3289 conditions, looks at the right brother. If the page is the only one on that
3290 level, lifts the records of the page to the father page, thus reducing the
3291 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3292 page. If the cursor is on the leaf level, mtr must also hold x-latches on
3293 the brothers, if they exist.
3294 @return TRUE on success */
3295 ibool
3296 btr_compress(
3297 /*=========*/
3298 	btr_cur_t*	cursor,	/*!< in/out: cursor on the page to merge
3299 				or lift; the page must not be empty:
3300 				when deleting records, use btr_discard_page()
3301 				if the page would become empty */
3302 	ibool		adjust,	/*!< in: TRUE if should adjust the
3303 				cursor position even if compression occurs */
3304 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
3305 {
3306 	dict_index_t*	index;
3307 	ulint		space;
3308 	ulint		left_page_no;
3309 	ulint		right_page_no;
3310 	buf_block_t*	merge_block;
3311 	page_t*		merge_page = NULL;
3312 	page_zip_des_t*	merge_page_zip;
3313 	ibool		is_left;
3314 	buf_block_t*	block;
3315 	page_t*		page;
3316 	btr_cur_t	father_cursor;
3317 	mem_heap_t*	heap;
3318 	ulint*		offsets;
3319 	ulint		nth_rec = 0; /* remove bogus warning */
3320 	bool		mbr_changed = false;
3321 #ifdef UNIV_DEBUG
3322 	bool		leftmost_child;
3323 #endif
3324 	DBUG_ENTER("btr_compress");
3325 
3326 	block = btr_cur_get_block(cursor);
3327 	page = btr_cur_get_page(cursor);
3328 	index = btr_cur_get_index(cursor);
3329 
3330 	btr_assert_not_corrupted(block, index);
3331 
3332 #ifdef UNIV_DEBUG
3333 	if (dict_index_is_spatial(index)) {
3334 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3335 						MTR_MEMO_X_LOCK));
3336 	} else {
3337 		ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3338 						MTR_MEMO_X_LOCK
3339 						| MTR_MEMO_SX_LOCK)
3340 		      || dict_table_is_intrinsic(index->table));
3341 	}
3342 #endif /* UNIV_DEBUG */
3343 
3344 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3345 	space = dict_index_get_space(index);
3346 
3347 	const page_size_t	page_size(dict_table_page_size(index->table));
3348 
3349 	MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3350 
3351 	left_page_no = btr_page_get_prev(page, mtr);
3352 	right_page_no = btr_page_get_next(page, mtr);
3353 
3354 #ifdef UNIV_DEBUG
3355 	if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3356 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3357 			page_rec_get_next(page_get_infimum_rec(page)),
3358 			page_is_comp(page)));
3359 	}
3360 #endif /* UNIV_DEBUG */
3361 
3362 	heap = mem_heap_create(100);
3363 
3364 	if (dict_index_is_spatial(index)) {
3365 		offsets = rtr_page_get_father_block(
3366 			NULL, heap, index, block, mtr, cursor, &father_cursor);
3367 		ut_ad(cursor->page_cur.block->page.id.page_no()
3368 		      == block->page.id.page_no());
3369 		rec_t*  my_rec = father_cursor.page_cur.rec;
3370 
3371 		ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3372 
3373 		if (page_no != block->page.id.page_no()) {
3374 			ib::info() << "father positioned on page "
3375 				<< page_no << " instead of "
3376 				<< block->page.id.page_no();
3377 			offsets = btr_page_get_father_block(
3378 				NULL, heap, index, block, mtr, &father_cursor);
3379 		}
3380 	} else {
3381 		offsets = btr_page_get_father_block(
3382 			NULL, heap, index, block, mtr, &father_cursor);
3383 	}
3384 
3385 	if (adjust) {
3386 		nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3387 		ut_ad(nth_rec > 0);
3388 	}
3389 
3390 	if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3391 		/* The page is the only one on the level, lift the records
3392 		to the father */
3393 
3394 		merge_block = btr_lift_page_up(index, block, mtr);
3395 		goto func_exit;
3396 	}
3397 
3398 	ut_d(leftmost_child =
3399 		left_page_no != FIL_NULL
3400 		&& (page_rec_get_next(
3401 			page_get_infimum_rec(
3402 				btr_cur_get_page(&father_cursor)))
3403 		    == btr_cur_get_rec(&father_cursor)));
3404 
3405 	/* Decide the page to which we try to merge and which will inherit
3406 	the locks */
3407 
3408 	is_left = btr_can_merge_with_page(cursor, left_page_no,
3409 					  &merge_block, mtr);
3410 
3411 	DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
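	/* Editorial note: the left sibling is preferred when both
	siblings qualify; the ib_always_merge_right debug injection
	point above forces a right merge for testing. */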
3412 retry:
3413 	if (!is_left
3414 	   && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3415 				       mtr)) {
3416 		if (!merge_block) {
3417 			merge_page = NULL;
3418 		}
3419 		goto err_exit;
3420 	}
3421 
3422 	merge_page = buf_block_get_frame(merge_block);
3423 
3424 #ifdef UNIV_BTR_DEBUG
3425 	if (is_left) {
3426 		ut_a(btr_page_get_next(merge_page, mtr)
3427 		     == block->page.id.page_no());
3428 	} else {
3429 		ut_a(btr_page_get_prev(merge_page, mtr)
3430 		     == block->page.id.page_no());
3431 	}
3432 #endif /* UNIV_BTR_DEBUG */
3433 
3434 #ifdef UNIV_GIS_DEBUG
3435 	if (dict_index_is_spatial(index)) {
3436 		if (is_left) {
3437 			fprintf(stderr, "GIS_DIAG: merge left  %ld to %ld \n",
3438 				(long) block->page.id.page_no(), left_page_no);
3439 		} else {
3440 			fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3441 				(long) block->page.id.page_no(), right_page_no);
3442 		}
3443 	}
3444 #endif /* UNIV_GIS_DEBUG */
3445 
3446 	ut_ad(page_validate(merge_page, index));
3447 
3448 	merge_page_zip = buf_block_get_page_zip(merge_block);
3449 #ifdef UNIV_ZIP_DEBUG
3450 	if (merge_page_zip) {
3451 		const page_zip_des_t*	page_zip
3452 			= buf_block_get_page_zip(block);
3453 		ut_a(page_zip);
3454 		ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3455 		ut_a(page_zip_validate(page_zip, page, index));
3456 	}
3457 #endif /* UNIV_ZIP_DEBUG */
3458 
3459 	/* Move records to the merge page */
3460 	if (is_left) {
3461 		btr_cur_t	cursor2;
3462 		rtr_mbr_t	new_mbr;
3463 		ulint*		offsets2 = NULL;
3464 
3465 		/* For rtree, we need to update father's mbr. */
3466 		if (dict_index_is_spatial(index)) {
3467 			/* We only support merging pages that have the
3468 			same parent page */
3469 			if (!rtr_check_same_block(
3470 				index, &cursor2,
3471 				btr_cur_get_block(&father_cursor),
3472 				merge_block, heap)) {
3473 				is_left = false;
3474 				goto retry;
3475 			}
3476 
3477 			/* Set rtr_info for cursor2, since it is
3478 			necessary in recursive page merge. */
3479 			cursor2.rtr_info = cursor->rtr_info;
3480 			cursor2.tree_height = cursor->tree_height;
3481 
3482 			offsets2 = rec_get_offsets(
3483 				btr_cur_get_rec(&cursor2), index,
3484 				NULL, ULINT_UNDEFINED, &heap);
3485 
3486 			/* Check if parent entry needs to be updated */
3487 			mbr_changed = rtr_merge_mbr_changed(
3488 				&cursor2, &father_cursor,
3489 				offsets2, offsets, &new_mbr,
3490 				merge_block, block, index);
3491 		}
3492 
3493 		rec_t*	orig_pred = page_copy_rec_list_start(
3494 			merge_block, block, page_get_supremum_rec(page),
3495 			index, mtr);
3496 
3497 		if (!orig_pred) {
3498 			goto err_exit;
3499 		}
3500 
3501 		btr_search_drop_page_hash_index(block);
3502 
3503 		/* Remove the page from the level list */
3504 		btr_level_list_remove(space, page_size, page, index, mtr);
3505 
3506 		if (dict_index_is_spatial(index)) {
3507 			rec_t*  my_rec = father_cursor.page_cur.rec;
3508 
3509 			ulint page_no = btr_node_ptr_get_child_page_no(
3510 						my_rec, offsets);
3511 
3512 			if (page_no != block->page.id.page_no()) {
3513 
3514 				ib::fatal() << "father positioned on "
3515 					<< page_no << " instead of "
3516 					<< block->page.id.page_no();
3517 
3518 				ut_ad(0);
3519 			}
3520 
3521 			if (mbr_changed) {
3522 #ifdef UNIV_DEBUG
3523 				bool	success = rtr_update_mbr_field(
3524 					&cursor2, offsets2, &father_cursor,
3525 					merge_page, &new_mbr, NULL, mtr);
3526 
3527 				ut_ad(success);
3528 #else
3529 				rtr_update_mbr_field(
3530 					&cursor2, offsets2, &father_cursor,
3531 					merge_page, &new_mbr, NULL, mtr);
3532 #endif
3533 			} else {
3534 				rtr_node_ptr_delete(
3535 					index, &father_cursor, block, mtr);
3536 			}
3537 
3538 			/* No GAP locks need to be worried about */
3539 			lock_mutex_enter();
3540 			lock_prdt_page_free_from_discard(
3541 				block, lock_sys->prdt_page_hash);
3542 			lock_rec_free_all_from_discard_page(block);
3543 			lock_mutex_exit();
3544 		} else {
3545 			btr_node_ptr_delete(index, block, mtr);
3546 			if (!dict_table_is_locking_disabled(index->table)) {
3547 				lock_update_merge_left(
3548 					merge_block, orig_pred, block);
3549 			}
3550 		}
3551 
3552 		if (adjust) {
3553 			nth_rec += page_rec_get_n_recs_before(orig_pred);
3554 		}
3555 	} else {
3556 		rec_t*		orig_succ;
3557 		ibool		compressed;
3558 		dberr_t		err;
3559 		btr_cur_t	cursor2;
3560 					/* father cursor pointing to node ptr
3561 					of the right sibling */
3562 #ifdef UNIV_BTR_DEBUG
3563 		byte		fil_page_prev[4];
3564 #endif /* UNIV_BTR_DEBUG */
3565 
3566 		if (dict_index_is_spatial(index)) {
3567 			cursor2.rtr_info = NULL;
3568 
3569 			/* For a spatial index, we disallow merging blocks
3570 			with different parents, since the merge would need
3571 			to update the entry (MBR and primary key) in the
3572 			parent of the block being merged */
3573 			if (!rtr_check_same_block(
3574 				index, &cursor2,
3575 				btr_cur_get_block(&father_cursor),
3576 				merge_block, heap)) {
3577 				goto err_exit;
3578 			}
3579 
3580 			/* Set rtr_info for cursor2, since it is
3581 			necessary in recursive page merge. */
3582 			cursor2.rtr_info = cursor->rtr_info;
3583 			cursor2.tree_height = cursor->tree_height;
3584 		} else {
3585 			btr_page_get_father(index, merge_block, mtr, &cursor2);
3586 		}
3587 
3588 		if (merge_page_zip && left_page_no == FIL_NULL) {
3589 
3590 			/* The function page_zip_compress(), which will be
3591 			invoked by page_copy_rec_list_end() below,
3592 			requires that FIL_PAGE_PREV be FIL_NULL.
3593 			Clear the field, but prepare to restore it. */
3594 #ifdef UNIV_BTR_DEBUG
3595 			memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3596 #endif /* UNIV_BTR_DEBUG */
3597 #if FIL_NULL != 0xffffffff
3598 # error "FIL_NULL != 0xffffffff"
3599 #endif
3600 			memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3601 		}
3602 
3603 		orig_succ = page_copy_rec_list_end(merge_block, block,
3604 						   page_get_infimum_rec(page),
3605 						   cursor->index, mtr);
3606 
3607 		if (!orig_succ) {
3608 			ut_a(merge_page_zip);
3609 #ifdef UNIV_BTR_DEBUG
3610 			if (left_page_no == FIL_NULL) {
3611 				/* FIL_PAGE_PREV was restored from
3612 				merge_page_zip. */
3613 				ut_a(!memcmp(fil_page_prev,
3614 					     merge_page + FIL_PAGE_PREV, 4));
3615 			}
3616 #endif /* UNIV_BTR_DEBUG */
3617 			goto err_exit;
3618 		}
3619 
3620 		btr_search_drop_page_hash_index(block);
3621 
3622 #ifdef UNIV_BTR_DEBUG
3623 		if (merge_page_zip && left_page_no == FIL_NULL) {
3624 
3625 			/* Restore FIL_PAGE_PREV in order to avoid an assertion
3626 			failure in btr_level_list_remove(), which will set
3627 			the field again to FIL_NULL.  Even though this makes
3628 			merge_page and merge_page_zip inconsistent for a
3629 			split second, it is harmless, because the pages
3630 			are X-latched. */
3631 			memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3632 		}
3633 #endif /* UNIV_BTR_DEBUG */
3634 
3635 		/* Remove the page from the level list */
3636 		btr_level_list_remove(space, page_size, page, index, mtr);
3637 
3638 		ut_ad(btr_node_ptr_get_child_page_no(
3639 			btr_cur_get_rec(&father_cursor), offsets)
3640 			== block->page.id.page_no());
3641 
3642 		/* Replace the address of the old child node (= page) with the
3643 		address of the merge page to the right */
3644 		btr_node_ptr_set_child_page_no(
3645 			btr_cur_get_rec(&father_cursor),
3646 			btr_cur_get_page_zip(&father_cursor),
3647 			offsets, right_page_no, mtr);
3648 
3649 #ifdef UNIV_DEBUG
3650 		if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3651 			ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3652 				page_rec_get_next(page_get_infimum_rec(
3653 					buf_block_get_frame(merge_block))),
3654 				page_is_comp(page)));
3655 		}
3656 #endif /* UNIV_DEBUG */
3657 
3658 		/* For rtree, we need to update father's mbr. */
3659 		if (dict_index_is_spatial(index)) {
3660 			ulint*	offsets2;
3661 			ulint	rec_info;
3662 
3663 			offsets2 = rec_get_offsets(
3664 				btr_cur_get_rec(&cursor2),
3665 				index, NULL, ULINT_UNDEFINED, &heap);
3666 
3667 			ut_ad(btr_node_ptr_get_child_page_no(
3668 				btr_cur_get_rec(&cursor2), offsets2)
3669 				== right_page_no);
3670 
3671 			rec_info = rec_get_info_bits(
3672 				btr_cur_get_rec(&father_cursor),
3673 				rec_offs_comp(offsets));
3674 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
3675 				/* When the father node ptr is the minimal
3676 				rec, we keep it and delete the node ptr of
3677 				the merge page. */
3678 				rtr_merge_and_update_mbr(&father_cursor,
3679 							 &cursor2,
3680 							 offsets, offsets2,
3681 							 merge_page,
3682 							 merge_block,
3683 							 block, index, mtr);
3684 			} else {
3685 				/* Otherwise, we keep the node ptr of the
3686 				merge page and delete the father node ptr.
3687 				This preserves the rec order in the upper
3688 				level. */
3689 				rtr_merge_and_update_mbr(&cursor2,
3690 							 &father_cursor,
3691 							 offsets2, offsets,
3692 							 merge_page,
3693 							 merge_block,
3694 							 block, index, mtr);
3695 			}
3696 			lock_mutex_enter();
3697 			lock_prdt_page_free_from_discard(
3698 				block, lock_sys->prdt_page_hash);
3699 			lock_rec_free_all_from_discard_page(block);
3700 			lock_mutex_exit();
3701 		} else {
3702 
3703 			compressed = btr_cur_pessimistic_delete(&err, TRUE,
3704 								&cursor2,
3705 								BTR_CREATE_FLAG,
3706 								false, mtr);
3707 			ut_a(err == DB_SUCCESS);
3708 
3709 			if (!compressed) {
3710 				btr_cur_compress_if_useful(&cursor2,
3711 							   FALSE,
3712 							   mtr);
3713 			}
3714 
3715 			if (!dict_table_is_locking_disabled(index->table)) {
3716 				lock_update_merge_right(
3717 					merge_block, orig_succ, block);
3718 			}
3719 		}
3720 	}
3721 
3722 	if (!dict_index_is_clust(index)
3723 	    && !dict_table_is_temporary(index->table)
3724 	    && page_is_leaf(merge_page)) {
3725 		/* Update the free bits of the B-tree page in the
3726 		insert buffer bitmap.  This has to be done in a
3727 		separate mini-transaction that is committed before the
3728 		main mini-transaction.  We cannot update the insert
3729 		buffer bitmap in this mini-transaction, because
3730 		btr_compress() can be invoked recursively without
3731 		committing the mini-transaction in between.  Since
3732 		insert buffer bitmap pages have a lower rank than
3733 		B-tree pages, we must not access other pages in the
3734 		same mini-transaction after accessing an insert buffer
3735 		bitmap page. */
3736 
3737 		/* The free bits in the insert buffer bitmap must
3738 		never exceed the free space on a page.  It is safe to
3739 		decrement or reset the bits in the bitmap in a
3740 		mini-transaction that is committed before the
3741 		mini-transaction that affects the free space. */
3742 
3743 		/* It is unsafe to increment the bits in a separately
3744 		committed mini-transaction, because in crash recovery,
3745 		the free bits could momentarily be set too high. */
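		/* Editorial summary (not part of the original source):
		hence the branch below resets the bits pessimistically
		for compressed pages and writes them exactly for
		uncompressed pages. */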
3746 
3747 		if (page_size.is_compressed()) {
3748 			/* Because the free bits may be incremented
3749 			and we cannot update the insert buffer bitmap
3750 			in the same mini-transaction, the only safe
3751 			thing we can do here is the pessimistic
3752 			approach: reset the free bits. */
3753 			ibuf_reset_free_bits(merge_block);
3754 		} else {
3755 			/* On uncompressed pages, the free bits will
3756 			never increase here.  Thus, it is safe to
3757 			write the bits accurately in a separate
3758 			mini-transaction. */
3759 			ibuf_update_free_bits_if_full(merge_block,
3760 						      UNIV_PAGE_SIZE,
3761 						      ULINT_UNDEFINED);
3762 		}
3763 	}
3764 
3765 	ut_ad(page_validate(merge_page, index));
3766 #ifdef UNIV_ZIP_DEBUG
3767 	ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
3768 						  index));
3769 #endif /* UNIV_ZIP_DEBUG */
3770 
3771 	if (dict_index_is_spatial(index)) {
3772 #ifdef UNIV_GIS_DEBUG
3773 		fprintf(stderr, "GIS_DIAG: compressed away  %ld\n",
3774 			(long) block->page.id.page_no());
3775 		fprintf(stderr, "GIS_DIAG: merged to %ld\n",
3776 			(long) merge_block->page.id.page_no());
3777 #endif
3778 
3779 		rtr_check_discard_page(index, NULL, block);
3780 	}
3781 
3782 	/* Free the file page */
3783 	btr_page_free(index, block, mtr);
3784 
3785 	/* btr_check_node_ptr() needs the parent block latched.
3786 	If merge_block's parent block is not the same,
3787 	we cannot use btr_check_node_ptr() */
3788 	ut_ad(leftmost_child
3789 	      || btr_check_node_ptr(index, merge_block, mtr));
3790 func_exit:
3791 	mem_heap_free(heap);
3792 
3793 	if (adjust) {
3794 		ut_ad(nth_rec > 0);
3795 		btr_cur_position(
3796 			index,
3797 			page_rec_get_nth(merge_block->frame, nth_rec),
3798 			merge_block, cursor);
3799 	}
3800 
3801 	MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
3802 
3803 	DBUG_RETURN(TRUE);
3804 
3805 err_exit:
3806 	/* We play it safe and reset the free bits. */
3807 	if (page_size.is_compressed()
3808 	    && merge_page
3809 	    && page_is_leaf(merge_page)
3810 	    && !dict_index_is_clust(index)) {
3811 
3812 		ibuf_reset_free_bits(merge_block);
3813 	}
3814 
3815 	mem_heap_free(heap);
3816 	DBUG_RETURN(FALSE);
3817 }
3818 
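/* Editorial sketch (not part of the original source): the function
below walks from the given block up towards the root, freeing each
single-record page on the way, and finally empties the root:

	[ root ]                [ root ]  (emptied, becomes a leaf)
	    |          ==>
	[ block ]               block freed
*/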
3819 /*************************************************************//**
3820 Discards a page that is the only page on its level.  This will empty
3821 the whole B-tree, leaving just an empty root page.  This function
3822 should never be reached, because btr_compress(), which is invoked in
3823 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
3824 static
3825 void
3826 btr_discard_only_page_on_level(
3827 /*===========================*/
3828 	dict_index_t*	index,	/*!< in: index tree */
3829 	buf_block_t*	block,	/*!< in: page which is the only one on its level */
3830 	mtr_t*		mtr)	/*!< in: mtr */
3831 {
3832 	ulint		page_level = 0;
3833 	trx_id_t	max_trx_id;
3834 
3835 	/* Save the PAGE_MAX_TRX_ID from the leaf page. */
3836 	max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
3837 
3838 	while (block->page.id.page_no() != dict_index_get_page(index)) {
3839 		btr_cur_t	cursor;
3840 		buf_block_t*	father;
3841 		const page_t*	page	= buf_block_get_frame(block);
3842 
3843 		ut_a(page_get_n_recs(page) == 1);
3844 		ut_a(page_level == btr_page_get_level(page, mtr));
3845 		ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
3846 		ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
3847 
3848 		ut_ad(mtr_is_block_fix(
3849 			mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3850 		btr_search_drop_page_hash_index(block);
3851 
3852 		if (dict_index_is_spatial(index)) {
3853 			/* Check any concurrent search having this page */
3854 			rtr_check_discard_page(index, NULL, block);
3855 			rtr_page_get_father(index, block, mtr, NULL, &cursor);
3856 		} else {
3857 			btr_page_get_father(index, block, mtr, &cursor);
3858 		}
3859 		father = btr_cur_get_block(&cursor);
3860 
3861 		if (!dict_table_is_locking_disabled(index->table)) {
3862 			lock_update_discard(
3863 				father, PAGE_HEAP_NO_SUPREMUM, block);
3864 		}
3865 
3866 		/* Free the file page */
3867 		btr_page_free(index, block, mtr);
3868 
3869 		block = father;
3870 		page_level++;
3871 	}
3872 
3873 	/* block is the root page, which must be empty, except
3874 	for the node pointer to the (now discarded) block(s). */
3875 
3876 #ifdef UNIV_BTR_DEBUG
3877 	if (!dict_index_is_ibuf(index)) {
3878 		const page_t*	root	= buf_block_get_frame(block);
3879 		const ulint	space	= dict_index_get_space(index);
3880 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
3881 					    + root, space));
3882 		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
3883 					    + root, space));
3884 	}
3885 #endif /* UNIV_BTR_DEBUG */
3886 
3887 	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
3888 	ut_ad(page_is_leaf(buf_block_get_frame(block)));
3889 
3890 	if (!dict_index_is_clust(index)
3891 	    && !dict_table_is_temporary(index->table)) {
3892 		/* We play it safe and reset the free bits for the root */
3893 		ibuf_reset_free_bits(block);
3894 
3895 		ut_a(max_trx_id);
3896 		page_set_max_trx_id(block,
3897 				    buf_block_get_page_zip(block),
3898 				    max_trx_id, mtr);
3899 	}
3900 }
3901 
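/* Editorial note (a reading of btr_discard_page() below, not original
text): the sibling chosen to inherit the record locks of the discarded
page is

	the left sibling, if one exists (locks inherited at its
	supremum record), else the right sibling (locks inherited at
	its minimum heap number record); if neither exists,
	btr_discard_only_page_on_level() is used instead. */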
3902 /*************************************************************//**
3903 Discards a page from a B-tree. This is used to remove the last record from
3904 a B-tree page: the whole page must be removed at the same time. This cannot
3905 be used for the root page, which is allowed to be empty. */
3906 void
3907 btr_discard_page(
3908 /*=============*/
3909 	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
3910 				the root page */
3911 	mtr_t*		mtr)	/*!< in: mtr */
3912 {
3913 	dict_index_t*	index;
3914 	ulint		left_page_no;
3915 	ulint		right_page_no;
3916 	buf_block_t*	merge_block;
3917 	page_t*		merge_page;
3918 	buf_block_t*	block;
3919 	page_t*		page;
3920 	rec_t*		node_ptr;
3921 #ifdef UNIV_DEBUG
3922 	btr_cur_t	parent_cursor;
3923 	bool		parent_is_different = false;
3924 #endif
3925 
3926 	block = btr_cur_get_block(cursor);
3927 	index = btr_cur_get_index(cursor);
3928 
3929 	ut_ad(dict_index_get_page(index) != block->page.id.page_no());
3930 
3931 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3932 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK)
3933 	      || dict_table_is_intrinsic(index->table));
3934 
3935 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3936 
3937 	const ulint	space = dict_index_get_space(index);
3938 
3939 	MONITOR_INC(MONITOR_INDEX_DISCARD);
3940 
3941 #ifdef UNIV_DEBUG
3942 	if (dict_index_is_spatial(index)) {
3943 		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
3944 	} else {
3945 		btr_page_get_father(index, block, mtr, &parent_cursor);
3946 	}
3947 #endif
3948 
3949 	/* Decide the page which will inherit the locks */
3950 
3951 	left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
3952 	right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
3953 
3954 	const page_size_t	page_size(dict_table_page_size(index->table));
3955 
3956 	if (left_page_no != FIL_NULL) {
3957 		merge_block = btr_block_get(
3958 			page_id_t(space, left_page_no), page_size,
3959 			RW_X_LATCH, index, mtr);
3960 
3961 		merge_page = buf_block_get_frame(merge_block);
3962 #ifdef UNIV_BTR_DEBUG
3963 		ut_a(btr_page_get_next(merge_page, mtr)
3964 		     == block->page.id.page_no());
3965 #endif /* UNIV_BTR_DEBUG */
3966 		ut_d(parent_is_different =
3967 			(page_rec_get_next(
3968 				page_get_infimum_rec(
3969 					btr_cur_get_page(
3970 						&parent_cursor)))
3971 			 == btr_cur_get_rec(&parent_cursor)));
3972 	} else if (right_page_no != FIL_NULL) {
3973 		merge_block = btr_block_get(
3974 			page_id_t(space, right_page_no), page_size,
3975 			RW_X_LATCH, index, mtr);
3976 
3977 		merge_page = buf_block_get_frame(merge_block);
3978 #ifdef UNIV_BTR_DEBUG
3979 		ut_a(btr_page_get_prev(merge_page, mtr)
3980 		     == block->page.id.page_no());
3981 #endif /* UNIV_BTR_DEBUG */
3982 		ut_d(parent_is_different = page_rec_is_supremum(
3983 			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
3984 	} else {
3985 		btr_discard_only_page_on_level(index, block, mtr);
3986 
3987 		return;
3988 	}
3989 
3990 	page = buf_block_get_frame(block);
3991 	ut_a(page_is_comp(merge_page) == page_is_comp(page));
3992 	btr_search_drop_page_hash_index(block);
3993 
3994 	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
3995 
3996 		/* We have to mark the leftmost node pointer on the right
3997 		side page as the predefined minimum record */
3998 		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
3999 
4000 		ut_ad(page_rec_is_user_rec(node_ptr));
4001 
4002 		/* This will make page_zip_validate() fail on merge_page
4003 		until btr_level_list_remove() completes.  This is harmless,
4004 		because everything will take place within a single
4005 		mini-transaction and because writing to the redo log
4006 		is an atomic operation (performed by mtr_commit()). */
4007 		btr_set_min_rec_mark(node_ptr, mtr);
4008 	}
4009 
4010 	if (dict_index_is_spatial(index)) {
4011 		btr_cur_t	father_cursor;
4012 
4013 		/* Since rtr_node_ptr_delete() does not itself fetch the
4014 		father node ptr, we need to get the father node ptr first
4015 		and then delete it. */
4016 		rtr_page_get_father(index, block, mtr, cursor, &father_cursor);
4017 		rtr_node_ptr_delete(index, &father_cursor, block, mtr);
4018 	} else {
4019 		btr_node_ptr_delete(index, block, mtr);
4020 	}
4021 
4022 	/* Remove the page from the level list */
4023 	btr_level_list_remove(space, page_size, page, index, mtr);
4024 #ifdef UNIV_ZIP_DEBUG
4025 	{
4026 		page_zip_des_t*	merge_page_zip
4027 			= buf_block_get_page_zip(merge_block);
4028 		ut_a(!merge_page_zip
4029 		     || page_zip_validate(merge_page_zip, merge_page, index));
4030 	}
4031 #endif /* UNIV_ZIP_DEBUG */
4032 
4033 	if (!dict_table_is_locking_disabled(index->table)) {
4034 		if (left_page_no != FIL_NULL) {
4035 			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
4036 					    block);
4037 		} else {
4038 			lock_update_discard(merge_block,
4039 					    lock_get_min_heap_no(merge_block),
4040 					    block);
4041 		}
4042 	}
4043 
4044 	if (dict_index_is_spatial(index)) {
4045 		rtr_check_discard_page(index, cursor, block);
4046 	}
4047 
4048 	/* Free the file page */
4049 	btr_page_free(index, block, mtr);
4050 
4051 	/* btr_check_node_ptr() needs the parent block latched.
4052 	If merge_block's parent block is not the same,
4053 	we cannot use btr_check_node_ptr() */
4054 	ut_ad(parent_is_different
4055 	      || btr_check_node_ptr(index, merge_block, mtr));
4056 }
4057 
4058 #ifdef UNIV_BTR_PRINT
4059 /*************************************************************//**
4060 Prints size info of a B-tree. */
4061 void
4062 btr_print_size(
4063 /*===========*/
4064 	dict_index_t*	index)	/*!< in: index tree */
4065 {
4066 	page_t*		root;
4067 	fseg_header_t*	seg;
4068 	mtr_t		mtr;
4069 
4070 	if (dict_index_is_ibuf(index)) {
4071 		fputs("Sorry, cannot print info of an ibuf tree:"
4072 		      " use ibuf functions\n", stderr);
4073 
4074 		return;
4075 	}
4076 
4077 	mtr_start(&mtr);
4078 
4079 	root = btr_root_get(index, &mtr);
4080 
4081 	seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4082 
4083 	fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4084 	fseg_print(seg, &mtr);
4085 
4086 	if (!dict_index_is_ibuf(index)) {
4087 
4088 		seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4089 
4090 		fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4091 		fseg_print(seg, &mtr);
4092 	}
4093 
4094 	mtr_commit(&mtr);
4095 }
4096 
4097 /************************************************************//**
4098 Prints recursively index tree pages. */
4099 static
4100 void
4101 btr_print_recursive(
4102 /*================*/
4103 	dict_index_t*	index,	/*!< in: index tree */
4104 	buf_block_t*	block,	/*!< in: index page */
4105 	ulint		width,	/*!< in: print this many entries from start
4106 				and end */
4107 	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
4108 	ulint**		offsets,/*!< in/out: buffer for rec_get_offsets() */
4109 	mtr_t*		mtr)	/*!< in: mtr */
4110 {
4111 	const page_t*	page	= buf_block_get_frame(block);
4112 	page_cur_t	cursor;
4113 	ulint		n_recs;
4114 	ulint		i	= 0;
4115 	mtr_t		mtr2;
4116 
4117 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_SX_FIX, index->table));
4118 
4119 	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page, mtr)
4120 		<< " page " << block->page.id;
4121 
4122 	page_print(block, index, width, width);
4123 
4124 	n_recs = page_get_n_recs(page);
4125 
4126 	page_cur_set_before_first(block, &cursor);
4127 	page_cur_move_to_next(&cursor);
4128 
4129 	while (!page_cur_is_after_last(&cursor)) {
4130 
4131 		if (page_is_leaf(page)) {
4132 
4133 			/* If this is the leaf level, do nothing */
4134 
4135 		} else if ((i <= width) || (i >= n_recs - width)) {
4136 
4137 			const rec_t*	node_ptr;
4138 
4139 			mtr_start(&mtr2);
4140 
4141 			node_ptr = page_cur_get_rec(&cursor);
4142 
4143 			*offsets = rec_get_offsets(node_ptr, index, *offsets,
4144 						   ULINT_UNDEFINED, heap);
4145 			btr_print_recursive(index,
4146 					    btr_node_ptr_get_child(node_ptr,
4147 								   index,
4148 								   *offsets,
4149 								   &mtr2),
4150 					    width, heap, offsets, &mtr2);
4151 			mtr_commit(&mtr2);
4152 		}
4153 
4154 		page_cur_move_to_next(&cursor);
4155 		i++;
4156 	}
4157 }
4158 
4159 /**************************************************************//**
4160 Prints directories and other info of all nodes in the tree. */
4161 void
4162 btr_print_index(
4163 /*============*/
4164 	dict_index_t*	index,	/*!< in: index */
4165 	ulint		width)	/*!< in: print this many entries from start
4166 				and end */
4167 {
4168 	mtr_t		mtr;
4169 	buf_block_t*	root;
4170 	mem_heap_t*	heap	= NULL;
4171 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4172 	ulint*		offsets	= offsets_;
4173 	rec_offs_init(offsets_);
4174 
4175 	fputs("--------------------------\n"
4176 	      "INDEX TREE PRINT\n", stderr);
4177 
4178 	mtr_start(&mtr);
4179 
4180 	root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4181 
4182 	btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4183 	if (heap) {
4184 		mem_heap_free(heap);
4185 	}
4186 
4187 	mtr_commit(&mtr);
4188 
4189 	ut_ad(btr_validate_index(index, 0, false));
4190 }
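/* Editorial usage note (a hypothetical debugging session, not from
the original source): with UNIV_BTR_PRINT defined, one might call

	btr_print_size(index);
	btr_print_index(index, 3);

to dump the file segment info and the first and last 3 entries of
every node in the tree. */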
4191 #endif /* UNIV_BTR_PRINT */
4192 
4193 #ifdef UNIV_DEBUG
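/* Editorial note (a reading of btr_check_node_ptr() below, not
original text): the check rebuilds the expected node pointer with
dict_index_build_node_ptr() from the first user record of the page
and compares it with the record under the father cursor; for a
spatial index a "WITHIN" comparison is used instead of equality. */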
4194 /************************************************************//**
4195 Checks that the node pointer to a page is appropriate.
4196 @return TRUE */
4197 ibool
4198 btr_check_node_ptr(
4199 /*===============*/
4200 	dict_index_t*	index,	/*!< in: index tree */
4201 	buf_block_t*	block,	/*!< in: index page */
4202 	mtr_t*		mtr)	/*!< in: mtr */
4203 {
4204 	mem_heap_t*	heap;
4205 	dtuple_t*	tuple;
4206 	ulint*		offsets;
4207 	btr_cur_t	cursor;
4208 	page_t*		page = buf_block_get_frame(block);
4209 
4210 	ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
4211 
4212 	if (dict_index_get_page(index) == block->page.id.page_no()) {
4213 
4214 		return(TRUE);
4215 	}
4216 
4217 	heap = mem_heap_create(256);
4218 
4219 	if (dict_index_is_spatial(index)) {
4220 		offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4221 						    NULL, &cursor);
4222 	} else {
4223 		offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4224 						    &cursor);
4225 	}
4226 
4227 	if (page_is_leaf(page)) {
4228 
4229 		goto func_exit;
4230 	}
4231 
4232 	tuple = dict_index_build_node_ptr(
4233 		index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4234 		btr_page_get_level(page, mtr));
4235 
4236 	/* For a spatial index, the MBR in the parent rec can differ
4237 	from that of the first rec of the child; their relationship
4238 	should be a "WITHIN" relationship */
4239 	if (dict_index_is_spatial(index)) {
4240 		ut_a(!cmp_dtuple_rec_with_gis(
4241 			tuple, btr_cur_get_rec(&cursor),
4242 			offsets, PAGE_CUR_WITHIN));
4243 	} else {
4244 		ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4245 	}
4246 func_exit:
4247 	mem_heap_free(heap);
4248 
4249 	return(TRUE);
4250 }
4251 #endif /* UNIV_DEBUG */
4252 
4253 /************************************************************//**
4254 Display identification information for a record. */
4255 static
4256 void
4257 btr_index_rec_validate_report(
4258 /*==========================*/
4259 	const page_t*		page,	/*!< in: index page */
4260 	const rec_t*		rec,	/*!< in: index record */
4261 	const dict_index_t*	index)	/*!< in: index */
4262 {
4263 	ib::info() << "Record in index " << index->name
4264 		<< " of table " << index->table->name
4265 		<< ", page " << page_id_t(page_get_space_id(page),
4266 					  page_get_page_no(page))
4267 		<< ", at offset " << page_offset(rec);
4268 }
4269 
4270 /************************************************************//**
4271 Checks the size and number of fields in a record based on the definition of
4272 the index.
4273 @return TRUE if ok */
4274 ibool
4275 btr_index_rec_validate(
4276 /*===================*/
4277 	const rec_t*		rec,		/*!< in: index record */
4278 	const dict_index_t*	index,		/*!< in: index */
4279 	ibool			dump_on_error)	/*!< in: TRUE if the function
4280 						should print hex dump of record
4281 						and page on error */
4282 {
4283 	ulint		len;
4284 	ulint		n;
4285 	ulint		i;
4286 	const page_t*	page;
4287 	mem_heap_t*	heap	= NULL;
4288 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
4289 	ulint*		offsets	= offsets_;
4290 	rec_offs_init(offsets_);
4291 
4292 	page = page_align(rec);
4293 
4294 	if (dict_index_is_ibuf(index)) {
4295 		/* The insert buffer index tree can contain records from any
4296 		other index: we cannot check the number of fields or
4297 		their length */
4298 
4299 		return(TRUE);
4300 	}
4301 
4302 #ifdef VIRTUAL_INDEX_DEBUG
4303 	if (dict_index_has_virtual(index)) {
4304 		fprintf(stderr, "index name is %s\n", index->name());
4305 	}
4306 #endif
4307 	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
4308 		btr_index_rec_validate_report(page, rec, index);
4309 
4310 		ib::error() << "Compact flag=" << !!page_is_comp(page)
4311 			<< ", should be " << dict_table_is_comp(index->table);
4312 
4313 		return(FALSE);
4314 	}
4315 
4316 	n = dict_index_get_n_fields(index);
4317 
4318 	if (!page_is_comp(page)
4319 	    && (rec_get_n_fields_old(rec) != n
4320 		/* a record for an older SYS_INDEXES table
4321 		(missing the merge_threshold column) is acceptable. */
4322 		&& !(index->id == DICT_INDEXES_ID
4323 		     && rec_get_n_fields_old(rec) == n - 1))) {
4324 		btr_index_rec_validate_report(page, rec, index);
4325 
4326 		ib::error() << "Has " << rec_get_n_fields_old(rec)
4327 			<< " fields, should have " << n;
4328 
4329 		if (dump_on_error) {
4330 			fputs("InnoDB: corrupt record ", stderr);
4331 			rec_print_old(stderr, rec);
4332 			putc('\n', stderr);
4333 		}
4334 		return(FALSE);
4335 	}
4336 
4337 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
4338 
4339 	for (i = 0; i < n; i++) {
4340 		dict_field_t*	field = dict_index_get_nth_field(index, i);
4341 		ulint		fixed_size = dict_col_get_fixed_size(
4342 						dict_field_get_col(field),
4343 						page_is_comp(page));
4344 
4345 		rec_get_nth_field_offs(offsets, i, &len);
4346 
4347 		/* Note that if fixed_size != 0, it equals the
4348 		length of a fixed-size column in the clustered index,
4349 		except for DATA_POINT, whose length would be MBR_LEN
4350 		when it is indexed in an R-tree. We should adjust it here.
4351 		A prefix index of the column has a fixed, but different,
4352 		length.  When fixed_size == 0, prefix_len is the maximum
4353 		length of the prefix index column. */
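		/* Editorial example (hypothetical column values, not
		from the original source): for a fixed-size column with
		fixed_size == 4, any stored len other than 4 or
		UNIV_SQL_NULL is an error; for a prefix index with
		prefix_len == 20 (so fixed_size == 0), any non-NULL
		len > 20 is an error.  This is the condition tested
		below. */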
4354 
4355 		if (dict_field_get_col(field)->mtype == DATA_POINT) {
4356 			ut_ad(fixed_size == DATA_POINT_LEN);
4357 			if (dict_index_is_spatial(index)) {
4358 				/* For DATA_POINT data, when it is the R-tree
4359 				key, fixed_len is the length of the point's
4360 				MBR. But if it is a primary key column stored
4361 				in the R-tree as part of the PK pointer, the
4362 				length shall be DATA_POINT_LEN. */
4363 				ut_ad((field->fixed_len == DATA_MBR_LEN
4364 				       && i == 0)
4365 				      || (field->fixed_len == DATA_POINT_LEN
4366 					  && i != 0));
4367 				fixed_size = field->fixed_len;
4368 			}
4369 		}
4370 
4371 		if ((field->prefix_len == 0
4372 		     && len != UNIV_SQL_NULL && fixed_size
4373 		     && len != fixed_size)
4374 		    || (field->prefix_len > 0
4375 			&& len != UNIV_SQL_NULL
4376 			&& len
4377 			> field->prefix_len)) {
4378 
4379 			btr_index_rec_validate_report(page, rec, index);
4380 
4381 			ib::error	error;
4382 
4383 			error << "Field " << i << " len is " << len
4384 				<< ", should be " << fixed_size;
4385 
4386 			if (dump_on_error) {
4387 				error << "; ";
4388 				rec_print(error.m_oss, rec,
4389 					  rec_get_info_bits(
4390 						  rec, rec_offs_comp(offsets)),
4391 					  offsets);
4392 			}
4393 			if (heap) {
4394 				mem_heap_free(heap);
4395 			}
4396 			return(FALSE);
4397 		}
4398 	}
4399 
4400 #ifdef VIRTUAL_INDEX_DEBUG
4401 	if (dict_index_has_virtual(index)) {
4402 		rec_print_new(stderr, rec, offsets);
4403 	}
4404 #endif
4405 
4406 	if (heap) {
4407 		mem_heap_free(heap);
4408 	}
4409 	return(TRUE);
4410 }
4411 
4412 /************************************************************//**
4413 Checks the size and number of fields in records based on the definition of
4414 the index.
4415 @return TRUE if ok */
4416 static
4417 ibool
4418 btr_index_page_validate(
4419 /*====================*/
4420 	buf_block_t*	block,	/*!< in: index page */
4421 	dict_index_t*	index)	/*!< in: index */
4422 {
4423 	page_cur_t	cur;
4424 	ibool		ret	= TRUE;
4425 #ifndef DBUG_OFF
4426 	ulint		nth	= 1;
4427 #endif /* !DBUG_OFF */
4428 
4429 	page_cur_set_before_first(block, &cur);
4430 
4431 	/* Directory slot 0 should only contain the infimum record. */
4432 	DBUG_EXECUTE_IF("check_table_rec_next",
4433 			ut_a(page_rec_get_nth_const(
4434 				     page_cur_get_page(&cur), 0)
4435 			     == cur.rec);
4436 			ut_a(page_dir_slot_get_n_owned(
4437 				     page_dir_get_nth_slot(
4438 					     page_cur_get_page(&cur), 0))
4439 			     == 1););
4440 
4441 	page_cur_move_to_next(&cur);
4442 
4443 	for (;;) {
4444 		if (page_cur_is_after_last(&cur)) {
4445 
4446 			break;
4447 		}
4448 
4449 		if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4450 
4451 			return(FALSE);
4452 		}
4453 
4454 		/* Verify that page_rec_get_nth_const() is correctly
4455 		retrieving each record. */
4456 		DBUG_EXECUTE_IF("check_table_rec_next",
4457 				ut_a(cur.rec == page_rec_get_nth_const(
4458 					     page_cur_get_page(&cur),
4459 					     page_rec_get_n_recs_before(
4460 						     cur.rec)));
4461 				ut_a(nth++ == page_rec_get_n_recs_before(
4462 					     cur.rec)););
4463 
4464 		page_cur_move_to_next(&cur);
4465 	}
4466 
4467 	return(ret);
4468 }
4469 
4470 /************************************************************//**
4471 Report an error on one page of an index tree. */
4472 static
4473 void
4474 btr_validate_report1(
4475 /*=================*/
4476 	dict_index_t*		index,	/*!< in: index */
4477 	ulint			level,	/*!< in: B-tree level */
4478 	const buf_block_t*	block)	/*!< in: index page */
4479 {
4480 	ib::error	error;
4481 	error << "In page " << block->page.id.page_no()
4482 		<< " of index " << index->name
4483 		<< " of table " << index->table->name;
4484 
4485 	if (level > 0) {
4486 		error << ", index tree level " << level;
4487 	}
4488 }
4489 
4490 /************************************************************//**
4491 Report an error on two pages of an index tree. */
4492 static
4493 void
4494 btr_validate_report2(
4495 /*=================*/
4496 	const dict_index_t*	index,	/*!< in: index */
4497 	ulint			level,	/*!< in: B-tree level */
4498 	const buf_block_t*	block1,	/*!< in: first index page */
4499 	const buf_block_t*	block2)	/*!< in: second index page */
4500 {
4501 	ib::error	error;
4502 	error << "In pages " << block1->page.id
4503 		<< " and " << block2->page.id << " of index " << index->name
4504 		<< " of table " << index->table->name;
4505 
4506 	if (level > 0) {
4507 		error << ", index tree level " << level;
4508 	}
4509 }
4510 
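/* Editorial sketch (a reading of btr_validate_level() below, not
original text): the traversal is

	descend from the root to the requested level;
	loop:	validate the page, its sibling links, and its father
		node pointers;
		follow FIL_PAGE_NEXT to the right sibling;
	until the rightmost page is reached or trx is interrupted. */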
4511 /************************************************************//**
4512 Validates index tree level.
4513 @return TRUE if ok */
4514 static
4515 bool
4516 btr_validate_level(
4517 /*===============*/
4518 	dict_index_t*	index,	/*!< in: index tree */
4519 	const trx_t*	trx,	/*!< in: transaction or NULL */
4520 	ulint		level,	/*!< in: level number */
4521 	bool		lockout)/*!< in: true if X-latch index is intended */
4522 {
4523 	buf_block_t*	block;
4524 	page_t*		page;
4525 	buf_block_t*	right_block = 0; /* remove warning */
4526 	page_t*		right_page = 0; /* remove warning */
4527 	page_t*		father_page;
4528 	btr_cur_t	node_cur;
4529 	btr_cur_t	right_node_cur;
4530 	rec_t*		rec;
4531 	ulint		right_page_no;
4532 	ulint		left_page_no;
4533 	page_cur_t	cursor;
4534 	dtuple_t*	node_ptr_tuple;
4535 	bool		ret	= true;
4536 	mtr_t		mtr;
4537 	mem_heap_t*	heap	= mem_heap_create(256);
4538 	fseg_header_t*	seg;
4539 	ulint*		offsets	= NULL;
4540 	ulint*		offsets2= NULL;
4541 #ifdef UNIV_ZIP_DEBUG
4542 	page_zip_des_t*	page_zip;
4543 #endif /* UNIV_ZIP_DEBUG */
4544 	ulint		savepoint = 0;
4545 	ulint		savepoint2 = 0;
4546 	ulint		parent_page_no = FIL_NULL;
4547 	ulint		parent_right_page_no = FIL_NULL;
4548 	bool		rightmost_child = false;
4549 
4550 	mtr_start(&mtr);
4551 
4552 	if (!srv_read_only_mode) {
4553 		if (lockout) {
4554 			mtr_x_lock(dict_index_get_lock(index), &mtr);
4555 		} else {
4556 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
4557 		}
4558 	}
4559 
4560 	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4561 	page = buf_block_get_frame(block);
4562 	seg = page + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4563 
4564 #ifdef UNIV_DEBUG
4565 	if (dict_index_is_spatial(index)) {
4566 		fprintf(stderr, "Root page no: %lu\n",
4567 			(ulong) page_get_page_no(page));
4568 	}
4569 #endif
4570 
4571 	const fil_space_t*	space	= fil_space_get(index->space);
4572 	const page_size_t	table_page_size(
4573 		dict_table_page_size(index->table));
4574 	const page_size_t	space_page_size(space->flags);
4575 
4576 	if (!table_page_size.equals_to(space_page_size)) {
4577 
4578 		ib::warn() << "Flags mismatch: table=" << index->table->flags
4579 			<< ", tablespace=" << space->flags;
4580 
4581 		mtr_commit(&mtr);
4582 
4583 		return(false);
4584 	}
4585 
4586 	while (level != btr_page_get_level(page, &mtr)) {
4587 		const rec_t*	node_ptr;
4588 
4589 		if (fseg_page_is_free(seg,
4590 				      block->page.id.space(),
4591 				      block->page.id.page_no())) {
4592 
4593 			btr_validate_report1(index, level, block);
4594 
4595 			ib::warn() << "Page is free";
4596 
4597 			ret = false;
4598 		}
4599 
4600 		ut_a(index->space == block->page.id.space());
4601 		ut_a(index->space == page_get_space_id(page));
4602 #ifdef UNIV_ZIP_DEBUG
4603 		page_zip = buf_block_get_page_zip(block);
4604 		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4605 #endif /* UNIV_ZIP_DEBUG */
4606 		ut_a(!page_is_leaf(page));
4607 
4608 		page_cur_set_before_first(block, &cursor);
4609 		page_cur_move_to_next(&cursor);
4610 
4611 		node_ptr = page_cur_get_rec(&cursor);
4612 		offsets = rec_get_offsets(node_ptr, index, offsets,
4613 					  ULINT_UNDEFINED, &heap);
4614 
4615 		savepoint2 = mtr_set_savepoint(&mtr);
4616 		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4617 		page = buf_block_get_frame(block);
4618 
4619 		/* For an R-tree, since the record order might not be the
4620 		same as the order of the linked index pages in the lower
4621 		level, we need to traverse backwards to get the first
4622 		page of this level. This is only used for index
4623 		validation; a spatial index does not use such a scan for
4624 		any of its DML or query operations. */
4625 		if (dict_index_is_spatial(index)) {
4626 			left_page_no = btr_page_get_prev(page, &mtr);
4627 
4628 			while (left_page_no != FIL_NULL) {
4629 				page_id_t	left_page_id(
4630 					index->space, left_page_no);
4631 				/* To obey the latch order of tree blocks,
4632 				we should release the current block once to
4633 				obtain the latch on its left sibling. */
4634 				mtr_release_block_at_savepoint(
4635 					&mtr, savepoint2, block);
4636 
4637 				savepoint2 = mtr_set_savepoint(&mtr);
4638 				block = btr_block_get(
4639 					left_page_id,
4640 					table_page_size,
4641 					RW_SX_LATCH, index, &mtr);
4642 				page = buf_block_get_frame(block);
4643 				left_page_no = btr_page_get_prev(page, &mtr);
4644 			}
4645 		}
4646 	}
4647 
4648 	/* Now we are on the desired level. Loop through the pages on that
4649 	level. */
4650 
4651 	if (level == 0) {
4652 		/* Leaf pages are managed in their own file segment; rebase seg onto the PAGE_BTR_SEG_LEAF header. */
4653 		seg -= PAGE_BTR_SEG_TOP - PAGE_BTR_SEG_LEAF;
4654 	}
4655 
4656 loop:
4657 	mem_heap_empty(heap);
4658 	offsets = offsets2 = NULL;
4659 	if (!srv_read_only_mode) {
4660 		if (lockout) {
4661 			mtr_x_lock(dict_index_get_lock(index), &mtr);
4662 		} else {
4663 			mtr_sx_lock(dict_index_get_lock(index), &mtr);
4664 		}
4665 	}
4666 
4667 #ifdef UNIV_ZIP_DEBUG
4668 	page_zip = buf_block_get_page_zip(block);
4669 	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4670 #endif /* UNIV_ZIP_DEBUG */
4671 
4672 	ut_a(block->page.id.space() == index->space);
4673 
4674 	if (fseg_page_is_free(seg,
4675 			      block->page.id.space(),
4676 			      block->page.id.page_no())) {
4677 
4678 		btr_validate_report1(index, level, block);
4679 
4680 		ib::warn() << "Page is marked as free";
4681 		ret = false;
4682 
4683 	} else if (btr_page_get_index_id(page) != index->id) {
4684 
4685 		ib::error() << "Page index id " << btr_page_get_index_id(page)
4686 			<< " != data dictionary index id " << index->id;
4687 
4688 		ret = false;
4689 
4690 	} else if (!page_validate(page, index)) {
4691 
4692 		btr_validate_report1(index, level, block);
4693 		ret = false;
4694 
4695 	} else if (level == 0 && !btr_index_page_validate(block, index)) {
4696 
4697 		/* We are on level 0. Check that the records have the right
4698 		number of fields, and field lengths are right. */
4699 
4700 		ret = false;
4701 	}
4702 
4703 	ut_a(btr_page_get_level(page, &mtr) == level);
4704 
4705 	right_page_no = btr_page_get_next(page, &mtr);
4706 	left_page_no = btr_page_get_prev(page, &mtr);
4707 
4708 	ut_a(!page_is_empty(page)
4709 	     || (level == 0
4710 		 && page_get_page_no(page) == dict_index_get_page(index)));
4711 
4712 	if (right_page_no != FIL_NULL) {
4713 		const rec_t*	right_rec;
4714 		savepoint = mtr_set_savepoint(&mtr);
4715 
4716 		right_block = btr_block_get(
4717 			page_id_t(index->space, right_page_no),
4718 			table_page_size,
4719 			RW_SX_LATCH, index, &mtr);
4720 
4721 		right_page = buf_block_get_frame(right_block);
4722 
4723 		if (btr_page_get_prev(right_page, &mtr)
4724 		    != page_get_page_no(page)) {
4725 
4726 			btr_validate_report2(index, level, block, right_block);
4727 			fputs("InnoDB: broken FIL_PAGE_NEXT"
4728 			      " or FIL_PAGE_PREV links\n", stderr);
4729 
4730 			ret = false;
4731 		}
4732 
4733 		if (page_is_comp(right_page) != page_is_comp(page)) {
4734 			btr_validate_report2(index, level, block, right_block);
4735 			fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4736 
4737 			ret = false;
4738 
4739 			goto node_ptr_fails;
4740 		}
4741 
4742 		rec = page_rec_get_prev(page_get_supremum_rec(page));
4743 		right_rec = page_rec_get_next(page_get_infimum_rec(
4744 						      right_page));
4745 		offsets = rec_get_offsets(rec, index,
4746 					  offsets, ULINT_UNDEFINED, &heap);
4747 		offsets2 = rec_get_offsets(right_rec, index,
4748 					   offsets2, ULINT_UNDEFINED, &heap);
4749 
4750 		/* For a spatial index, we cannot guarantee the key ordering
4751 		across pages, so skip the record compare verification for
4752 		now. To be enhanced in a special R-tree validation scheme. */
4753 		if (!dict_index_is_spatial(index)
4754 		    && cmp_rec_rec(rec, right_rec,
4755 				   offsets, offsets2, index, false) >= 0) {
4756 
4757 			btr_validate_report2(index, level, block, right_block);
4758 
4759 			fputs("InnoDB: records in wrong order"
4760 			      " on adjacent pages\n", stderr);
4761 
4762 			fputs("InnoDB: record ", stderr);
4763 			rec = page_rec_get_prev(page_get_supremum_rec(page));
4764 			rec_print(stderr, rec, index);
4765 			putc('\n', stderr);
4766 			fputs("InnoDB: record ", stderr);
4767 			rec = page_rec_get_next(
4768 				page_get_infimum_rec(right_page));
4769 			rec_print(stderr, rec, index);
4770 			putc('\n', stderr);
4771 
4772 			ret = false;
4773 		}
4774 	}
4775 
4776 	if (level > 0 && left_page_no == FIL_NULL) {
4777 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
4778 			     page_rec_get_next(page_get_infimum_rec(page)),
4779 			     page_is_comp(page)));
4780 	}
4781 
4782 	/* Similarly, skip the father node check for spatial indexes for
4783 	now, for a couple of reasons:
4784 	1) As mentioned, there is no ordering relationship between records
4785 	in the parent level and linked pages in the child level.
4786 	2) Searching for the parent from the root is very costly for an R-tree.
4787 	We will add a special validation mechanism for R-trees later (WL #7520) */
4788 	if (!dict_index_is_spatial(index)
4789 	    && block->page.id.page_no() != dict_index_get_page(index)) {
4790 
4791 		/* Check father node pointers */
4792 		rec_t*	node_ptr;
4793 
4794 		btr_cur_position(
4795 			index, page_rec_get_next(page_get_infimum_rec(page)),
4796 			block, &node_cur);
4797 		offsets = btr_page_get_father_node_ptr_for_validate(
4798 			offsets, heap, &node_cur, &mtr);
4799 
4800 		father_page = btr_cur_get_page(&node_cur);
4801 		node_ptr = btr_cur_get_rec(&node_cur);
4802 
4803 		parent_page_no = page_get_page_no(father_page);
4804 		parent_right_page_no = btr_page_get_next(father_page, &mtr);
4805 		rightmost_child = page_rec_is_supremum(
4806 					page_rec_get_next(node_ptr));
4807 
4808 		btr_cur_position(
4809 			index,
4810 			page_rec_get_prev(page_get_supremum_rec(page)),
4811 			block, &node_cur);
4812 
4813 		offsets = btr_page_get_father_node_ptr_for_validate(
4814 				offsets, heap, &node_cur, &mtr);
4815 
4816 		if (node_ptr != btr_cur_get_rec(&node_cur)
4817 		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
4818 				     != block->page.id.page_no()) {
4819 
4820 			btr_validate_report1(index, level, block);
4821 
4822 			fputs("InnoDB: node pointer to the page is wrong\n",
4823 			      stderr);
4824 
4825 			fputs("InnoDB: node ptr ", stderr);
4826 			rec_print(stderr, node_ptr, index);
4827 
4828 			rec = btr_cur_get_rec(&node_cur);
4829 			fprintf(stderr, "\n"
4830 				"InnoDB: node ptr child page n:o %lu\n",
4831 				(ulong) btr_node_ptr_get_child_page_no(
4832 					rec, offsets));
4833 
4834 			fputs("InnoDB: record on page ", stderr);
4835 			rec_print_new(stderr, rec, offsets);
4836 			putc('\n', stderr);
4837 			ret = false;
4838 
4839 			goto node_ptr_fails;
4840 		}
4841 
4842 		if (!page_is_leaf(page)) {
4843 			node_ptr_tuple = dict_index_build_node_ptr(
4844 				index,
4845 				page_rec_get_next(page_get_infimum_rec(page)),
4846 				0, heap, btr_page_get_level(page, &mtr));
4847 
4848 			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
4849 					   offsets)) {
4850 				const rec_t* first_rec = page_rec_get_next(
4851 					page_get_infimum_rec(page));
4852 
4853 				btr_validate_report1(index, level, block);
4854 
4855 				ib::error() << "Node ptrs differ on levels > 0";
4856 
4857 				fputs("InnoDB: node ptr ",stderr);
4858 				rec_print_new(stderr, node_ptr, offsets);
4859 				fputs("InnoDB: first rec ", stderr);
4860 				rec_print(stderr, first_rec, index);
4861 				putc('\n', stderr);
4862 				ret = false;
4863 
4864 				goto node_ptr_fails;
4865 			}
4866 		}
4867 
		if (left_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
			ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
		}

		if (right_page_no == FIL_NULL) {
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
			ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
		} else {
			const rec_t*	right_node_ptr;

			right_node_ptr = page_rec_get_next(node_ptr);

			if (!lockout && rightmost_child) {

				/* To obey the latch order of tree blocks,
				we must release right_block once in order
				to acquire the latch on the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint, right_block);

				btr_block_get(
					page_id_t(index->space,
						  parent_right_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);

				right_block = btr_block_get(
					page_id_t(index->space,
						  right_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}

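			/* Look up the father of the right sibling from its
			first user record.  It must be either the record
			immediately following this page's node pointer in
			the same father page, or, when this page's node
			pointer is the last one in the father, the first
			user record of the father's right sibling. */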
			btr_cur_position(
				index, page_rec_get_next(
					page_get_infimum_rec(
						buf_block_get_frame(
							right_block))),
				right_block, &right_node_cur);

			offsets = btr_page_get_father_node_ptr_for_validate(
					offsets, heap, &right_node_cur, &mtr);

			if (right_node_ptr
			    != page_get_supremum_rec(father_page)) {

				if (btr_cur_get_rec(&right_node_cur)
				    != right_node_ptr) {
					ret = false;
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			} else {
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);

				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
					    page_get_infimum_rec(
						    right_father_page))) {
					ret = false;
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}

				if (page_get_page_no(right_father_page)
				    != btr_page_get_next(father_page, &mtr)) {

					ret = false;
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			}
		}
	}

node_ptr_fails:
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
	mtr_commit(&mtr);

	if (trx_is_interrupted(trx)) {
		/* On interrupt, return the current status. */
	} else if (right_page_no != FIL_NULL) {

		mtr_start(&mtr);

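		/* When the whole index is not X-latched, first latch the
		father page (or, if this page was the rightmost child, the
		father's right sibling) to preserve the top-down latch
		order before latching the next page on this level. */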
		if (!lockout) {
			if (rightmost_child) {
				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(
							index->space,
							parent_right_page_no),
						table_page_size,
						RW_SX_LATCH, index, &mtr);
				}
			} else if (parent_page_no != FIL_NULL) {
				btr_block_get(
					page_id_t(index->space,
						  parent_page_no),
					table_page_size,
					RW_SX_LATCH, index, &mtr);
			}
		}

		block = btr_block_get(
			page_id_t(index->space, right_page_no),
			table_page_size,
			RW_SX_LATCH, index, &mtr);

		page = buf_block_get_frame(block);

		goto loop;
	}

	mem_heap_free(heap);

	return(ret);
}

/**************************************************************//**
Do an index-level validation of a spatial index tree.
@return	true if no error found */
bool
btr_validate_spatial_index(
/*=======================*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx)	/*!< in: transaction or NULL */
{
	mtr_t	mtr;
	bool	ok = true;

	mtr_start(&mtr);

	mtr_x_lock(dict_index_get_lock(index), &mtr);

	page_t*	root = btr_root_get(index, &mtr);
	ulint	n = btr_page_get_level(root, &mtr);

#ifdef UNIV_RTR_DEBUG
	fprintf(stderr, "R-tree level is %lu\n", n);
#endif /* UNIV_RTR_DEBUG */

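	/* Validate each level, from the root level n down to the leaf
	level 0, stopping at the first level that fails. */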
	for (ulint i = 0; i <= n; ++i) {
#ifdef UNIV_RTR_DEBUG
		fprintf(stderr, "Level %lu:\n", n - i);
#endif /* UNIV_RTR_DEBUG */

		if (!btr_validate_level(index, trx, n - i, true)) {
			ok = false;
			break;
		}
	}

	mtr_commit(&mtr);

	return(ok);
}

/**************************************************************//**
Checks the consistency of an index tree.
@return true if ok */
bool
btr_validate_index(
/*===============*/
	dict_index_t*	index,	/*!< in: index */
	const trx_t*	trx,	/*!< in: transaction or NULL */
	bool		lockout)/*!< in: true if the caller intends to
				X-latch the index */
{
	/* Full-text indexes are implemented by auxiliary tables,
	not by the B-tree. */
	if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
		return(true);
	}

	if (dict_index_is_spatial(index)) {
		return(btr_validate_spatial_index(index, trx));
	}

	mtr_t		mtr;

	mtr_start(&mtr);

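	/* In read-only mode no index latch is needed.  Otherwise take an
	X-latch on the whole index if lockout was requested, or else an
	SX-latch, which still allows concurrent S-latch holders. */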
	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock(dict_index_get_lock(index), &mtr);
		} else {
			mtr_sx_lock(dict_index_get_lock(index), &mtr);
		}
	}

	bool	ok = true;
	page_t*	root = btr_root_get(index, &mtr);
	ulint	n = btr_page_get_level(root, &mtr);

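	/* Walk the tree one level at a time, from the root level n down
	to the leaf level 0. */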
	for (ulint i = 0; i <= n; ++i) {

		if (!btr_validate_level(index, trx, n - i, lockout)) {
			ok = false;
			break;
		}
	}

	mtr_commit(&mtr);

	return(ok);
}

/**************************************************************//**
Checks if the page in the cursor can be merged with given page.
If necessary, re-organize the merge_page.
@return	true if possible to merge. */
bool
btr_can_merge_with_page(
/*====================*/
	btr_cur_t*	cursor,		/*!< in: cursor on the page to merge */
	ulint		page_no,	/*!< in: a sibling page */
	buf_block_t**	merge_block,	/*!< out: the merge block */
	mtr_t*		mtr)		/*!< in: mini-transaction */
{
	dict_index_t*	index;
	page_t*		page;
	ulint		n_recs;
	ulint		data_size;
	ulint		max_ins_size_reorg;
	ulint		max_ins_size;
	buf_block_t*	mblock;
	page_t*		mpage;
	DBUG_ENTER("btr_can_merge_with_page");

	if (page_no == FIL_NULL) {
		*merge_block = NULL;
		DBUG_RETURN(false);
	}

	index = btr_cur_get_index(cursor);
	page = btr_cur_get_page(cursor);

	const page_id_t		page_id(dict_index_get_space(index), page_no);
	const page_size_t	page_size(dict_table_page_size(index->table));

	mblock = btr_block_get(page_id, page_size, RW_X_LATCH, index, mtr);
	mpage = buf_block_get_frame(mblock);

	n_recs = page_get_n_recs(page);
	data_size = page_get_data_size(page);

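	/* First check against the maximum insert size the sibling would
	have after a reorganization: if this page's payload does not fit
	even then, the pages cannot be merged. */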
	max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
		mpage, n_recs);

	if (data_size > max_ins_size_reorg) {
		goto error;
	}

	/* If compression padding tells us that merging will result in
	a too densely packed page, i.e. one which is likely to cause a
	compression failure, then do not merge the pages. */
	if (page_size.is_compressed() && page_is_leaf(mpage)
	    && (page_get_data_size(mpage) + data_size
		>= dict_index_zip_pad_optimal_page_size(index))) {

		goto error;
	}

	max_ins_size = page_get_max_insert_size(mpage, n_recs);

	if (data_size > max_ins_size) {

		/* We have to reorganize mpage */

		if (!btr_page_reorganize_block(
			    false, page_zip_level, mblock, index, mtr)) {

			goto error;
		}

		max_ins_size = page_get_max_insert_size(mpage, n_recs);

		ut_ad(page_validate(mpage, index));
		ut_ad(max_ins_size == max_ins_size_reorg);

		if (data_size > max_ins_size) {

			/* Be fault tolerant, though this should
			never happen. */

			goto error;
		}
	}

	*merge_block = mblock;
	DBUG_RETURN(true);

error:
	*merge_block = NULL;
	DBUG_RETURN(false);
}

#endif /* !UNIV_HOTBACKUP */