1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2014, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /**************************************************//**
22 @file btr/btr0btr.cc
23 The B-tree
24
25 Created 6/2/1994 Heikki Tuuri
26 *******************************************************/
27
28 #include "btr0btr.h"
29
30 #include "page0page.h"
31 #include "page0zip.h"
32 #include "gis0rtree.h"
33
34 #include "btr0cur.h"
35 #include "btr0sea.h"
36 #include "btr0pcur.h"
37 #include "btr0defragment.h"
38 #include "rem0cmp.h"
39 #include "lock0lock.h"
40 #include "ibuf0ibuf.h"
41 #include "trx0trx.h"
42 #include "srv0mon.h"
43 #include "gis0geo.h"
44 #include "dict0boot.h"
45 #include "row0sel.h" /* row_search_max_autoinc() */
46
/* NOTE(review): presumably counts in-progress btr_validate_index()
executions; its readers are not visible in this chunk -- confirm
against btr_validate_index() before relying on this description. */
47 Atomic_counter<uint32_t> btr_validate_index_running;
48
49 /**************************************************************//**
50 Checks if the page in the cursor can be merged with given page.
51 If necessary, re-organize the merge_page.
52 @return true if possible to merge. */
53 static
54 bool
55 btr_can_merge_with_page(
56 /*====================*/
57 btr_cur_t* cursor, /*!< in: cursor on the page to merge */
58 ulint page_no, /*!< in: a sibling page */
59 buf_block_t** merge_block, /*!< out: the merge block */
60 mtr_t* mtr); /*!< in: mini-transaction */
61
62 /** Report that an index page is corrupted.
63 @param[in] buffer block
64 @param[in] index tree */
btr_corruption_report(const buf_block_t * block,const dict_index_t * index)65 void btr_corruption_report(const buf_block_t* block, const dict_index_t* index)
66 {
67 ib::fatal()
68 << "Flag mismatch in page " << block->page.id
69 << " index " << index->name
70 << " of table " << index->table->name;
71 }
72
73 /*
74 Latching strategy of the InnoDB B-tree
75 --------------------------------------
76
77 Node pointer page latches acquisition is protected by index->lock latch.
78
79 Before MariaDB 10.2.2, all node pointer pages were protected by index->lock
80 either in S (shared) or X (exclusive) mode and block->lock was not acquired on
81 node pointer pages.
82
83 After MariaDB 10.2.2, block->lock S-latch or X-latch is used to protect
84 node pointer pages and acquisition of node pointer page latches is protected
85 by index->lock.
86
87 (0) Definition: B-tree level.
88
89 (0.1) The leaf pages of the B-tree are at level 0.
90
91 (0.2) The parent of a page at level L has level L+1. (The level of the
92 root page is equal to the tree height.)
93
94 (0.3) The B-tree lock (index->lock) is the parent of the root page and
95 has a level = tree height + 1.
96
97 Index->lock has 3 possible locking modes:
98
99 (1) S-latch:
100
101 (1.1) All latches for pages must be obtained in descending order of tree level.
102
103 (1.2) Before obtaining the first node pointer page latch at a given B-tree
104 level, parent latch must be held (at level +1 ).
105
106 (1.3) If a node pointer page is already latched at the same level
107 we can only obtain latch to its right sibling page latch at the same level.
108
109 (1.4) Release of the node pointer page latches must be done in
110 child-to-parent order. (Prevents deadlocks when obtained index->lock
111 in SX mode).
112
113 (1.4.1) Level L node pointer page latch can be released only when
114 no latches at children level i.e. level < L are held.
115
116 (1.4.2) All latches from node pointer pages must be released so
117 that no latches are obtained between.
118
119 (1.5) [implied by (1.1), (1.2)] Root page latch must be first node pointer
120 latch obtained.
121
122 (2) SX-latch:
123
124 In this case rules (1.2) and (1.3) from S-latch case are relaxed and
125 merged into (2.2) and rule (1.4) is removed. Thus, latch acquisition
126 can be skipped at some tree levels and latches can be obtained in
127 a less restricted order.
128
129 (2.1) [identical to (1.1)]: All latches for pages must be obtained in descending
130 order of tree level.
131
132 (2.2) When a node pointer latch at level L is obtained,
133 the left sibling page latch in the same level or some ancestor
134 page latch (at level > L) must be held.
135
136 (2.3) [implied by (2.1), (2.2)] The first node pointer page latch obtained can
137 be any node pointer page.
138
139 (3) X-latch:
140
141 Node pointer latches can be obtained in any order.
142
143 NOTE: New rules after MariaDB 10.2.2 do not affect the latching rules of leaf pages:
144
145 index->lock S-latch is needed in read for the node pointer traversal. When the leaf
146 level is reached, index-lock can be released (and with the MariaDB 10.2.2 changes, all
147 node pointer latches). Left to right index traversal in leaf page level can be safely done
148 by obtaining right sibling leaf page latch and then releasing the old page latch.
149
150 Single leaf page modifications (BTR_MODIFY_LEAF) are protected by index->lock
151 S-latch.
152
153 B-tree operations involving page splits or merges (BTR_MODIFY_TREE) and page
154 allocations are protected by index->lock X-latch.
155
156 Node pointers
157 -------------
158 Leaf pages of a B-tree contain the index records stored in the
159 tree. On levels n > 0 we store 'node pointers' to pages on level
160 n - 1. For each page there is exactly one node pointer stored:
161 thus our tree is an ordinary B-tree, not a B-link tree.
162
163 A node pointer contains a prefix P of an index record. The prefix
164 is long enough so that it determines an index record uniquely.
165 The file page number of the child page is added as the last
166 field. To the child page we can store node pointers or index records
167 which are >= P in the alphabetical order, but < P1 if there is
168 a next node pointer on the level, and P1 is its prefix.
169
170 If a node pointer with a prefix P points to a non-leaf child,
171 then the leftmost record in the child must have the same
172 prefix P. If it points to a leaf node, the child is not required
173 to contain any record with a prefix equal to P. The leaf case
174 is decided this way to allow arbitrary deletions in a leaf node
175 without touching upper levels of the tree.
176
177 We have predefined a special minimum record which we
178 define as the smallest record in any alphabetical order.
179 A minimum record is denoted by setting a bit in the record
180 header. A minimum record acts as the prefix of a node pointer
181 which points to a leftmost node on any level of the tree.
182
183 File page allocation
184 --------------------
185 In the root node of a B-tree there are two file segment headers.
186 The leaf pages of a tree are allocated from one file segment, to
187 make them consecutive on disk if possible. From the other file segment
188 we allocate pages for the non-leaf levels of the tree.
189 */
190
#ifdef UNIV_BTR_DEBUG
/**************************************************************//**
Checks a file segment header within a B-tree root page.
@return TRUE if valid */
static
ibool
btr_root_fseg_validate(
/*===================*/
	const fseg_header_t*	seg_header,	/*!< in: segment header */
	ulint			space)		/*!< in: tablespace identifier */
{
	const ulint	hdr_offset = mach_read_from_2(
		seg_header + FSEG_HDR_OFFSET);

	/* The header must refer to this tablespace, and the byte offset
	must lie inside the usable payload area of a page. */
	ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
	ut_a(hdr_offset >= FIL_PAGE_DATA);
	ut_a(hdr_offset <= srv_page_size - FIL_PAGE_DATA_END);

	return(TRUE);
}
#endif /* UNIV_BTR_DEBUG */
210
211 /**************************************************************//**
212 Gets the root node of a tree and x- or s-latches it.
213 @return root page, x- or s-latched */
214 buf_block_t*
btr_root_block_get(const dict_index_t * index,ulint mode,mtr_t * mtr)215 btr_root_block_get(
216 /*===============*/
217 const dict_index_t* index, /*!< in: index tree */
218 ulint mode, /*!< in: either RW_S_LATCH
219 or RW_X_LATCH */
220 mtr_t* mtr) /*!< in: mtr */
221 {
222 if (!index->table || !index->table->space) {
223 return NULL;
224 }
225
226 buf_block_t* block = btr_block_get(
227 page_id_t(index->table->space_id, index->page),
228 index->table->space->zip_size(), mode,
229 index, mtr);
230
231 if (!block) {
232 index->table->file_unreadable = true;
233
234 ib_push_warning(
235 static_cast<THD*>(NULL), DB_DECRYPTION_FAILED,
236 "Table %s in file %s is encrypted but encryption service or"
237 " used key_id is not available. "
238 " Can't continue reading table.",
239 index->table->name.m_name,
240 UT_LIST_GET_FIRST(index->table->space->chain)->name);
241
242 return NULL;
243 }
244
245 btr_assert_not_corrupted(block, index);
246
247 #ifdef UNIV_BTR_DEBUG
248 if (!dict_index_is_ibuf(index)) {
249 const page_t* root = buf_block_get_frame(block);
250
251 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
252 + root, index->table->space_id));
253 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
254 + root, index->table->space_id));
255 }
256 #endif /* UNIV_BTR_DEBUG */
257
258 return(block);
259 }
260
261 /**************************************************************//**
262 Gets the root node of a tree and sx-latches it for segment access.
263 @return root page, sx-latched */
264 page_t*
btr_root_get(const dict_index_t * index,mtr_t * mtr)265 btr_root_get(
266 /*=========*/
267 const dict_index_t* index, /*!< in: index tree */
268 mtr_t* mtr) /*!< in: mtr */
269 {
270 /* Intended to be used for segment list access.
271 SX lock doesn't block reading user data by other threads.
272 And block the segment list access by others.*/
273 buf_block_t* root = btr_root_block_get(index, RW_SX_LATCH,
274 mtr);
275 return(root ? buf_block_get_frame(root) : NULL);
276 }
277
278 /**************************************************************//**
279 Gets the height of the B-tree (the level of the root, when the leaf
280 level is assumed to be 0). The caller must hold an S or X latch on
281 the index.
282 @return tree height (level of the root) */
283 ulint
btr_height_get(const dict_index_t * index,mtr_t * mtr)284 btr_height_get(
285 /*===========*/
286 const dict_index_t* index, /*!< in: index tree */
287 mtr_t* mtr) /*!< in/out: mini-transaction */
288 {
289 ulint height=0;
290 buf_block_t* root_block;
291
292 ut_ad(srv_read_only_mode
293 || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
294 MTR_MEMO_S_LOCK
295 | MTR_MEMO_X_LOCK
296 | MTR_MEMO_SX_LOCK));
297
298 /* S latches the page */
299 root_block = btr_root_block_get(index, RW_S_LATCH, mtr);
300
301 if (root_block) {
302 height = btr_page_get_level(buf_block_get_frame(root_block));
303
304 /* Release the S latch on the root page. */
305 mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);
306
307 ut_d(sync_check_unlock(&root_block->lock));
308 }
309
310 return(height);
311 }
312
313 /**************************************************************//**
314 Checks a file segment header within a B-tree root page and updates
315 the segment header space id.
316 @return TRUE if valid */
317 static
318 bool
btr_root_fseg_adjust_on_import(fseg_header_t * seg_header,page_zip_des_t * page_zip,ulint space)319 btr_root_fseg_adjust_on_import(
320 /*===========================*/
321 fseg_header_t* seg_header, /*!< in/out: segment header */
322 page_zip_des_t* page_zip, /*!< in/out: compressed page,
323 or NULL */
324 ulint space) /*!< in: tablespace identifier */
325 {
326 ulint offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);
327
328 if (offset < FIL_PAGE_DATA
329 || offset > srv_page_size - FIL_PAGE_DATA_END) {
330 return false;
331 }
332
333 seg_header += FSEG_HDR_SPACE;
334
335 mach_write_to_4(seg_header, space);
336 if (UNIV_LIKELY_NULL(page_zip)) {
337 memcpy(page_zip->data + page_offset(seg_header), seg_header,
338 4);
339 }
340
341 return true;
342 }
343
344 /**************************************************************//**
345 Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
346 @return error code, or DB_SUCCESS */
347 dberr_t
btr_root_adjust_on_import(const dict_index_t * index)348 btr_root_adjust_on_import(
349 /*======================*/
350 const dict_index_t* index) /*!< in: index tree */
351 {
352 dberr_t err;
353 mtr_t mtr;
354 page_t* page;
355 buf_block_t* block;
356 page_zip_des_t* page_zip;
357 dict_table_t* table = index->table;
358 const page_id_t page_id(table->space_id, index->page);
359 const ulint zip_size = table->space->zip_size();
360
361 DBUG_EXECUTE_IF("ib_import_trigger_corruption_3",
362 return(DB_CORRUPTION););
363
364 mtr_start(&mtr);
365
366 mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);
367
368 block = btr_block_get(page_id, zip_size, RW_X_LATCH, index, &mtr);
369
370 page = buf_block_get_frame(block);
371 page_zip = buf_block_get_page_zip(block);
372
373 if (!fil_page_index_page_check(page) || page_has_siblings(page)) {
374 err = DB_CORRUPTION;
375
376 } else if (dict_index_is_clust(index)) {
377 bool page_is_compact_format;
378
379 page_is_compact_format = page_is_comp(page) > 0;
380
381 /* Check if the page format and table format agree. */
382 if (page_is_compact_format != dict_table_is_comp(table)) {
383 err = DB_CORRUPTION;
384 } else {
385 /* Check that the table flags and the tablespace
386 flags match. */
387 ulint tf = dict_tf_to_fsp_flags(table->flags);
388 ulint sf = table->space->flags;
389 sf &= ~FSP_FLAGS_MEM_MASK;
390 tf &= ~FSP_FLAGS_MEM_MASK;
391 if (fil_space_t::is_flags_equal(tf, sf)
392 || fil_space_t::is_flags_equal(sf, tf)) {
393 mutex_enter(&fil_system.mutex);
394 table->space->flags = (table->space->flags
395 & ~FSP_FLAGS_MEM_MASK)
396 | (tf & FSP_FLAGS_MEM_MASK);
397 mutex_exit(&fil_system.mutex);
398 err = DB_SUCCESS;
399 } else {
400 err = DB_CORRUPTION;
401 }
402 }
403 } else {
404 err = DB_SUCCESS;
405 }
406
407 /* Check and adjust the file segment headers, if all OK so far. */
408 if (err == DB_SUCCESS
409 && (!btr_root_fseg_adjust_on_import(
410 FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
411 + page, page_zip, table->space_id)
412 || !btr_root_fseg_adjust_on_import(
413 FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
414 + page, page_zip, table->space_id))) {
415
416 err = DB_CORRUPTION;
417 }
418
419 mtr_commit(&mtr);
420
421 return(err);
422 }
423
424 /**************************************************************//**
425 Creates a new index page (not the root, and also not
426 used in page reorganization). @see btr_page_empty(). */
427 void
btr_page_create(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)428 btr_page_create(
429 /*============*/
430 buf_block_t* block, /*!< in/out: page to be created */
431 page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
432 dict_index_t* index, /*!< in: index */
433 ulint level, /*!< in: the B-tree level of the page */
434 mtr_t* mtr) /*!< in: mtr */
435 {
436 page_t* page = buf_block_get_frame(block);
437
438 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
439
440 if (page_zip) {
441 page_create_zip(block, index, level, 0, mtr);
442 } else {
443 page_create(block, mtr, dict_table_is_comp(index->table),
444 dict_index_is_spatial(index));
445 /* Set the level of the new index page */
446 btr_page_set_level(page, NULL, level, mtr);
447 }
448
449 /* For Spatial Index, initialize the Split Sequence Number */
450 if (dict_index_is_spatial(index)) {
451 page_set_ssn_id(block, page_zip, 0, mtr);
452 }
453
454 btr_page_set_index_id(page, page_zip, index->id, mtr);
455 }
456
457 /**************************************************************//**
458 Allocates a new file page to be used in an ibuf tree. Takes the page from
459 the free list of the tree, which must contain pages!
460 @return new allocated block, x-latched */
461 static
462 buf_block_t*
btr_page_alloc_for_ibuf(dict_index_t * index,mtr_t * mtr)463 btr_page_alloc_for_ibuf(
464 /*====================*/
465 dict_index_t* index, /*!< in: index tree */
466 mtr_t* mtr) /*!< in: mtr */
467 {
468 fil_addr_t node_addr;
469 page_t* root;
470 page_t* new_page;
471 buf_block_t* new_block;
472
473 root = btr_root_get(index, mtr);
474
475 node_addr = flst_get_first(root + PAGE_HEADER
476 + PAGE_BTR_IBUF_FREE_LIST, mtr);
477 ut_a(node_addr.page != FIL_NULL);
478
479 new_block = buf_page_get(
480 page_id_t(index->table->space_id, node_addr.page),
481 index->table->space->zip_size(),
482 RW_X_LATCH, mtr);
483
484 new_page = buf_block_get_frame(new_block);
485 buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);
486
487 flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
488 new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
489 mtr);
490 ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
491 mtr));
492
493 return(new_block);
494 }
495
496 /**************************************************************//**
497 Allocates a new file page to be used in an index tree. NOTE: we assume
498 that the caller has made the reservation for free extents!
499 @retval NULL if no page could be allocated
500 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
501 (init_mtr == mtr, or the page was not previously freed in mtr)
502 @retval block (not allocated or initialized) otherwise */
503 static MY_ATTRIBUTE((nonnull, warn_unused_result))
504 buf_block_t*
btr_page_alloc_low(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)505 btr_page_alloc_low(
506 /*===============*/
507 dict_index_t* index, /*!< in: index */
508 ulint hint_page_no, /*!< in: hint of a good page */
509 byte file_direction, /*!< in: direction where a possible
510 page split is made */
511 ulint level, /*!< in: level where the page is placed
512 in the tree */
513 mtr_t* mtr, /*!< in/out: mini-transaction
514 for the allocation */
515 mtr_t* init_mtr) /*!< in/out: mtr or another
516 mini-transaction in which the
517 page should be initialized.
518 If init_mtr!=mtr, but the page
519 is already X-latched in mtr, do
520 not initialize the page. */
521 {
522 fseg_header_t* seg_header;
523 page_t* root;
524
525 root = btr_root_get(index, mtr);
526
527 if (level == 0) {
528 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
529 } else {
530 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
531 }
532
533 /* Parameter TRUE below states that the caller has made the
534 reservation for free extents, and thus we know that a page can
535 be allocated: */
536
537 buf_block_t* block = fseg_alloc_free_page_general(
538 seg_header, hint_page_no, file_direction,
539 TRUE, mtr, init_mtr);
540
541 #ifdef UNIV_DEBUG_SCRUBBING
542 if (block != NULL) {
543 fprintf(stderr,
544 "alloc %lu:%lu to index: %lu root: %lu\n",
545 buf_block_get_page_no(block),
546 buf_block_get_space(block),
547 index->id,
548 dict_index_get_page(index));
549 } else {
550 fprintf(stderr,
551 "failed alloc index: %lu root: %lu\n",
552 index->id,
553 dict_index_get_page(index));
554 }
555 #endif /* UNIV_DEBUG_SCRUBBING */
556
557 return block;
558 }
559
560 /**************************************************************//**
561 Allocates a new file page to be used in an index tree. NOTE: we assume
562 that the caller has made the reservation for free extents!
563 @retval NULL if no page could be allocated
564 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
565 (init_mtr == mtr, or the page was not previously freed in mtr)
566 @retval block (not allocated or initialized) otherwise */
567 buf_block_t*
btr_page_alloc(dict_index_t * index,ulint hint_page_no,byte file_direction,ulint level,mtr_t * mtr,mtr_t * init_mtr)568 btr_page_alloc(
569 /*===========*/
570 dict_index_t* index, /*!< in: index */
571 ulint hint_page_no, /*!< in: hint of a good page */
572 byte file_direction, /*!< in: direction where a possible
573 page split is made */
574 ulint level, /*!< in: level where the page is placed
575 in the tree */
576 mtr_t* mtr, /*!< in/out: mini-transaction
577 for the allocation */
578 mtr_t* init_mtr) /*!< in/out: mini-transaction
579 for x-latching and initializing
580 the page */
581 {
582 buf_block_t* new_block;
583
584 if (dict_index_is_ibuf(index)) {
585
586 return(btr_page_alloc_for_ibuf(index, mtr));
587 }
588
589 new_block = btr_page_alloc_low(
590 index, hint_page_no, file_direction, level, mtr, init_mtr);
591
592 if (new_block) {
593 buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
594 }
595
596 return(new_block);
597 }
598
599 /**************************************************************//**
600 Gets the number of pages in a B-tree.
601 @return number of pages, or ULINT_UNDEFINED if the index is unavailable */
602 ulint
btr_get_size(const dict_index_t * index,ulint flag,mtr_t * mtr)603 btr_get_size(
604 /*=========*/
605 const dict_index_t* index, /*!< in: index */
606 ulint flag, /*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
607 mtr_t* mtr) /*!< in/out: mini-transaction where index
608 is s-latched */
609 {
610 fseg_header_t* seg_header;
611 page_t* root;
612 ulint n=0;
613 ulint dummy;
614
615 ut_ad(srv_read_only_mode
616 || mtr_memo_contains(mtr, dict_index_get_lock(index),
617 MTR_MEMO_S_LOCK));
618
619 if (index->page == FIL_NULL
620 || dict_index_is_online_ddl(index)
621 || !index->is_committed()) {
622 return(ULINT_UNDEFINED);
623 }
624
625 root = btr_root_get(index, mtr);
626
627 if (root) {
628 if (flag == BTR_N_LEAF_PAGES) {
629 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
630
631 fseg_n_reserved_pages(seg_header, &n, mtr);
632
633 } else if (flag == BTR_TOTAL_SIZE) {
634 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
635
636 n = fseg_n_reserved_pages(seg_header, &dummy, mtr);
637
638 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
639
640 n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
641 } else {
642 ut_error;
643 }
644 } else {
645 n = ULINT_UNDEFINED;
646 }
647
648 return(n);
649 }
650
651 /**************************************************************//**
652 Gets the number of reserved and used pages in a B-tree.
653 @return number of pages reserved, or ULINT_UNDEFINED if the index
654 is unavailable */
655 UNIV_INTERN
656 ulint
btr_get_size_and_reserved(dict_index_t * index,ulint flag,ulint * used,mtr_t * mtr)657 btr_get_size_and_reserved(
658 /*======================*/
659 dict_index_t* index, /*!< in: index */
660 ulint flag, /*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
661 ulint* used, /*!< out: number of pages used (<= reserved) */
662 mtr_t* mtr) /*!< in/out: mini-transaction where index
663 is s-latched */
664 {
665 fseg_header_t* seg_header;
666 page_t* root;
667 ulint n=ULINT_UNDEFINED;
668 ulint dummy;
669
670 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
671 MTR_MEMO_S_LOCK));
672
673 ut_a(flag == BTR_N_LEAF_PAGES || flag == BTR_TOTAL_SIZE);
674
675 if (index->page == FIL_NULL
676 || dict_index_is_online_ddl(index)
677 || !index->is_committed()) {
678 return(ULINT_UNDEFINED);
679 }
680
681 root = btr_root_get(index, mtr);
682 *used = 0;
683
684 if (root) {
685
686 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
687
688 n = fseg_n_reserved_pages(seg_header, used, mtr);
689
690 if (flag == BTR_TOTAL_SIZE) {
691 seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
692
693 n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
694 *used += dummy;
695
696 }
697 }
698
699 return(n);
700 }
701
702 /**************************************************************//**
703 Frees a page used in an ibuf tree. Puts the page to the free list of the
704 ibuf tree. */
705 static
706 void
btr_page_free_for_ibuf(dict_index_t * index,buf_block_t * block,mtr_t * mtr)707 btr_page_free_for_ibuf(
708 /*===================*/
709 dict_index_t* index, /*!< in: index tree */
710 buf_block_t* block, /*!< in: block to be freed, x-latched */
711 mtr_t* mtr) /*!< in: mtr */
712 {
713 page_t* root;
714
715 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
716 root = btr_root_get(index, mtr);
717
718 flst_add_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
719 buf_block_get_frame(block)
720 + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
721
722 ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
723 mtr));
724 }
725
726 /** Free an index page.
727 @param[in,out] index index tree
728 @param[in,out] block block to be freed
729 @param[in,out] mtr mini-transaction
730 @param[in] blob whether this is freeing a BLOB page */
btr_page_free(dict_index_t * index,buf_block_t * block,mtr_t * mtr,bool blob)731 void btr_page_free(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
732 bool blob)
733 {
734 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
735 #ifdef BTR_CUR_HASH_ADAPT
736 if (block->index && !block->index->freed()) {
737 ut_ad(!blob);
738 ut_ad(page_is_leaf(block->frame));
739 }
740 #endif
741 ut_ad(index->table->space_id == block->page.id.space());
742 /* The root page is freed by btr_free_root(). */
743 ut_ad(block->page.id.page_no() != index->page);
744 ut_ad(mtr->is_named_space(index->table->space));
745
746 /* The page gets invalid for optimistic searches: increment the frame
747 modify clock */
748
749 buf_block_modify_clock_inc(block);
750
751 if (dict_index_is_ibuf(index)) {
752 btr_page_free_for_ibuf(index, block, mtr);
753 return;
754 }
755
756 /* TODO: Discard any operations for block from mtr->log.
757 The page will be freed, so previous changes to it by this
758 mini-transaction should not matter. */
759 page_t* root = btr_root_get(index, mtr);
760 fseg_header_t* seg_header = &root[blob || page_is_leaf(block->frame)
761 ? PAGE_HEADER + PAGE_BTR_SEG_LEAF
762 : PAGE_HEADER + PAGE_BTR_SEG_TOP];
763 fseg_free_page(seg_header,
764 index->table->space, block->page.id.page_no(),
765 !block->page.flush_observer, mtr);
766
767 /* The page was marked free in the allocation bitmap, but it
768 should remain exclusively latched until mtr_t::commit() or until it
769 is explicitly freed from the mini-transaction. */
770 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
771
772 /* MDEV-15528 FIXME: Zero out the page after the redo log for
773 this mini-transaction has been durably written.
774 This must be done unconditionally if
775 srv_immediate_scrub_data_uncompressed is set. */
776 }
777
778 /**************************************************************//**
779 Sets the child node file address in a node pointer. */
780 UNIV_INLINE
781 void
btr_node_ptr_set_child_page_no(rec_t * rec,page_zip_des_t * page_zip,const rec_offs * offsets,ulint page_no,mtr_t * mtr)782 btr_node_ptr_set_child_page_no(
783 /*===========================*/
784 rec_t* rec, /*!< in: node pointer record */
785 page_zip_des_t* page_zip,/*!< in/out: compressed page whose uncompressed
786 part will be updated, or NULL */
787 const rec_offs* offsets,/*!< in: array returned by rec_get_offsets() */
788 ulint page_no,/*!< in: child node address */
789 mtr_t* mtr) /*!< in: mtr */
790 {
791 byte* field;
792 ulint len;
793
794 ut_ad(rec_offs_validate(rec, NULL, offsets));
795 ut_ad(!page_rec_is_leaf(rec));
796 ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));
797
798 /* The child address is in the last field */
799 field = rec_get_nth_field(rec, offsets,
800 rec_offs_n_fields(offsets) - 1, &len);
801
802 ut_ad(len == REC_NODE_PTR_SIZE);
803
804 if (page_zip) {
805 page_zip_write_node_ptr(page_zip, rec,
806 rec_offs_data_size(offsets),
807 page_no, mtr);
808 } else {
809 mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
810 }
811 }
812
813 /************************************************************//**
814 Returns the child page of a node pointer and sx-latches it.
815 @return child page, sx-latched */
816 static
817 buf_block_t*
btr_node_ptr_get_child(const rec_t * node_ptr,dict_index_t * index,const rec_offs * offsets,mtr_t * mtr)818 btr_node_ptr_get_child(
819 /*===================*/
820 const rec_t* node_ptr,/*!< in: node pointer */
821 dict_index_t* index, /*!< in: index */
822 const rec_offs* offsets,/*!< in: array returned by rec_get_offsets() */
823 mtr_t* mtr) /*!< in: mtr */
824 {
825 ut_ad(rec_offs_validate(node_ptr, index, offsets));
826 ut_ad(index->table->space_id
827 == page_get_space_id(page_align(node_ptr)));
828
829 return btr_block_get(
830 page_id_t(index->table->space_id,
831 btr_node_ptr_get_child_page_no(node_ptr, offsets)),
832 index->table->space->zip_size(),
833 RW_SX_LATCH, index, mtr);
834 }
835
836 /************************************************************//**
837 Returns the upper level node pointer to a page. It is assumed that mtr holds
838 an sx-latch on the tree.
839 @return rec_get_offsets() of the node pointer record */
840 static
841 rec_offs*
btr_page_get_father_node_ptr_func(rec_offs * offsets,mem_heap_t * heap,btr_cur_t * cursor,ulint latch_mode,const char * file,unsigned line,mtr_t * mtr)842 btr_page_get_father_node_ptr_func(
843 /*==============================*/
844 rec_offs* offsets,/*!< in: work area for the return value */
845 mem_heap_t* heap, /*!< in: memory heap to use */
846 btr_cur_t* cursor, /*!< in: cursor pointing to user record,
847 out: cursor on node pointer record,
848 its page x-latched */
849 ulint latch_mode,/*!< in: BTR_CONT_MODIFY_TREE
850 or BTR_CONT_SEARCH_TREE */
851 const char* file, /*!< in: file name */
852 unsigned line, /*!< in: line where called */
853 mtr_t* mtr) /*!< in: mtr */
854 {
855 dtuple_t* tuple;
856 rec_t* user_rec;
857 rec_t* node_ptr;
858 ulint level;
859 ulint page_no;
860 dict_index_t* index;
861
862 ut_ad(latch_mode == BTR_CONT_MODIFY_TREE
863 || latch_mode == BTR_CONT_SEARCH_TREE);
864
865 page_no = btr_cur_get_block(cursor)->page.id.page_no();
866 index = btr_cur_get_index(cursor);
867 ut_ad(!dict_index_is_spatial(index));
868
869 ut_ad(srv_read_only_mode
870 || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
871 MTR_MEMO_X_LOCK
872 | MTR_MEMO_SX_LOCK));
873
874 ut_ad(dict_index_get_page(index) != page_no);
875
876 level = btr_page_get_level(btr_cur_get_page(cursor));
877
878 user_rec = btr_cur_get_rec(cursor);
879 ut_a(page_rec_is_user_rec(user_rec));
880
881 tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
882 dberr_t err = DB_SUCCESS;
883
884 err = btr_cur_search_to_nth_level(
885 index, level + 1, tuple,
886 PAGE_CUR_LE, latch_mode, cursor, 0,
887 file, line, mtr);
888
889 if (err != DB_SUCCESS) {
890 ib::warn() << " Error code: " << err
891 << " btr_page_get_father_node_ptr_func "
892 << " level: " << level + 1
893 << " called from file: "
894 << file << " line: " << line
895 << " table: " << index->table->name
896 << " index: " << index->name();
897 }
898
899 node_ptr = btr_cur_get_rec(cursor);
900
901 offsets = rec_get_offsets(node_ptr, index, offsets, 0,
902 ULINT_UNDEFINED, &heap);
903
904 if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
905 rec_t* print_rec;
906
907 ib::error()
908 << "Corruption of an index tree: table "
909 << index->table->name
910 << " index " << index->name
911 << ", father ptr page no "
912 << btr_node_ptr_get_child_page_no(node_ptr, offsets)
913 << ", child page no " << page_no;
914
915 print_rec = page_rec_get_next(
916 page_get_infimum_rec(page_align(user_rec)));
917 offsets = rec_get_offsets(print_rec, index, offsets,
918 page_rec_is_leaf(user_rec)
919 ? index->n_core_fields : 0,
920 ULINT_UNDEFINED, &heap);
921 page_rec_print(print_rec, offsets);
922 offsets = rec_get_offsets(node_ptr, index, offsets, 0,
923 ULINT_UNDEFINED, &heap);
924 page_rec_print(node_ptr, offsets);
925
926 ib::fatal()
927 << "You should dump + drop + reimport the table to"
928 << " fix the corruption. If the crash happens at"
929 << " database startup. " << FORCE_RECOVERY_MSG
930 << " Then dump + drop + reimport.";
931 }
932
933 return(offsets);
934 }
935
/** Look up the node pointer to a page in its parent, latching for
tree modification (BTR_CONT_MODIFY_TREE). */
#define btr_page_get_father_node_ptr(of,heap,cur,mtr)			\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_MODIFY_TREE,__FILE__,__LINE__,mtr)

/** Look up the node pointer to a page in its parent, latching for
read-only validation (BTR_CONT_SEARCH_TREE). */
#define btr_page_get_father_node_ptr_for_validate(of,heap,cur,mtr)	\
	btr_page_get_father_node_ptr_func(				\
		of,heap,cur,BTR_CONT_SEARCH_TREE,__FILE__,__LINE__,mtr)
943
944 /************************************************************//**
945 Returns the upper level node pointer to a page. It is assumed that mtr holds
946 an x-latch on the tree.
947 @return rec_get_offsets() of the node pointer record */
948 static
949 rec_offs*
btr_page_get_father_block(rec_offs * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)950 btr_page_get_father_block(
951 /*======================*/
952 rec_offs* offsets,/*!< in: work area for the return value */
953 mem_heap_t* heap, /*!< in: memory heap to use */
954 dict_index_t* index, /*!< in: b-tree index */
955 buf_block_t* block, /*!< in: child page in the index */
956 mtr_t* mtr, /*!< in: mtr */
957 btr_cur_t* cursor) /*!< out: cursor on node pointer record,
958 its page x-latched */
959 {
960 rec_t* rec
961 = page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
962 block)));
963 btr_cur_position(index, rec, block, cursor);
964 return(btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
965 }
966
967 /** Seek to the parent page of a B-tree page.
968 @param[in,out] index b-tree
969 @param[in] block child page
970 @param[in,out] mtr mini-transaction
971 @param[out] cursor cursor pointing to the x-latched parent page */
btr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * cursor)972 void btr_page_get_father(dict_index_t* index, buf_block_t* block, mtr_t* mtr,
973 btr_cur_t* cursor)
974 {
975 mem_heap_t* heap;
976 rec_t* rec
977 = page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(
978 block)));
979 btr_cur_position(index, rec, block, cursor);
980
981 heap = mem_heap_create(100);
982 btr_page_get_father_node_ptr(NULL, heap, cursor, mtr);
983 mem_heap_free(heap);
984 }
985
/** PAGE_INDEX_ID value written to the root page of a freed index B-tree,
so that the page no longer matches any live index id */
static const index_id_t	BTR_FREED_INDEX_ID = 0;
988
989 /** Free a B-tree root page. btr_free_but_not_root() must already
990 have been called.
991 In a persistent tablespace, the caller must invoke fsp_init_file_page()
992 before mtr.commit().
993 @param[in,out] block index root page
994 @param[in,out] mtr mini-transaction
995 @param[in] invalidate whether to invalidate PAGE_INDEX_ID */
btr_free_root(buf_block_t * block,mtr_t * mtr,bool invalidate)996 static void btr_free_root(buf_block_t* block, mtr_t* mtr, bool invalidate)
997 {
998 fseg_header_t* header;
999
1000 ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX
1001 | MTR_MEMO_PAGE_SX_FIX));
1002 ut_ad(mtr->is_named_space(block->page.id.space()));
1003
1004 btr_search_drop_page_hash_index(block);
1005
1006 header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
1007 #ifdef UNIV_BTR_DEBUG
1008 ut_a(btr_root_fseg_validate(header, block->page.id.space()));
1009 #endif /* UNIV_BTR_DEBUG */
1010 if (invalidate) {
1011 btr_page_set_index_id(
1012 buf_block_get_frame(block),
1013 buf_block_get_page_zip(block),
1014 BTR_FREED_INDEX_ID, mtr);
1015 }
1016
1017 while (!fseg_free_step(header, mtr)) {
1018 /* Free the entire segment in small steps. */
1019 }
1020 }
1021
1022 /** Prepare to free a B-tree.
1023 @param[in] page_id page id
1024 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
1025 @param[in] index_id PAGE_INDEX_ID contents
1026 @param[in,out] mtr mini-transaction
1027 @return root block, to invoke btr_free_but_not_root() and btr_free_root()
1028 @retval NULL if the page is no longer a matching B-tree page */
1029 static MY_ATTRIBUTE((warn_unused_result))
1030 buf_block_t*
btr_free_root_check(const page_id_t page_id,ulint zip_size,index_id_t index_id,mtr_t * mtr)1031 btr_free_root_check(
1032 const page_id_t page_id,
1033 ulint zip_size,
1034 index_id_t index_id,
1035 mtr_t* mtr)
1036 {
1037 ut_ad(page_id.space() != SRV_TMP_SPACE_ID);
1038 ut_ad(index_id != BTR_FREED_INDEX_ID);
1039
1040 buf_block_t* block = buf_page_get(
1041 page_id, zip_size, RW_X_LATCH, mtr);
1042
1043 if (block) {
1044 buf_block_dbg_add_level(block, SYNC_TREE_NODE);
1045
1046 if (fil_page_index_page_check(block->frame)
1047 && index_id == btr_page_get_index_id(block->frame)) {
1048 /* This should be a root page.
1049 It should not be possible to reassign the same
1050 index_id for some other index in the tablespace. */
1051 ut_ad(!page_has_siblings(block->frame));
1052 } else {
1053 block = NULL;
1054 }
1055 }
1056
1057 return(block);
1058 }
1059
1060 /** Create the root node for a new index tree.
1061 @param[in] type type of the index
1062 @param[in] index_id index id
1063 @param[in,out] space tablespace where created
1064 @param[in] index index
1065 @param[in,out] mtr mini-transaction
1066 @return page number of the created root
1067 @retval FIL_NULL if did not succeed */
ulint
btr_create(
	ulint		type,
	fil_space_t*	space,
	index_id_t	index_id,
	dict_index_t*	index,
	mtr_t*		mtr)
{
	buf_block_t*	block;
	page_t*		page;
	page_zip_des_t*	page_zip;

	ut_ad(mtr->is_named_space(space));
	ut_ad(index_id != BTR_FREED_INDEX_ID);

	/* Create the two new segments (one, in the case of an ibuf tree) for
	the index tree; the segment headers are put on the allocated root page
	(for an ibuf tree, not in the root, but on a separate ibuf header
	page) */

	if (UNIV_UNLIKELY(type & DICT_IBUF)) {
		/* Allocate first the ibuf header page */
		buf_block_t*	ibuf_hdr_block = fseg_create(
			space, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

		if (ibuf_hdr_block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(
			ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

		ut_ad(ibuf_hdr_block->page.id.page_no()
		      == IBUF_HEADER_PAGE_NO);
		/* Allocate then the next page to the segment: it will be the
		tree root page */

		block = fseg_alloc_free_page(
			buf_block_get_frame(ibuf_hdr_block)
			+ IBUF_HEADER + IBUF_TREE_SEG_HEADER,
			IBUF_TREE_ROOT_PAGE_NO,
			FSP_UP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);

		buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

		/* Initialize the PAGE_BTR_IBUF_FREE_LIST base node,
		which only exists on the insert buffer tree root page. */
		flst_init(block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
	} else {
		block = fseg_create(space,
				    PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);

		if (block == NULL) {
			return(FIL_NULL);
		}

		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

		if (!fseg_create(space,
				 PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr,
				 false, block)) {
			/* Not enough space for new segment, free root
			segment before return. */
			btr_free_root(block, mtr,
				      !index->table->is_temporary());
			return(FIL_NULL);
		}

		/* The fseg create acquires a second latch on the page,
		therefore we must declare it: */
		buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
	}

	/* Create a new index page on the allocated segment page */
	page_zip = buf_block_get_page_zip(block);

	if (page_zip) {
		page = page_create_zip(block, index, 0, 0, mtr);
	} else {
		page = page_create(block, mtr,
				   dict_table_is_comp(index->table),
				   dict_index_is_spatial(index));
		/* Set the level of the new index page */
		btr_page_set_level(page, NULL, 0, mtr);
	}

	/* Set the index id of the page */
	btr_page_set_index_id(page, page_zip, index_id, mtr);

	/* Set the next node and previous node fields */
	compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
	compile_time_assert(FIL_NULL == 0xffffffff);
#if MYSQL_VERSION_ID < 100500
	if (UNIV_LIKELY_NULL(page_zip)) {
		/* Avoid tripping the ut_a() in mlog_parse_nbytes()
		when crash-downgrading to an earlier MariaDB 10.4 version. */
		btr_page_set_next(page, page_zip, FIL_NULL, mtr);
		btr_page_set_prev(page, page_zip, FIL_NULL, mtr);
	} else {
		mlog_memset(block, FIL_PAGE_PREV, 8, 0xff, mtr);
	}
#else
	/* Write FIL_NULL (0xffffffff) to both FIL_PAGE_PREV and
	FIL_PAGE_NEXT with a single 8-byte memset. */
	mlog_memset(block, FIL_PAGE_PREV, 8, 0xff, mtr);
	if (UNIV_LIKELY_NULL(page_zip)) {
		memset(page_zip->data + FIL_PAGE_PREV, 0xff, 8);
	}
#endif

	/* We reset the free bits for the page in a separate
	mini-transaction to allow creation of several trees in the
	same mtr, otherwise the latch on a bitmap page would prevent
	it because of the latching order.

	Note: Insert Buffering is disabled for temporary tables given that
	most temporary tables are smaller in size and short-lived. */
	if (!(type & DICT_CLUSTERED) && !index->table->is_temporary()) {
		ibuf_reset_free_bits(block);
	}

	/* In the following assertion we test that two records of maximum
	allowed size fit on the root page: this fact is needed to ensure
	correctness of split algorithms */

	ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

	/* Return the page number of the created root page. */
	return(block->page.id.page_no());
}
1199
1200 /** Free a B-tree except the root page. The root page MUST be freed after
1201 this by calling btr_free_root.
1202 @param[in,out] block root page
1203 @param[in] log_mode mtr logging mode */
1204 static
1205 void
btr_free_but_not_root(buf_block_t * block,mtr_log_t log_mode)1206 btr_free_but_not_root(
1207 buf_block_t* block,
1208 mtr_log_t log_mode)
1209 {
1210 ibool finished;
1211 mtr_t mtr;
1212
1213 ut_ad(fil_page_index_page_check(block->frame));
1214 ut_ad(!page_has_siblings(block->frame));
1215 leaf_loop:
1216 mtr_start(&mtr);
1217 mtr_set_log_mode(&mtr, log_mode);
1218 mtr.set_named_space_id(block->page.id.space());
1219
1220 page_t* root = block->frame;
1221
1222 if (!root) {
1223 mtr_commit(&mtr);
1224 return;
1225 }
1226
1227 #ifdef UNIV_BTR_DEBUG
1228 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
1229 + root, block->page.id.space()));
1230 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1231 + root, block->page.id.space()));
1232 #endif /* UNIV_BTR_DEBUG */
1233
1234 /* NOTE: page hash indexes are dropped when a page is freed inside
1235 fsp0fsp. */
1236
1237 finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF,
1238 &mtr);
1239 mtr_commit(&mtr);
1240
1241 if (!finished) {
1242
1243 goto leaf_loop;
1244 }
1245 top_loop:
1246 mtr_start(&mtr);
1247 mtr_set_log_mode(&mtr, log_mode);
1248 mtr.set_named_space_id(block->page.id.space());
1249
1250 root = block->frame;
1251
1252 #ifdef UNIV_BTR_DEBUG
1253 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
1254 + root, block->page.id.space()));
1255 #endif /* UNIV_BTR_DEBUG */
1256
1257 finished = fseg_free_step_not_header(
1258 root + PAGE_HEADER + PAGE_BTR_SEG_TOP, &mtr);
1259 mtr_commit(&mtr);
1260
1261 if (!finished) {
1262 goto top_loop;
1263 }
1264 }
1265
1266 /** Free a persistent index tree if it exists.
1267 @param[in] page_id root page id
1268 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
1269 @param[in] index_id PAGE_INDEX_ID contents
1270 @param[in,out] mtr mini-transaction */
1271 void
btr_free_if_exists(const page_id_t page_id,ulint zip_size,index_id_t index_id,mtr_t * mtr)1272 btr_free_if_exists(
1273 const page_id_t page_id,
1274 ulint zip_size,
1275 index_id_t index_id,
1276 mtr_t* mtr)
1277 {
1278 buf_block_t* root = btr_free_root_check(
1279 page_id, zip_size, index_id, mtr);
1280
1281 if (root == NULL) {
1282 return;
1283 }
1284
1285 btr_free_but_not_root(root, mtr->get_log_mode());
1286 mtr->set_named_space_id(page_id.space());
1287 btr_free_root(root, mtr, true);
1288 }
1289
1290 /** Free an index tree in a temporary tablespace.
1291 @param[in] page_id root page id */
btr_free(const page_id_t page_id)1292 void btr_free(const page_id_t page_id)
1293 {
1294 mtr_t mtr;
1295 mtr.start();
1296 mtr.set_log_mode(MTR_LOG_NO_REDO);
1297
1298 buf_block_t* block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);
1299
1300 if (block) {
1301 btr_free_but_not_root(block, MTR_LOG_NO_REDO);
1302 btr_free_root(block, &mtr, false);
1303 }
1304 mtr.commit();
1305 }
1306
1307 /** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC.
1308 @param[in,out] index clustered index
1309 @return the last used AUTO_INCREMENT value
1310 @retval 0 on error or if no AUTO_INCREMENT value was used yet */
1311 ib_uint64_t
btr_read_autoinc(dict_index_t * index)1312 btr_read_autoinc(dict_index_t* index)
1313 {
1314 ut_ad(index->is_primary());
1315 ut_ad(index->table->persistent_autoinc);
1316 ut_ad(!index->table->is_temporary());
1317 mtr_t mtr;
1318 mtr.start();
1319 ib_uint64_t autoinc;
1320 if (buf_block_t* block = buf_page_get(
1321 page_id_t(index->table->space_id, index->page),
1322 index->table->space->zip_size(),
1323 RW_S_LATCH, &mtr)) {
1324 autoinc = page_get_autoinc(block->frame);
1325 } else {
1326 autoinc = 0;
1327 }
1328 mtr.commit();
1329 return autoinc;
1330 }
1331
1332 /** Read the last used AUTO_INCREMENT value from PAGE_ROOT_AUTO_INC,
1333 or fall back to MAX(auto_increment_column).
1334 @param[in] table table containing an AUTO_INCREMENT column
1335 @param[in] col_no index of the AUTO_INCREMENT column
1336 @return the AUTO_INCREMENT value
1337 @retval 0 on error or if no AUTO_INCREMENT value was used yet */
1338 ib_uint64_t
btr_read_autoinc_with_fallback(const dict_table_t * table,unsigned col_no)1339 btr_read_autoinc_with_fallback(const dict_table_t* table, unsigned col_no)
1340 {
1341 ut_ad(table->persistent_autoinc);
1342 ut_ad(!table->is_temporary());
1343
1344 dict_index_t* index = dict_table_get_first_index(table);
1345
1346 if (index == NULL) {
1347 return 0;
1348 }
1349
1350 mtr_t mtr;
1351 mtr.start();
1352 buf_block_t* block = buf_page_get(
1353 page_id_t(index->table->space_id, index->page),
1354 index->table->space->zip_size(),
1355 RW_S_LATCH, &mtr);
1356
1357 ib_uint64_t autoinc = block ? page_get_autoinc(block->frame) : 0;
1358 const bool retry = block && autoinc == 0
1359 && !page_is_empty(block->frame);
1360 mtr.commit();
1361
1362 if (retry) {
1363 /* This should be an old data file where
1364 PAGE_ROOT_AUTO_INC was initialized to 0.
1365 Fall back to reading MAX(autoinc_col).
1366 There should be an index on it. */
1367 const dict_col_t* autoinc_col
1368 = dict_table_get_nth_col(table, col_no);
1369 while (index && index->fields[0].col != autoinc_col) {
1370 index = dict_table_get_next_index(index);
1371 }
1372
1373 if (index) {
1374 autoinc = row_search_max_autoinc(index);
1375 }
1376 }
1377
1378 return autoinc;
1379 }
1380
1381 /** Write the next available AUTO_INCREMENT value to PAGE_ROOT_AUTO_INC.
1382 @param[in,out] index clustered index
1383 @param[in] autoinc the AUTO_INCREMENT value
1384 @param[in] reset whether to reset the AUTO_INCREMENT
1385 to a possibly smaller value than currently
1386 exists in the page */
1387 void
btr_write_autoinc(dict_index_t * index,ib_uint64_t autoinc,bool reset)1388 btr_write_autoinc(dict_index_t* index, ib_uint64_t autoinc, bool reset)
1389 {
1390 ut_ad(index->is_primary());
1391 ut_ad(index->table->persistent_autoinc);
1392 ut_ad(!index->table->is_temporary());
1393
1394 mtr_t mtr;
1395 mtr.start();
1396 fil_space_t* space = index->table->space;
1397 mtr.set_named_space(space);
1398 page_set_autoinc(buf_page_get(page_id_t(space->id, index->page),
1399 space->zip_size(),
1400 RW_SX_LATCH, &mtr),
1401 index, autoinc, &mtr, reset);
1402 mtr.commit();
1403 }
1404
1405 /*************************************************************//**
1406 Reorganizes an index page.
1407
1408 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1409 if this is a compressed leaf page in a secondary index. This has to
1410 be done either within the same mini-transaction, or by invoking
1411 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1412 IBUF_BITMAP_FREE is unaffected by reorganization.
1413
1414 @retval true if the operation was successful
1415 @retval false if it is a compressed page, and recompression failed */
bool
btr_page_reorganize_low(
/*====================*/
	bool		recovery,/*!< in: true if called in recovery:
				locks should not be updated, i.e.,
				there cannot exist locks on the
				page, and a hash index should not be
				dropped: it cannot exist */
	ulint		z_level,/*!< in: compression level to be used
				if dealing with compressed page */
	page_cur_t*	cursor,	/*!< in/out: page cursor */
	dict_index_t*	index,	/*!< in: the index tree of the page */
	mtr_t*		mtr)	/*!< in/out: mini-transaction */
{
	buf_block_t*	block		= page_cur_get_block(cursor);
	buf_pool_t*	buf_pool	= buf_pool_from_bpage(&block->page);
	page_t*		page		= buf_block_get_frame(block);
	page_zip_des_t*	page_zip	= buf_block_get_page_zip(block);
	buf_block_t*	temp_block;
	page_t*		temp_page;
	ulint		data_size1;
	ulint		data_size2;
	ulint		max_ins_size1;
	ulint		max_ins_size2;
	bool		success		= false;
	ulint		pos;
	bool		log_compressed;
	bool		is_spatial;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	btr_assert_not_corrupted(block, index);
	ut_ad(fil_page_index_page_check(block->frame));
	ut_ad(index->is_dummy
	      || block->page.id.space() == index->table->space->id);
	ut_ad(index->is_dummy
	      || block->page.id.page_no() != index->page
	      || !page_has_siblings(page));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	/* Record data size and max insert size, to verify after the
	rebuild that no records were lost. */
	data_size1 = page_get_data_size(page);
	max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);
	/* Turn logging off */
	mtr_log_t	log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

	temp_block = buf_block_alloc(buf_pool);
	temp_page = temp_block->frame;

	MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);

	/* This function can be called by log redo with a "dummy" index.
	So we would trust more on the original page's type */
	is_spatial = (fil_page_get_type(page) == FIL_PAGE_RTREE
		      || dict_index_is_spatial(index));

	/* Copy the old page to temporary space */
	buf_frame_copy(temp_page, page);

	if (!recovery) {
		btr_search_drop_page_hash_index(block);
	}

	/* Save the cursor position. */
	pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

	/* Recreate the page: note that global data on page (possible
	segment headers, next page-field, etc.) is preserved intact */

	page_create(block, mtr, dict_table_is_comp(index->table), is_spatial);

	/* Copy the records from the temporary space to the recreated page;
	do not copy the lock bits yet */

	page_copy_rec_list_end_no_locks(block, temp_block,
					page_get_infimum_rec(temp_page),
					index, mtr);

	/* Copy the PAGE_MAX_TRX_ID or PAGE_ROOT_AUTO_INC. */
	memcpy(page + (PAGE_HEADER + PAGE_MAX_TRX_ID),
	       temp_page + (PAGE_HEADER + PAGE_MAX_TRX_ID), 8);
	/* PAGE_MAX_TRX_ID is unused in clustered index pages
	(other than the root where it is repurposed as PAGE_ROOT_AUTO_INC),
	non-leaf pages, and in temporary tables. It was always
	zero-initialized in page_create() in all InnoDB versions.
	PAGE_MAX_TRX_ID must be nonzero on dict_index_is_sec_or_ibuf()
	leaf pages.

	During redo log apply, dict_index_is_sec_or_ibuf() always
	holds, even for clustered indexes. */
	ut_ad(recovery || index->table->is_temporary()
	      || !page_is_leaf(temp_page)
	      || !dict_index_is_sec_or_ibuf(index)
	      || page_get_max_trx_id(page) != 0);
	/* PAGE_MAX_TRX_ID must be zero on non-leaf pages other than
	clustered index root pages. */
	ut_ad(recovery
	      || page_get_max_trx_id(page) == 0
	      || (dict_index_is_sec_or_ibuf(index)
		  ? page_is_leaf(temp_page)
		  : block->page.id.page_no() == index->page));

	/* If innodb_log_compressed_pages is ON, page reorganize should log the
	compressed page image.*/
	log_compressed = page_zip && page_zip_log_pages;

	if (log_compressed) {
		/* Re-enable logging before compressing, so that the
		compression itself is logged. */
		mtr_set_log_mode(mtr, log_mode);
	}

	if (page_zip
	    && !page_zip_compress(page_zip, page, index, z_level, mtr)) {

		/* Recompression failed: restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		/* Check that the bytes that we skip are identical. */
		ut_a(!memcmp(page, temp_page, PAGE_HEADER));
		ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
			     PAGE_HEADER + PAGE_N_RECS + temp_page,
			     PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
		ut_a(!memcmp(srv_page_size - FIL_PAGE_DATA_END + page,
			     srv_page_size - FIL_PAGE_DATA_END + temp_page,
			     FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		/* Copy back only the regions that could have changed;
		the header prefix and the page trailer were verified
		identical above. */
		memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
		       PAGE_N_RECS - PAGE_N_DIR_SLOTS);
		memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
		       srv_page_size - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
		ut_a(!memcmp(page, temp_page, srv_page_size));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

		goto func_exit;
	}

	data_size2 = page_get_data_size(page);
	max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

	if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
		/* The rebuilt page does not hold the same payload as the
		original: report it, but keep going (debug builds assert). */
		ib::error()
			<< "Page old data size " << data_size1
			<< " new data size " << data_size2
			<< ", page old max ins size " << max_ins_size1
			<< " new max ins size " << max_ins_size2;

		ib::error() << BUG_REPORT_MSG;
		ut_ad(0);
	} else {
		success = true;
	}

	/* Restore the cursor position. */
	if (pos > 0) {
		cursor->rec = page_rec_get_nth(page, pos);
	} else {
		ut_ad(cursor->rec == page_get_infimum_rec(page));
	}

#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	if (!recovery) {
		if (block->page.id.page_no() == index->page
		    && fil_page_get_type(temp_page) == FIL_PAGE_TYPE_INSTANT) {
			/* Preserve the PAGE_INSTANT information. */
			ut_ad(!page_zip);
			ut_ad(index->is_instant());
			memcpy(FIL_PAGE_TYPE + page,
			       FIL_PAGE_TYPE + temp_page, 2);
			memcpy(PAGE_HEADER + PAGE_INSTANT + page,
			       PAGE_HEADER + PAGE_INSTANT + temp_page, 2);
			if (!index->table->instant) {
			} else if (page_is_comp(page)) {
				memcpy(PAGE_NEW_INFIMUM + page,
				       PAGE_NEW_INFIMUM + temp_page, 8);
				memcpy(PAGE_NEW_SUPREMUM + page,
				       PAGE_NEW_SUPREMUM + temp_page, 8);
			} else {
				memcpy(PAGE_OLD_INFIMUM + page,
				       PAGE_OLD_INFIMUM + temp_page, 8);
				memcpy(PAGE_OLD_SUPREMUM + page,
				       PAGE_OLD_SUPREMUM + temp_page, 8);
			}
		}

		if (!dict_table_is_locking_disabled(index->table)) {
			/* Update the record lock bitmaps */
			lock_move_reorganize_page(block, temp_block);
		}
	}

func_exit:
	buf_block_free(temp_block);

	/* Restore logging mode */
	mtr_set_log_mode(mtr, log_mode);

	if (success) {
		mlog_id_t	type;
		byte*		log_ptr;

		/* Write the log record */
		if (page_zip) {
			ut_ad(page_is_comp(page));
			type = MLOG_ZIP_PAGE_REORGANIZE;
		} else if (page_is_comp(page)) {
			type = MLOG_COMP_PAGE_REORGANIZE;
		} else {
			type = MLOG_PAGE_REORGANIZE;
		}

		log_ptr = log_compressed
			? NULL
			: mlog_open_and_write_index(
				mtr, page, index, type,
				page_zip ? 1 : 0);

		/* For compressed pages write the compression level. */
		if (log_ptr && page_zip) {
			mach_write_to_1(log_ptr, z_level);
			mlog_close(mtr, log_ptr + 1);
		}

		MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
	}

	if (UNIV_UNLIKELY(fil_page_get_type(page) == FIL_PAGE_TYPE_INSTANT)) {
		/* Log the PAGE_INSTANT information. */
		ut_ad(!page_zip);
		ut_ad(index->is_instant());
		ut_ad(!recovery);
		mlog_write_ulint(FIL_PAGE_TYPE + page, FIL_PAGE_TYPE_INSTANT,
				 MLOG_2BYTES, mtr);
		mlog_write_ulint(PAGE_HEADER + PAGE_INSTANT + page,
				 mach_read_from_2(PAGE_HEADER + PAGE_INSTANT
						  + page),
				 MLOG_2BYTES, mtr);
		if (!index->table->instant) {
		} else if (page_is_comp(page)) {
			mlog_log_string(PAGE_NEW_INFIMUM + page, 8, mtr);
			mlog_log_string(PAGE_NEW_SUPREMUM + page, 8, mtr);
		} else {
			mlog_log_string(PAGE_OLD_INFIMUM + page, 8, mtr);
			mlog_log_string(PAGE_OLD_SUPREMUM + page, 8, mtr);
		}
	}

	return(success);
}
1667
1668 /*************************************************************//**
1669 Reorganizes an index page.
1670
1671 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1672 if this is a compressed leaf page in a secondary index. This has to
1673 be done either within the same mini-transaction, or by invoking
1674 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1675 IBUF_BITMAP_FREE is unaffected by reorganization.
1676
1677 @retval true if the operation was successful
1678 @retval false if it is a compressed page, and recompression failed */
1679 bool
btr_page_reorganize_block(bool recovery,ulint z_level,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1680 btr_page_reorganize_block(
1681 /*======================*/
1682 bool recovery,/*!< in: true if called in recovery:
1683 locks should not be updated, i.e.,
1684 there cannot exist locks on the
1685 page, and a hash index should not be
1686 dropped: it cannot exist */
1687 ulint z_level,/*!< in: compression level to be used
1688 if dealing with compressed page */
1689 buf_block_t* block, /*!< in/out: B-tree page */
1690 dict_index_t* index, /*!< in: the index tree of the page */
1691 mtr_t* mtr) /*!< in/out: mini-transaction */
1692 {
1693 page_cur_t cur;
1694 page_cur_set_before_first(block, &cur);
1695
1696 return(btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
1697 }
1698
1699 /*************************************************************//**
1700 Reorganizes an index page.
1701
1702 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
1703 if this is a compressed leaf page in a secondary index. This has to
1704 be done either within the same mini-transaction, or by invoking
1705 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
1706 IBUF_BITMAP_FREE is unaffected by reorganization.
1707
1708 @retval true if the operation was successful
1709 @retval false if it is a compressed page, and recompression failed */
1710 bool
btr_page_reorganize(page_cur_t * cursor,dict_index_t * index,mtr_t * mtr)1711 btr_page_reorganize(
1712 /*================*/
1713 page_cur_t* cursor, /*!< in/out: page cursor */
1714 dict_index_t* index, /*!< in: the index tree of the page */
1715 mtr_t* mtr) /*!< in/out: mini-transaction */
1716 {
1717 return(btr_page_reorganize_low(false, page_zip_level,
1718 cursor, index, mtr));
1719 }
1720
1721 /***********************************************************//**
1722 Parses a redo log record of reorganizing a page.
1723 @return end of log record or NULL */
1724 byte*
btr_parse_page_reorganize(byte * ptr,byte * end_ptr,dict_index_t * index,bool compressed,buf_block_t * block,mtr_t * mtr)1725 btr_parse_page_reorganize(
1726 /*======================*/
1727 byte* ptr, /*!< in: buffer */
1728 byte* end_ptr,/*!< in: buffer end */
1729 dict_index_t* index, /*!< in: record descriptor */
1730 bool compressed,/*!< in: true if compressed page */
1731 buf_block_t* block, /*!< in: page to be reorganized, or NULL */
1732 mtr_t* mtr) /*!< in: mtr or NULL */
1733 {
1734 ulint level = page_zip_level;
1735
1736 ut_ad(ptr != NULL);
1737 ut_ad(end_ptr != NULL);
1738 ut_ad(index != NULL);
1739
1740 /* If dealing with a compressed page the record has the
1741 compression level used during original compression written in
1742 one byte. Otherwise record is empty. */
1743 if (compressed) {
1744 if (ptr == end_ptr) {
1745 return(NULL);
1746 }
1747
1748 level = mach_read_from_1(ptr);
1749
1750 ut_a(level <= 9);
1751 ++ptr;
1752 } else {
1753 level = page_zip_level;
1754 }
1755
1756 if (block != NULL) {
1757 btr_page_reorganize_block(true, level, block, index, mtr);
1758 }
1759
1760 return(ptr);
1761 }
1762
1763 /** Empty an index page (possibly the root page). @see btr_page_create().
1764 @param[in,out] block page to be emptied
1765 @param[in,out] page_zip compressed page frame, or NULL
1766 @param[in] index index of the page
1767 @param[in] level B-tree level of the page (0=leaf)
1768 @param[in,out] mtr mini-transaction */
1769 void
btr_page_empty(buf_block_t * block,page_zip_des_t * page_zip,dict_index_t * index,ulint level,mtr_t * mtr)1770 btr_page_empty(
1771 buf_block_t* block,
1772 page_zip_des_t* page_zip,
1773 dict_index_t* index,
1774 ulint level,
1775 mtr_t* mtr)
1776 {
1777 page_t* page = buf_block_get_frame(block);
1778
1779 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1780 ut_ad(page_zip == buf_block_get_page_zip(block));
1781 ut_ad(!index->is_dummy);
1782 ut_ad(index->table->space->id == block->page.id.space());
1783 #ifdef UNIV_ZIP_DEBUG
1784 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
1785 #endif /* UNIV_ZIP_DEBUG */
1786
1787 btr_search_drop_page_hash_index(block);
1788
1789 /* Recreate the page: note that global data on page (possible
1790 segment headers, next page-field, etc.) is preserved intact */
1791
1792 /* Preserve PAGE_ROOT_AUTO_INC when creating a clustered index
1793 root page. */
1794 const ib_uint64_t autoinc
1795 = dict_index_is_clust(index)
1796 && index->page == block->page.id.page_no()
1797 ? page_get_autoinc(page)
1798 : 0;
1799
1800 if (page_zip) {
1801 page_create_zip(block, index, level, autoinc, mtr);
1802 } else {
1803 page_create(block, mtr, dict_table_is_comp(index->table),
1804 dict_index_is_spatial(index));
1805 btr_page_set_level(page, NULL, level, mtr);
1806 if (autoinc) {
1807 mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID + page,
1808 autoinc, mtr);
1809 }
1810 }
1811 }
1812
1813 /** Write instant ALTER TABLE metadata to a root page.
1814 @param[in,out] root clustered index root page
1815 @param[in] index clustered index with instant ALTER TABLE
1816 @param[in,out] mtr mini-transaction */
void btr_set_instant(buf_block_t* root, const dict_index_t& index, mtr_t* mtr)
{
	ut_ad(index.n_core_fields > 0);
	ut_ad(index.n_core_fields < REC_MAX_N_FIELDS);
	ut_ad(index.is_instant());
	ut_ad(fil_page_get_type(root->frame) == FIL_PAGE_TYPE_INSTANT
	      || fil_page_get_type(root->frame) == FIL_PAGE_INDEX);
	ut_ad(!page_has_siblings(root->frame));
	ut_ad(root->page.id.page_no() == index.page);

	rec_t*	infimum = page_get_infimum_rec(root->frame);
	rec_t*	supremum = page_get_supremum_rec(root->frame);
	byte*	page_type = root->frame + FIL_PAGE_TYPE;
	/* Current PAGE_INSTANT header field; its existing low bits are
	preserved when n_core_fields is stored below. */
	uint16_t i = page_header_get_field(root->frame, PAGE_INSTANT);

	switch (mach_read_from_2(page_type)) {
	case FIL_PAGE_TYPE_INSTANT:
		ut_ad(page_get_instant(root->frame) == index.n_core_fields);
		if (memcmp(infimum, "infimum", 8)
		    || memcmp(supremum, "supremum", 8)) {
			/* The infimum/supremum have already been
			repurposed (zeroed and n_core_null_bytes stored);
			nothing more to write. */
			ut_ad(index.table->instant);
			ut_ad(!memcmp(infimum, field_ref_zero, 8));
			ut_ad(!memcmp(supremum, field_ref_zero, 7));
			/* The n_core_null_bytes only matters for
			ROW_FORMAT=COMPACT and ROW_FORMAT=DYNAMIC tables. */
			ut_ad(supremum[7] == index.n_core_null_bytes
			      || !index.table->not_redundant());
			return;
		}
		break;
	default:
		ut_ad(!"wrong page type");
		/* fall through */
	case FIL_PAGE_INDEX:
		ut_ad(!page_is_comp(root->frame)
		      || !page_get_instant(root->frame));
		ut_ad(!memcmp(infimum, "infimum", 8));
		ut_ad(!memcmp(supremum, "supremum", 8));
		/* Convert the page type and store n_core_fields in the
		upper bits of PAGE_INSTANT, keeping the existing low bits
		(at most PAGE_NO_DIRECTION). */
		mlog_write_ulint(page_type, FIL_PAGE_TYPE_INSTANT,
				 MLOG_2BYTES, mtr);
		ut_ad(i <= PAGE_NO_DIRECTION);
		i |= index.n_core_fields << 3;
		mlog_write_ulint(PAGE_HEADER + PAGE_INSTANT + root->frame, i,
				 MLOG_2BYTES, mtr);
		break;
	}

	if (index.table->instant) {
		/* Repurpose the infimum/supremum "pseudo-record" text:
		zero the infimum (8 bytes) and the first 7 supremum bytes,
		then record n_core_null_bytes in the last supremum byte. */
		mlog_memset(root, infimum - root->frame, 8, 0, mtr);
		mlog_memset(root, supremum - root->frame, 7, 0, mtr);
		mlog_write_ulint(&supremum[7], index.n_core_null_bytes,
				 MLOG_1BYTE, mtr);
	}
}
1871
/*************************************************************//**
Makes tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed,
we cannot reverse it: therefore enough free disk space must be
guaranteed to be available before this function is called.
@return inserted record */
rec_t*
btr_root_raise_and_insert(
/*======================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert: must be
				on the root page; when the function returns,
				the cursor is positioned on the predecessor
				of the inserted record */
	rec_offs**	offsets,/*!< out: offsets on inserted record */
	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index;
	page_t*		root;
	page_t*		new_page;
	ulint		new_page_no;
	rec_t*		rec;
	dtuple_t*	node_ptr;
	ulint		level;
	rec_t*		node_ptr_rec;
	page_cur_t*	page_cursor;
	page_zip_des_t*	root_page_zip;
	page_zip_des_t*	new_page_zip;
	buf_block_t*	root_block;
	buf_block_t*	new_block;

	root = btr_cur_get_page(cursor);
	root_block = btr_cur_get_block(cursor);
	root_page_zip = buf_block_get_page_zip(root_block);
	ut_ad(!page_is_empty(root));
	index = btr_cur_get_index(cursor);
	ut_ad(index->n_core_null_bytes <= UT_BITS_IN_BYTES(index->n_nullable));
#ifdef UNIV_ZIP_DEBUG
	ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
#endif /* UNIV_ZIP_DEBUG */
#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		ulint	space = index->table->space_id;

		/* The root page must carry valid file segment headers
		for both the leaf and non-leaf levels. */
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}

	ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					MTR_MEMO_X_LOCK
					| MTR_MEMO_SX_LOCK));
	ut_ad(mtr_memo_contains(mtr, root_block, MTR_MEMO_PAGE_X_FIX));

	/* Allocate a new page to the tree. Root splitting is done by first
	moving the root records to the new page, emptying the root, putting
	a node pointer to the new page, and then splitting the new page. */

	level = btr_page_get_level(root);

	new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);

	if (new_block == NULL && os_has_said_disk_full) {
		/* Allocation failed due to a full disk: the caller
		must handle the NULL return. */
		return(NULL);
	}

	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
	/* The new page must have the same (un)compressed format and
	compressed page size as the root. */
	ut_a(!new_page_zip == !root_page_zip);
	ut_a(!new_page_zip
	     || page_zip_get_size(new_page_zip)
	     == page_zip_get_size(root_page_zip));

	btr_page_create(new_block, new_page_zip, index, level, mtr);

	/* Set the next node and previous node fields of new page */
	compile_time_assert(FIL_PAGE_NEXT == FIL_PAGE_PREV + 4);
	compile_time_assert(FIL_NULL == 0xffffffff);
#if MYSQL_VERSION_ID < 100500
	if (UNIV_LIKELY_NULL(new_page_zip)) {
		/* Avoid tripping the ut_a() in mlog_parse_nbytes()
		when crash-downgrading to an earlier MariaDB 10.4 version. */
		btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
		btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
	} else {
		mlog_memset(new_block, FIL_PAGE_PREV, 8, 0xff, mtr);
	}
#else
	/* Write FIL_NULL (0xffffffff) to both FIL_PAGE_PREV and
	FIL_PAGE_NEXT in one 8-byte memset. */
	mlog_memset(new_block, FIL_PAGE_PREV, 8, 0xff, mtr);
	if (UNIV_LIKELY_NULL(new_page_zip)) {
		memset(new_page_zip->data + FIL_PAGE_PREV, 0xff, 8);
	}
#endif

	/* Copy the records from root to the new page one by one. */

	if (0
#ifdef UNIV_ZIP_COPY
	    || new_page_zip
#endif /* UNIV_ZIP_COPY */
	    || !page_copy_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root),
				       index, mtr)) {
		/* page_copy_rec_list_end() can fail on a compressed
		page; in that case fall back to a raw page copy. */
		ut_a(new_page_zip);

		/* Copy the page byte for byte. */
		page_zip_copy_recs(new_page_zip, new_page,
				   root_page_zip, root, index, mtr);

		/* Update the lock table and possible hash index. */
		lock_move_rec_list_end(new_block, root_block,
				       page_get_infimum_rec(root));

		/* Move any existing predicate locks */
		if (dict_index_is_spatial(index)) {
			lock_prdt_rec_move(new_block, root_block);
		} else {
			btr_search_move_or_delete_hash_entries(
				new_block, root_block);
		}
	}

	if (dict_index_is_sec_or_ibuf(index)) {
		/* In secondary indexes and the change buffer,
		PAGE_MAX_TRX_ID can be reset on the root page, because
		the field only matters on leaf pages, and the root no
		longer is a leaf page. (Older versions of InnoDB did
		set PAGE_MAX_TRX_ID on all secondary index pages.) */
		if (root_page_zip) {
			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + root;
			memset(p, 0, 8);
			page_zip_write_header(root_page_zip, p, 8, mtr);
		} else {
			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
				       + root, 0, mtr);
		}
	} else {
		/* PAGE_ROOT_AUTO_INC is only present in the clustered index
		root page; on other clustered index pages, we want to reserve
		the field PAGE_MAX_TRX_ID for future use. */
		if (new_page_zip) {
			byte* p = PAGE_HEADER + PAGE_MAX_TRX_ID + new_page;
			memset(p, 0, 8);
			page_zip_write_header(new_page_zip, p, 8, mtr);
		} else {
			mlog_write_ull(PAGE_HEADER + PAGE_MAX_TRX_ID
				       + new_page, 0, mtr);
		}
	}

	/* If this is a pessimistic insert which is actually done to
	perform a pessimistic update then we have stored the lock
	information of the record to be inserted on the infimum of the
	root page: we cannot discard the lock structs on the root page */

	if (!dict_table_is_locking_disabled(index->table)) {
		lock_update_root_raise(new_block, root_block);
	}

	/* Create a memory heap where the node pointer is stored */
	if (!*heap) {
		*heap = mem_heap_create(1000);
	}

	/* The first user record on the new page becomes the key of the
	node pointer that the emptied root will point at. */
	rec = page_rec_get_next(page_get_infimum_rec(new_page));
	new_page_no = new_block->page.id.page_no();

	/* Build the node pointer (= node key and page address) for the
	child */
	if (dict_index_is_spatial(index)) {
		rtr_mbr_t		new_mbr;

		rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
		node_ptr = rtr_index_build_node_ptr(
			index, &new_mbr, rec, new_page_no, *heap);
	} else {
		node_ptr = dict_index_build_node_ptr(
			index, rec, new_page_no, *heap, level);
	}
	/* The node pointer must be marked as the predefined minimum record,
	as there is no lower alphabetical limit to records in the leftmost
	node of a level: */
	dtuple_set_info_bits(node_ptr,
			     dtuple_get_info_bits(node_ptr)
			     | REC_INFO_MIN_REC_FLAG);

	/* Rebuild the root page to get free space */
	btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
	/* btr_page_empty() is supposed to zero-initialize the field. */
	ut_ad(!page_get_instant(root_block->frame));

	if (index->is_instant()) {
		/* Re-apply the instant ALTER TABLE metadata that
		btr_page_empty() wiped from the root page header. */
		ut_ad(!root_page_zip);
		btr_set_instant(root_block, *index, mtr);
	}

	ut_ad(!page_has_siblings(root));

	page_cursor = btr_cur_get_page_cur(cursor);

	/* Insert node pointer to the root */

	page_cur_set_before_first(root_block, page_cursor);

	node_ptr_rec = page_cur_tuple_insert(page_cursor, node_ptr,
					     index, offsets, heap, 0, mtr);

	/* The root page should only contain the node pointer
	to new_page at this point.  Thus, the data should fit. */
	ut_a(node_ptr_rec);

	/* We play safe and reset the free bits for the new page */

	if (!dict_index_is_clust(index)
	    && !index->table->is_temporary()) {
		ibuf_reset_free_bits(new_block);
	}

	if (tuple != NULL) {
		/* Reposition the cursor to the child node */
		page_cur_search(new_block, index, tuple, page_cursor);
	} else {
		/* Set cursor to first record on child node */
		page_cur_set_before_first(new_block, page_cursor);
	}

	/* Split the child and insert tuple */
	if (dict_index_is_spatial(index)) {
		/* Split rtree page and insert tuple */
		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	} else {
		return(btr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	}
}
2115
2116 /** Decide if the page should be split at the convergence point of inserts
2117 converging to the left.
2118 @param[in] cursor insert position
2119 @return the first record to be moved to the right half page
2120 @retval NULL if no split is recommended */
btr_page_get_split_rec_to_left(const btr_cur_t * cursor)2121 rec_t* btr_page_get_split_rec_to_left(const btr_cur_t* cursor)
2122 {
2123 rec_t* split_rec = btr_cur_get_rec(cursor);
2124 const page_t* page = page_align(split_rec);
2125
2126 if (page_header_get_ptr(page, PAGE_LAST_INSERT)
2127 != page_rec_get_next(split_rec)) {
2128 return NULL;
2129 }
2130
2131 /* The metadata record must be present in the leftmost leaf page
2132 of the clustered index, if and only if index->is_instant().
2133 However, during innobase_instant_try(), index->is_instant()
2134 would already hold when row_ins_clust_index_entry_low()
2135 is being invoked to insert the the metadata record.
2136 So, we can only assert that when the metadata record exists,
2137 index->is_instant() must hold. */
2138 ut_ad(!page_is_leaf(page) || page_has_prev(page)
2139 || cursor->index->is_instant()
2140 || !(rec_get_info_bits(page_rec_get_next_const(
2141 page_get_infimum_rec(page)),
2142 cursor->index->table->not_redundant())
2143 & REC_INFO_MIN_REC_FLAG));
2144
2145 const rec_t* infimum = page_get_infimum_rec(page);
2146
2147 /* If the convergence is in the middle of a page, include also
2148 the record immediately before the new insert to the upper
2149 page. Otherwise, we could repeatedly move from page to page
2150 lots of records smaller than the convergence point. */
2151
2152 if (split_rec == infimum
2153 || split_rec == page_rec_get_next_const(infimum)) {
2154 split_rec = page_rec_get_next(split_rec);
2155 }
2156
2157 return split_rec;
2158 }
2159
2160 /** Decide if the page should be split at the convergence point of inserts
2161 converging to the right.
2162 @param[in] cursor insert position
2163 @param[out] split_rec if split recommended, the first record
2164 on the right half page, or
2165 NULL if the to-be-inserted record
2166 should be first
2167 @return whether split is recommended */
2168 bool
btr_page_get_split_rec_to_right(const btr_cur_t * cursor,rec_t ** split_rec)2169 btr_page_get_split_rec_to_right(const btr_cur_t* cursor, rec_t** split_rec)
2170 {
2171 rec_t* insert_point = btr_cur_get_rec(cursor);
2172 const page_t* page = page_align(insert_point);
2173
2174 /* We use eager heuristics: if the new insert would be right after
2175 the previous insert on the same page, we assume that there is a
2176 pattern of sequential inserts here. */
2177
2178 if (page_header_get_ptr(page, PAGE_LAST_INSERT) != insert_point) {
2179 return false;
2180 }
2181
2182 insert_point = page_rec_get_next(insert_point);
2183
2184 if (page_rec_is_supremum(insert_point)) {
2185 insert_point = NULL;
2186 } else {
2187 insert_point = page_rec_get_next(insert_point);
2188 if (page_rec_is_supremum(insert_point)) {
2189 insert_point = NULL;
2190 }
2191
2192 /* If there are >= 2 user records up from the insert
2193 point, split all but 1 off. We want to keep one because
2194 then sequential inserts can use the adaptive hash
2195 index, as they can do the necessary checks of the right
2196 search position just by looking at the records on this
2197 page. */
2198 }
2199
2200 *split_rec = insert_point;
2201 return true;
2202 }
2203
/*************************************************************//**
Calculates a split record such that the tuple will certainly fit on
its half-page when the split is performed. We assume in this function
only that the cursor page has at least one user record.
@return split record, or NULL if tuple will be the first record on
the lower or upper half-page (determined by btr_page_tuple_smaller()) */
static
rec_t*
btr_page_get_split_rec(
/*===================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert should be made */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext)	/*!< in: number of externally stored columns */
{
	page_t*		page;
	page_zip_des_t*	page_zip;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	ulint		total_space;
	ulint		incl_data;
	rec_t*		ins_rec;
	rec_t*		rec;
	rec_t*		next_rec;
	ulint		n;
	mem_heap_t*	heap;
	rec_offs*	offsets;

	page = btr_cur_get_page(cursor);

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	page_zip = btr_cur_get_page_zip(cursor);
	if (page_zip) {
		/* Estimate the free space of an empty compressed page. */
		ulint	free_space_zip = page_zip_empty_size(
			cursor->index->n_fields,
			page_zip_get_size(page_zip));

		/* Use the smaller of the two limits, so that the
		records chosen for the left half are sure to fit on a
		fresh compressed page as well. */
		if (free_space > (ulint) free_space_zip) {
			free_space = (ulint) free_space_zip;
		}
	}

	/* free_space is now the free space of a created new page */

	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = ulint(page_get_n_recs(page)) + 1;
	ut_ad(total_n_recs >= 2);
	total_space  = total_data + page_dir_calc_reserved_space(total_n_recs);

	n = 0;
	incl_data = 0;
	ins_rec = btr_cur_get_rec(cursor);
	rec = page_get_infimum_rec(page);

	heap = NULL;
	offsets = NULL;

	/* We start to include records to the left half, and when the
	space reserved by them exceeds half of total_space, then if
	the included records fit on the left page, they will be put there
	if something was left over also for the right page,
	otherwise the last included record will be the first on the right
	half page */

	/* In the loop below, rec == NULL is a sentinel meaning that
	the to-be-inserted tuple (not an existing record) has just
	been included; the walk resumes from the record after
	ins_rec. */
	do {
		/* Decide the next record to include */
		if (rec == ins_rec) {
			rec = NULL;	/* NULL denotes that tuple is
					now included */
		} else if (rec == NULL) {
			rec = page_rec_get_next(ins_rec);
		} else {
			rec = page_rec_get_next(rec);
		}

		if (rec == NULL) {
			/* Include tuple */
			incl_data += insert_size;
		} else {
			offsets = rec_get_offsets(rec, cursor->index, offsets,
						  page_is_leaf(page)
						  ? cursor->index->n_core_fields
						  : 0,
						  ULINT_UNDEFINED, &heap);
			incl_data += rec_offs_size(offsets);
		}

		n++;
	} while (incl_data + page_dir_calc_reserved_space(n)
		 < total_space / 2);

	if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
		/* The next record will be the first on
		the right half page if it is not the
		supremum record of page */

		if (rec == ins_rec) {
			/* rec = NULL (returned below) means the
			tuple itself becomes the first record on the
			upper half-page. */
			rec = NULL;

			goto func_exit;
		} else if (rec == NULL) {
			next_rec = page_rec_get_next(ins_rec);
		} else {
			next_rec = page_rec_get_next(rec);
		}
		ut_ad(next_rec);
		if (!page_rec_is_supremum(next_rec)) {
			rec = next_rec;
		}
	}

func_exit:
	if (heap) {
		mem_heap_free(heap);
	}
	return(rec);
}
2325
/*************************************************************//**
Returns TRUE if the insert fits on the appropriate half-page with the
chosen split_rec.
@return true if fits */
static MY_ATTRIBUTE((nonnull(1,3,4,6), warn_unused_result))
bool
btr_page_insert_fits(
/*=================*/
	btr_cur_t*	cursor,	/*!< in: cursor at which insert
				should be made */
	const rec_t*	split_rec,/*!< in: suggestion for first record
				on upper half-page, or NULL if
				tuple to be inserted should be first */
	rec_offs**	offsets,/*!< in: rec_get_offsets(
				split_rec, cursor->index); out: garbage */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mem_heap_t**	heap)	/*!< in: temporary memory heap */
{
	page_t*		page;
	ulint		insert_size;
	ulint		free_space;
	ulint		total_data;
	ulint		total_n_recs;
	const rec_t*	rec;
	const rec_t*	end_rec;

	page = btr_cur_get_page(cursor);

	ut_ad(!split_rec
	      || !page_is_comp(page) == !rec_offs_comp(*offsets));
	ut_ad(!split_rec
	      || rec_offs_validate(split_rec, cursor->index, *offsets));

	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	free_space  = page_get_free_space_of_empty(page_is_comp(page));

	/* free_space is now the free space of a created new page */

	/* Start from the pessimistic assumption that all current data
	plus the new tuple stays on the tuple's half-page, then
	subtract the records that move to the other half. */
	total_data   = page_get_data_size(page) + insert_size;
	total_n_recs = ulint(page_get_n_recs(page)) + 1;

	/* We determine which records (from rec to end_rec, not including
	end_rec) will end up on the other half page from tuple when it is
	inserted. */

	if (split_rec == NULL) {
		/* The tuple will be the first record on the upper
		half-page: everything up to the insert position moves
		to the other half. */
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

	} else if (cmp_dtuple_rec(tuple, split_rec, *offsets) >= 0) {
		/* The tuple belongs to the upper half-page: the lower
		half (infimum..split_rec) is the "other" half. */
		rec = page_rec_get_next(page_get_infimum_rec(page));
		end_rec = split_rec;
	} else {
		/* The tuple belongs to the lower half-page: the upper
		half (split_rec..supremum) is the "other" half. */
		rec = split_rec;
		end_rec = page_get_supremum_rec(page);
	}

	if (total_data + page_dir_calc_reserved_space(total_n_recs)
	    <= free_space) {

		/* Ok, there will be enough available space on the
		half page where the tuple is inserted */

		return(true);
	}

	while (rec != end_rec) {
		/* In this loop we calculate the amount of reserved
		space after rec is removed from page. */

		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
					   page_is_leaf(page)
					   ? cursor->index->n_core_fields
					   : 0,
					   ULINT_UNDEFINED, heap);

		total_data -= rec_offs_size(*offsets);
		total_n_recs--;

		if (total_data + page_dir_calc_reserved_space(total_n_recs)
		    <= free_space) {

			/* Ok, there will be enough available space on the
			half page where the tuple is inserted */

			return(true);
		}

		rec = page_rec_get_next_const(rec);
	}

	return(false);
}
2421
2422 /*******************************************************//**
2423 Inserts a data tuple to a tree on a non-leaf level. It is assumed
2424 that mtr holds an x-latch on the tree. */
2425 void
btr_insert_on_non_leaf_level_func(ulint flags,dict_index_t * index,ulint level,dtuple_t * tuple,const char * file,unsigned line,mtr_t * mtr)2426 btr_insert_on_non_leaf_level_func(
2427 /*==============================*/
2428 ulint flags, /*!< in: undo logging and locking flags */
2429 dict_index_t* index, /*!< in: index */
2430 ulint level, /*!< in: level, must be > 0 */
2431 dtuple_t* tuple, /*!< in: the record to be inserted */
2432 const char* file, /*!< in: file name */
2433 unsigned line, /*!< in: line where called */
2434 mtr_t* mtr) /*!< in: mtr */
2435 {
2436 big_rec_t* dummy_big_rec;
2437 btr_cur_t cursor;
2438 dberr_t err;
2439 rec_t* rec;
2440 mem_heap_t* heap = NULL;
2441 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
2442 rec_offs* offsets = offsets_;
2443 rec_offs_init(offsets_);
2444 rtr_info_t rtr_info;
2445
2446 ut_ad(level > 0);
2447
2448 if (!dict_index_is_spatial(index)) {
2449 dberr_t err = btr_cur_search_to_nth_level(
2450 index, level, tuple, PAGE_CUR_LE,
2451 BTR_CONT_MODIFY_TREE,
2452 &cursor, 0, file, line, mtr);
2453
2454 if (err != DB_SUCCESS) {
2455 ib::warn() << " Error code: " << err
2456 << " btr_page_get_father_node_ptr_func "
2457 << " level: " << level
2458 << " called from file: "
2459 << file << " line: " << line
2460 << " table: " << index->table->name
2461 << " index: " << index->name;
2462 }
2463 } else {
2464 /* For spatial index, initialize structures to track
2465 its parents etc. */
2466 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
2467
2468 rtr_info_update_btr(&cursor, &rtr_info);
2469
2470 btr_cur_search_to_nth_level(index, level, tuple,
2471 PAGE_CUR_RTREE_INSERT,
2472 BTR_CONT_MODIFY_TREE,
2473 &cursor, 0, file, line, mtr);
2474 }
2475
2476 ut_ad(cursor.flag == BTR_CUR_BINARY);
2477
2478 err = btr_cur_optimistic_insert(
2479 flags
2480 | BTR_NO_LOCKING_FLAG
2481 | BTR_KEEP_SYS_FLAG
2482 | BTR_NO_UNDO_LOG_FLAG,
2483 &cursor, &offsets, &heap,
2484 tuple, &rec, &dummy_big_rec, 0, NULL, mtr);
2485
2486 if (err == DB_FAIL) {
2487 err = btr_cur_pessimistic_insert(flags
2488 | BTR_NO_LOCKING_FLAG
2489 | BTR_KEEP_SYS_FLAG
2490 | BTR_NO_UNDO_LOG_FLAG,
2491 &cursor, &offsets, &heap,
2492 tuple, &rec,
2493 &dummy_big_rec, 0, NULL, mtr);
2494 ut_a(err == DB_SUCCESS);
2495 }
2496
2497 if (heap != NULL) {
2498 mem_heap_free(heap);
2499 }
2500
2501 if (dict_index_is_spatial(index)) {
2502 ut_ad(cursor.rtr_info);
2503
2504 rtr_clean_rtr_info(&rtr_info, true);
2505 }
2506 }
2507
/**************************************************************//**
Attaches the halves of an index page on the appropriate level in an
index tree. */
static MY_ATTRIBUTE((nonnull))
void
btr_attach_half_pages(
/*==================*/
	ulint		flags,		/*!< in: undo logging and
					locking flags */
	dict_index_t*	index,		/*!< in: the index tree */
	buf_block_t*	block,		/*!< in/out: page to be split */
	const rec_t*	split_rec,	/*!< in: first record on upper
					half page */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
{
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		level;
	page_t*		page		= buf_block_get_frame(block);
	page_t*		lower_page;
	page_t*		upper_page;
	ulint		lower_page_no;
	ulint		upper_page_no;
	page_zip_des_t*	lower_page_zip;
	page_zip_des_t*	upper_page_zip;
	dtuple_t*	node_ptr_upper;
	mem_heap_t*	heap;
	buf_block_t*	prev_block = NULL;
	buf_block_t*	next_block = NULL;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(mtr_memo_contains(mtr, new_block, MTR_MEMO_PAGE_X_FIX));

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	/* Based on split direction, decide upper and lower pages */
	if (direction == FSP_DOWN) {

		btr_cur_t	cursor;
		rec_offs*	offsets;

		/* FSP_DOWN: the new page becomes the lower half and
		the original page keeps the upper half. */
		lower_page = buf_block_get_frame(new_block);
		lower_page_no = new_block->page.id.page_no();
		lower_page_zip = buf_block_get_page_zip(new_block);
		upper_page = buf_block_get_frame(block);
		upper_page_no = block->page.id.page_no();
		upper_page_zip = buf_block_get_page_zip(block);

		/* Look up the index for the node pointer to page */
		offsets = btr_page_get_father_block(NULL, heap, index,
						    block, mtr, &cursor);

		/* Replace the address of the old child node (= page) with the
		address of the new lower half */

		btr_node_ptr_set_child_page_no(
			btr_cur_get_rec(&cursor),
			btr_cur_get_page_zip(&cursor),
			offsets, lower_page_no, mtr);
		mem_heap_empty(heap);
	} else {
		/* FSP_UP (or no direction): the original page stays
		the lower half and the new page becomes the upper
		half; the parent node pointer still points to the
		correct (lower) page and needs no update here. */
		lower_page = buf_block_get_frame(block);
		lower_page_no = block->page.id.page_no();
		lower_page_zip = buf_block_get_page_zip(block);
		upper_page = buf_block_get_frame(new_block);
		upper_page_no = new_block->page.id.page_no();
		upper_page_zip = buf_block_get_page_zip(new_block);
	}

	/* Get the previous and next pages of page */
	prev_page_no = btr_page_get_prev(page);
	next_page_no = btr_page_get_next(page);

	const ulint	space = block->page.id.space();

	/* for consistency, both blocks should be locked, before change */
	if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
		prev_block = btr_block_get(
			page_id_t(space, prev_page_no), block->zip_size(),
			RW_X_LATCH, index, mtr);
	}
	if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
		next_block = btr_block_get(
			page_id_t(space, next_page_no), block->zip_size(),
			RW_X_LATCH, index, mtr);
	}

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block));
	ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block)));

	/* Build the node pointer (= node key and page address) for the upper
	half */

	node_ptr_upper = dict_index_build_node_ptr(index, split_rec,
						   upper_page_no, heap, level);

	/* Insert it next to the pointer to the lower half. Note that this
	may generate recursion leading to a split on the higher level. */

	btr_insert_on_non_leaf_level(flags, index, level + 1,
				     node_ptr_upper, mtr);

	/* Free the memory heap */
	mem_heap_free(heap);

	/* Update page links of the level */

	if (prev_block) {
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  lower_page_no, mtr);
	}

	if (next_block) {
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  upper_page_no, mtr);
	}

	if (direction == FSP_DOWN) {
		/* lower_page is new */
		btr_page_set_prev(lower_page, lower_page_zip,
				  prev_page_no, mtr);
	} else {
		ut_ad(btr_page_get_prev(lower_page) == prev_page_no);
	}

	/* Link the two halves to each other. */
	btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
	btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);

	if (direction != FSP_DOWN) {
		/* upper_page is new */
		btr_page_set_next(upper_page, upper_page_zip,
				  next_page_no, mtr);
	} else {
		ut_ad(btr_page_get_next(upper_page) == next_page_no);
	}
}
2662
2663 /*************************************************************//**
2664 Determine if a tuple is smaller than any record on the page.
2665 @return TRUE if smaller */
2666 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2667 bool
btr_page_tuple_smaller(btr_cur_t * cursor,const dtuple_t * tuple,rec_offs ** offsets,ulint n_uniq,mem_heap_t ** heap)2668 btr_page_tuple_smaller(
2669 /*===================*/
2670 btr_cur_t* cursor, /*!< in: b-tree cursor */
2671 const dtuple_t* tuple, /*!< in: tuple to consider */
2672 rec_offs** offsets,/*!< in/out: temporary storage */
2673 ulint n_uniq, /*!< in: number of unique fields
2674 in the index page records */
2675 mem_heap_t** heap) /*!< in/out: heap for offsets */
2676 {
2677 buf_block_t* block;
2678 const rec_t* first_rec;
2679 page_cur_t pcur;
2680
2681 /* Read the first user record in the page. */
2682 block = btr_cur_get_block(cursor);
2683 page_cur_set_before_first(block, &pcur);
2684 page_cur_move_to_next(&pcur);
2685 first_rec = page_cur_get_rec(&pcur);
2686
2687 *offsets = rec_get_offsets(
2688 first_rec, cursor->index, *offsets,
2689 page_is_leaf(block->frame) ? cursor->index->n_core_fields : 0,
2690 n_uniq, heap);
2691
2692 return(cmp_dtuple_rec(tuple, first_rec, *offsets) < 0);
2693 }
2694
/** Insert the tuple into the right sibling page, if the cursor is at the end
of a page.

This is an optimization for rightmost-page inserts: instead of splitting
the full page, try to place the record as the first record of the already
existing right sibling. The sibling's node pointer in the parent must then
be replaced, because its first record (the node pointer key) changes.

@param[in]	flags	undo logging and locking flags
@param[in,out]	cursor	cursor at which to insert; when the function succeeds,
the cursor is positioned before the insert point.
@param[out]	offsets	offsets on inserted record
@param[in,out]	heap	memory heap for allocating offsets
@param[in]	tuple	tuple to insert
@param[in]	n_ext	number of externally stored columns
@param[in,out]	mtr	mini-transaction
@return inserted record (first record on the right sibling page);
the cursor will be positioned on the page infimum
@retval NULL if the operation was not performed */
static
rec_t*
btr_insert_into_right_sibling(
	ulint		flags,
	btr_cur_t*	cursor,
	rec_offs**	offsets,
	mem_heap_t*	heap,
	const dtuple_t*	tuple,
	ulint		n_ext,
	mtr_t*		mtr)
{
	buf_block_t*	block = btr_cur_get_block(cursor);
	page_t*		page = buf_block_get_frame(block);
	const uint32_t	next_page_no = btr_page_get_next(page);

	ut_ad(mtr_memo_contains_flagged(
		mtr, dict_index_get_lock(cursor->index),
		MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(heap);

	/* Only applicable when a right sibling exists and the cursor is
	positioned on the last user record of this page. */
	if (next_page_no == FIL_NULL || !page_rec_is_supremum(
			page_rec_get_next(btr_cur_get_rec(cursor)))) {

		return(NULL);
	}

	page_cur_t	next_page_cursor;
	buf_block_t*	next_block;
	page_t*		next_page;
	btr_cur_t	next_father_cursor;
	rec_t*		rec = NULL;
	ulint		max_size;

	const ulint	space = block->page.id.space();

	/* X-latch the right sibling and locate its node pointer in the
	parent page before attempting the insert. */
	next_block = btr_block_get(
		page_id_t(space, next_page_no), block->zip_size(),
		RW_X_LATCH, cursor->index, mtr);
	next_page = buf_block_get_frame(next_block);

	bool	is_leaf = page_is_leaf(next_page);

	btr_page_get_father(
		cursor->index, next_block, mtr, &next_father_cursor);

	page_cur_search(
		next_block, cursor->index, tuple, PAGE_CUR_LE,
		&next_page_cursor);

	/* Remember the free space before the insert, for the ibuf
	bitmap update at the end. */
	max_size = page_get_max_insert_size_after_reorganize(next_page, 1);

	/* Extends gap lock for the next page */
	if (!dict_table_is_locking_disabled(cursor->index->table)) {
		lock_update_split_left(next_block, block);
	}

	rec = page_cur_tuple_insert(
		&next_page_cursor, tuple, cursor->index, offsets, &heap,
		n_ext, mtr);

	if (rec == NULL) {
		if (is_leaf
		    && next_block->page.zip.ssize
		    && !dict_index_is_clust(cursor->index)
		    && !cursor->index->table->is_temporary()) {
			/* Reset the IBUF_BITMAP_FREE bits, because
			page_cur_tuple_insert() will have attempted page
			reorganize before failing. */
			ibuf_reset_free_bits(next_block);
		}
		return(NULL);
	}

	ibool	compressed;
	dberr_t	err;
	ulint	level = btr_page_get_level(next_page);

	/* adjust cursor position */
	*btr_cur_get_page_cur(cursor) = next_page_cursor;

	ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
	ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);

	/* We have to change the parent node pointer: the inserted record
	became the first record of next_page, so the old node pointer no
	longer describes the page. Delete it and insert a fresh one. */

	compressed = btr_cur_pessimistic_delete(
		&err, TRUE, &next_father_cursor,
		BTR_CREATE_FLAG, false, mtr);

	ut_a(err == DB_SUCCESS);

	if (!compressed) {
		btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
	}

	dtuple_t*	node_ptr = dict_index_build_node_ptr(
		cursor->index, rec, next_block->page.id.page_no(),
		heap, level);

	btr_insert_on_non_leaf_level(
		flags, cursor->index, level + 1, node_ptr, mtr);

	ut_ad(rec_offs_validate(rec, cursor->index, *offsets));

	if (is_leaf
	    && !dict_index_is_clust(cursor->index)
	    && !cursor->index->table->is_temporary()) {
		/* Update the free bits of the B-tree page in the
		insert buffer bitmap. */

		if (next_block->page.zip.ssize) {
			ibuf_update_free_bits_zip(next_block, mtr);
		} else {
			ibuf_update_free_bits_if_full(
				next_block, max_size,
				rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
		}
	}

	return(rec);
}
2830
/*************************************************************//**
Splits an index page to halves and inserts the tuple. It is assumed
that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
released within this function! NOTE that the operation of this
function must always succeed, we cannot reverse it: therefore enough
free disk space (2 pages) must be guaranteed to be available before
this function is called.
NOTE: jonaso added support for calling function with tuple == NULL
which cause it to only split a page.

@return inserted record or NULL if run out of space */
rec_t*
btr_page_split_and_insert(
/*======================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	rec_offs**	offsets,/*!< out: offsets on inserted record */
	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	block;
	page_t*		page;
	page_zip_des_t*	page_zip;
	buf_block_t*	new_block;
	page_t*		new_page;
	page_zip_des_t*	new_page_zip;
	rec_t*		split_rec;
	buf_block_t*	left_block;
	buf_block_t*	right_block;
	page_cur_t*	page_cursor;
	rec_t*		first_rec;
	byte*		buf = 0; /* remove warning */
	rec_t*		move_limit;
	ulint		n_iterations = 0;
	ulint		n_uniq;

	if (cursor->index->is_spatial()) {
		/* Split rtree page and update parent */
		return(rtr_page_split_and_insert(flags, cursor, offsets, heap,
						 tuple, n_ext, mtr));
	}

	if (!*heap) {
		*heap = mem_heap_create(1024);
	}
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
/* We loop back here (at most once per compressed page, see the
assertion near insert_failed) if the record does not fit on either
half after the split. */
func_start:
	mem_heap_empty(*heap);
	*offsets = NULL;

	ut_ad(mtr_memo_contains_flagged(mtr,
					dict_index_get_lock(cursor->index),
					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
	ut_ad(!dict_index_is_online_ddl(cursor->index)
	      || (flags & BTR_CREATE_FLAG)
	      || dict_index_is_clust(cursor->index));
	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(!page_is_empty(page));

	/* try to insert to the next page if possible before split */
	if (rec_t* rec = btr_insert_into_right_sibling(
		    flags, cursor, offsets, *heap, tuple, n_ext, mtr)) {
		return(rec);
	}

	/* 1. Decide the split record; split_rec == NULL means that the
	tuple to be inserted should be the first record on the upper
	half-page */
	bool insert_left = false;
	ulint hint_page_no = block->page.id.page_no() + 1;
	byte direction = FSP_UP;

	if (tuple && n_iterations > 0) {
		/* A previous split did not make the record fit; split at
		the insert point to give the record an empty half. */
		split_rec = btr_page_get_split_rec(cursor, tuple, n_ext);

		if (split_rec == NULL) {
			insert_left = btr_page_tuple_smaller(
				cursor, tuple, offsets, n_uniq, heap);
		}
	} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
	} else if ((split_rec = btr_page_get_split_rec_to_left(cursor))) {
		/* Sequential descending inserts: allocate the new page
		to the left (hint: one before the current page). */
		direction = FSP_DOWN;
		hint_page_no -= 2;
	} else {
		/* If there is only one record in the index page, we
		can't split the node in the middle by default. We need
		to determine whether the new record will be inserted
		to the left or right. */

		if (page_get_n_recs(page) > 1) {
			split_rec = page_get_middle_rec(page);
		} else if (btr_page_tuple_smaller(cursor, tuple,
						  offsets, n_uniq, heap)) {
			split_rec = page_rec_get_next(
				page_get_infimum_rec(page));
		} else {
			split_rec = NULL;
		}
	}

	DBUG_EXECUTE_IF("disk_is_full",
			os_has_said_disk_full = true;
			return(NULL););

	/* 2. Allocate a new page to the index */
	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
				   btr_page_get_level(page), mtr, mtr);

	if (!new_block) {
		return(NULL);
	}

	new_page = buf_block_get_frame(new_block);
	new_page_zip = buf_block_get_page_zip(new_block);
	btr_page_create(new_block, new_page_zip, cursor->index,
			btr_page_get_level(page), mtr);
	/* Only record the leaf level page splits. */
	if (page_is_leaf(page)) {
		cursor->index->stat_defrag_n_page_split ++;
		cursor->index->stat_defrag_modified_counter ++;
		btr_defragment_save_defrag_stats_if_needed(cursor->index);
	}

	/* 3. Calculate the first record on the upper half-page, and the
	first record (move_limit) on original page which ends up on the
	upper half */

	if (split_rec) {
		first_rec = move_limit = split_rec;

		*offsets = rec_get_offsets(split_rec, cursor->index, *offsets,
					   page_is_leaf(page)
					   ? cursor->index->n_core_fields : 0,
					   n_uniq, heap);

		insert_left = !tuple
			|| cmp_dtuple_rec(tuple, split_rec, *offsets) < 0;

		if (!insert_left && new_page_zip && n_iterations > 0) {
			/* If a compressed page has already been split,
			avoid further splits by inserting the record
			to an empty page. */
			split_rec = NULL;
			goto insert_empty;
		}
	} else if (insert_left) {
		ut_a(n_iterations > 0);
		first_rec = page_rec_get_next(page_get_infimum_rec(page));
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	} else {
insert_empty:
		ut_ad(!split_rec);
		ut_ad(!insert_left);
		/* The tuple itself will be the first record of the upper
		half; convert it to a physical record so that the node
		pointer for the new page can be built from it. */
		buf = UT_NEW_ARRAY_NOKEY(
			byte,
			rec_get_converted_size(cursor->index, tuple, n_ext));

		first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
						      tuple, n_ext);
		move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
	}

	/* 4. Do first the modifications in the tree structure */

	btr_attach_half_pages(flags, cursor->index, block,
			      first_rec, new_block, direction, mtr);

	/* If the split is made on the leaf level and the insert will fit
	on the appropriate half-page, we may release the tree x-latch.
	We can then move the records after releasing the tree latch,
	thus reducing the tree latch contention. */
	bool insert_will_fit;
	if (tuple == NULL) {
		insert_will_fit = true;
	} else if (split_rec) {
		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, split_rec,
						offsets, tuple, n_ext, heap);
	} else {
		if (!insert_left) {
			UT_DELETE_ARRAY(buf);
			buf = NULL;
		}

		insert_will_fit = !new_page_zip
			&& btr_page_insert_fits(cursor, NULL,
						offsets, tuple, n_ext, heap);
	}

	if (!srv_read_only_mode
	    && insert_will_fit
	    && page_is_leaf(page)
	    && !dict_index_is_online_ddl(cursor->index)) {

		mtr->memo_release(
			dict_index_get_lock(cursor->index),
			MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);

		/* NOTE: We cannot release root block latch here, because it
		has segment header and already modified in most of cases.*/
	}

	/* 5. Move then the records to the new page */
	if (direction == FSP_DOWN) {
		/* fputs("Split left\n", stderr); */

		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || !page_move_rec_list_start(new_block, block, move_limit,
						 cursor->index, mtr)) {
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
			page_delete_rec_list_end(move_limit - page + new_page,
						 new_block, cursor->index,
						 ULINT_UNDEFINED,
						 ULINT_UNDEFINED, mtr);

			/* Update the lock table and possible hash index. */
			lock_move_rec_list_start(
				new_block, block, move_limit,
				new_page + PAGE_NEW_INFIMUM);

			btr_search_move_or_delete_hash_entries(
				new_block, block);

			/* Delete the records from the source page. */

			page_delete_rec_list_start(move_limit, block,
						   cursor->index, mtr);
		}

		left_block = new_block;
		right_block = block;

		if (!dict_table_is_locking_disabled(cursor->index->table)) {
			lock_update_split_left(right_block, left_block);
		}
	} else {
		/* fputs("Split right\n", stderr); */

		if (0
#ifdef UNIV_ZIP_COPY
		    || page_zip
#endif /* UNIV_ZIP_COPY */
		    || !page_move_rec_list_end(new_block, block, move_limit,
					       cursor->index, mtr)) {
			/* For some reason, compressing new_page failed,
			even though it should contain fewer records than
			the original page.  Copy the page byte for byte
			and then delete the records from both pages
			as appropriate.  Deleting will always succeed. */
			ut_a(new_page_zip);

			page_zip_copy_recs(new_page_zip, new_page,
					   page_zip, page, cursor->index, mtr);
			page_delete_rec_list_start(move_limit - page
						   + new_page, new_block,
						   cursor->index, mtr);

			/* Update the lock table and possible hash index. */
			lock_move_rec_list_end(new_block, block, move_limit);

			btr_search_move_or_delete_hash_entries(
				new_block, block);

			/* Delete the records from the source page. */

			page_delete_rec_list_end(move_limit, block,
						 cursor->index,
						 ULINT_UNDEFINED,
						 ULINT_UNDEFINED, mtr);
		}

		left_block = block;
		right_block = new_block;

		if (!dict_table_is_locking_disabled(cursor->index->table)) {
			lock_update_split_right(right_block, left_block);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	if (page_zip) {
		ut_a(page_zip_validate(page_zip, page, cursor->index));
		ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
	}
#endif /* UNIV_ZIP_DEBUG */

	/* At this point, split_rec, move_limit and first_rec may point
	to garbage on the old page. */

	/* 6. The split and the tree modification is now completed. Decide the
	page where the tuple should be inserted */
	rec_t* rec;
	buf_block_t* const insert_block = insert_left
		? left_block : right_block;

	if (UNIV_UNLIKELY(!tuple)) {
		/* tuple == NULL means the caller only wanted the split. */
		rec = NULL;
		goto func_exit;
	}

	/* 7. Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

	page_cur_search(insert_block, cursor->index, tuple, page_cursor);

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
				    offsets, heap, n_ext, mtr);

#ifdef UNIV_ZIP_DEBUG
	{
		page_t*		insert_page
			= buf_block_get_frame(insert_block);

		page_zip_des_t*	insert_page_zip
			= buf_block_get_page_zip(insert_block);

		ut_a(!insert_page_zip
		     || page_zip_validate(insert_page_zip, insert_page,
					  cursor->index));
	}
#endif /* UNIV_ZIP_DEBUG */

	if (rec != NULL) {

		goto func_exit;
	}

	/* 8. If insert did not fit, try page reorganization.
	For compressed pages, page_cur_tuple_insert() will have
	attempted this already. */

	if (page_cur_get_page_zip(page_cursor)
	    || !btr_page_reorganize(page_cursor, cursor->index, mtr)) {

		goto insert_failed;
	}

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
				    offsets, heap, n_ext, mtr);

	if (rec == NULL) {
		/* The insert did not fit on the page: loop back to the
		start of the function for a new split */
insert_failed:
		/* We play safe and reset the free bits for new_page */
		if (!dict_index_is_clust(cursor->index)
		    && !cursor->index->table->is_temporary()) {
			ibuf_reset_free_bits(new_block);
			ibuf_reset_free_bits(block);
		}

		n_iterations++;
		ut_ad(n_iterations < 2
		      || buf_block_get_page_zip(insert_block));
		ut_ad(!insert_will_fit);

		goto func_start;
	}

func_exit:
	/* Insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

	if (!dict_index_is_clust(cursor->index)
	    && !cursor->index->table->is_temporary()
	    && page_is_leaf(page)) {

		ibuf_update_free_bits_for_two_pages_low(
			left_block, right_block, mtr);
	}

	MONITOR_INC(MONITOR_INDEX_SPLIT);

	ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
	ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));

	ut_ad(tuple || !rec);
	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
	return(rec);
}
3233
3234 /** Remove a page from the level list of pages.
3235 @param[in] space space where removed
3236 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
3237 @param[in,out] page page to remove
3238 @param[in] index index tree
3239 @param[in,out] mtr mini-transaction */
3240 dberr_t
btr_level_list_remove_func(ulint space,ulint zip_size,page_t * page,dict_index_t * index,mtr_t * mtr)3241 btr_level_list_remove_func(
3242 ulint space,
3243 ulint zip_size,
3244 page_t* page,
3245 dict_index_t* index,
3246 mtr_t* mtr)
3247 {
3248 ut_ad(page != NULL);
3249 ut_ad(mtr != NULL);
3250 ut_ad(mtr_memo_contains_page(mtr, page, MTR_MEMO_PAGE_X_FIX));
3251 ut_ad(space == page_get_space_id(page));
3252 /* Get the previous and next page numbers of page */
3253
3254 const uint32_t prev_page_no = btr_page_get_prev(page);
3255 const uint32_t next_page_no = btr_page_get_next(page);
3256
3257 /* Update page links of the level */
3258
3259 if (prev_page_no != FIL_NULL) {
3260 buf_block_t* prev_block
3261 = btr_block_get(page_id_t(space, prev_page_no),
3262 zip_size, RW_X_LATCH, index, mtr);
3263
3264 page_t* prev_page
3265 = buf_block_get_frame(prev_block);
3266 #ifdef UNIV_BTR_DEBUG
3267 ut_a(page_is_comp(prev_page) == page_is_comp(page));
3268 ut_a(!memcmp(prev_page + FIL_PAGE_NEXT, page + FIL_PAGE_OFFSET,
3269 4));
3270 #endif /* UNIV_BTR_DEBUG */
3271
3272 btr_page_set_next(prev_page,
3273 buf_block_get_page_zip(prev_block),
3274 next_page_no, mtr);
3275 }
3276
3277 if (next_page_no != FIL_NULL) {
3278 buf_block_t* next_block
3279 = btr_block_get(
3280 page_id_t(space, next_page_no), zip_size,
3281 RW_X_LATCH, index, mtr);
3282
3283 if (!next_block) {
3284 return DB_ERROR;
3285 }
3286
3287 page_t* next_page
3288 = buf_block_get_frame(next_block);
3289 #ifdef UNIV_BTR_DEBUG
3290 ut_a(page_is_comp(next_page) == page_is_comp(page));
3291 ut_a(!memcmp(next_page + FIL_PAGE_PREV, page + FIL_PAGE_OFFSET,
3292 4));
3293 #endif /* UNIV_BTR_DEBUG */
3294
3295 btr_page_set_prev(next_page,
3296 buf_block_get_page_zip(next_block),
3297 prev_page_no, mtr);
3298 }
3299
3300 return DB_SUCCESS;
3301 }
3302
/****************************************************************//**
Writes the redo log record for setting an index record as the predefined
minimum record. The log body is the 2-byte offset of the record within
its page; recovery replays it via btr_parse_set_min_rec_mark(). */
UNIV_INLINE
void
btr_set_min_rec_mark_log(
/*=====================*/
	rec_t*		rec,	/*!< in: record */
	mlog_id_t	type,	/*!< in: MLOG_COMP_REC_MIN_MARK or
				MLOG_REC_MIN_MARK */
	mtr_t*		mtr)	/*!< in: mtr */
{
	mlog_write_initial_log_record(rec, type, mtr);

	/* Write rec offset as a 2-byte ulint */
	mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
}
3320
3321 /****************************************************************//**
3322 Parses the redo log record for setting an index record as the predefined
3323 minimum record.
3324 @return end of log record or NULL */
3325 byte*
btr_parse_set_min_rec_mark(byte * ptr,byte * end_ptr,ulint comp,page_t * page,mtr_t * mtr)3326 btr_parse_set_min_rec_mark(
3327 /*=======================*/
3328 byte* ptr, /*!< in: buffer */
3329 byte* end_ptr,/*!< in: buffer end */
3330 ulint comp, /*!< in: nonzero=compact page format */
3331 page_t* page, /*!< in: page or NULL */
3332 mtr_t* mtr) /*!< in: mtr or NULL */
3333 {
3334 rec_t* rec;
3335
3336 if (end_ptr < ptr + 2) {
3337
3338 return(NULL);
3339 }
3340
3341 if (page) {
3342 ut_a(!page_is_comp(page) == !comp);
3343
3344 rec = page + mach_read_from_2(ptr);
3345
3346 btr_set_min_rec_mark(rec, mtr);
3347 }
3348
3349 return(ptr + 2);
3350 }
3351
3352 /** Sets a record as the predefined minimum record. */
btr_set_min_rec_mark(rec_t * rec,mtr_t * mtr)3353 void btr_set_min_rec_mark(rec_t* rec, mtr_t* mtr)
3354 {
3355 const bool comp = page_rec_is_comp(rec);
3356
3357 ut_ad(rec == page_rec_get_next_const(page_get_infimum_rec(
3358 page_align(rec))));
3359 ut_ad(!(rec_get_info_bits(page_rec_get_next(rec), comp)
3360 & REC_INFO_MIN_REC_FLAG));
3361
3362 size_t info_bits = rec_get_info_bits(rec, comp);
3363 if (comp) {
3364 rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3365
3366 btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
3367 } else {
3368 rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
3369
3370 btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
3371 }
3372 }
3373
/*************************************************************//**
If page is the only on its level, this function moves its records to the
father page, thus reducing the tree height. All records of the page are
copied to the (emptied) father, the ancestors' levels are decremented,
and the page itself is freed.
@return father block */
UNIV_INTERN
buf_block_t*
btr_lift_page_up(
/*=============*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level;
				must not be empty: use
				btr_discard_only_page_on_level if the last
				record from the page should be removed */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*	father_block;
	page_t*		father_page;
	ulint		page_level;
	page_zip_des_t*	father_page_zip;
	page_t*		page		= buf_block_get_frame(block);
	ulint		root_page_no;
	buf_block_t*	blocks[BTR_MAX_LEVELS];
	ulint		n_blocks;	/*!< last used index in blocks[] */
	ulint		i;
	bool		lift_father_up;
	buf_block_t*	block_orig	= block;

	ut_ad(!page_has_siblings(page));
	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	page_level = btr_page_get_level(page);
	root_page_no = dict_index_get_page(index);

	{
		btr_cur_t	cursor;
		rec_offs*	offsets	= NULL;
		mem_heap_t*	heap	= mem_heap_create(
			sizeof(*offsets)
			* (REC_OFFS_HEADER_SIZE + 1 + 1
			   + unsigned(index->n_fields)));
		buf_block_t*	b;

		if (dict_index_is_spatial(index)) {
			offsets = rtr_page_get_father_block(
				NULL, heap, index, block, mtr,
				NULL, &cursor);
		} else {
			offsets = btr_page_get_father_block(offsets, heap,
							    index, block,
							    mtr, &cursor);
		}
		father_block = btr_cur_get_block(&cursor);
		father_page_zip = buf_block_get_page_zip(father_block);
		father_page = buf_block_get_frame(father_block);

		n_blocks = 0;

		/* Store all ancestor pages so we can reset their
		levels later on.  We have to do all the searches on
		the tree now because later on, after we've replaced
		the first level, the tree is in an inconsistent state
		and can not be searched. */
		for (b = father_block;
		     b->page.id.page_no() != root_page_no; ) {
			ut_a(n_blocks < BTR_MAX_LEVELS);

			if (dict_index_is_spatial(index)) {
				offsets = rtr_page_get_father_block(
					NULL, heap, index, b, mtr,
					NULL, &cursor);
			} else {
				offsets = btr_page_get_father_block(offsets,
								    heap,
								    index, b,
								    mtr,
								    &cursor);
			}

			blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
		}

		lift_father_up = (n_blocks && page_level == 0);
		if (lift_father_up) {
			/* The father page also should be the only on its level (not
			root). We should lift up the father page at first.
			Because the leaf page should be lifted up only for root page.
			The freeing page is based on page_level (==0 or !=0)
			to choose segment. If the page_level is changed ==0 from !=0,
			later freeing of the page doesn't find the page allocation
			to be freed.*/

			block = father_block;
			page = buf_block_get_frame(block);
			page_level = btr_page_get_level(page);

			ut_ad(!page_has_siblings(page));
			ut_ad(mtr_memo_contains(
				mtr, block, MTR_MEMO_PAGE_X_FIX));

			father_block = blocks[0];
			father_page_zip = buf_block_get_page_zip(father_block);
			father_page = buf_block_get_frame(father_block);
		}

		mem_heap_free(heap);
	}

	/* The adaptive hash index entries of the lifted page would become
	stale; drop them before the copy. */
	btr_search_drop_page_hash_index(block);

	/* Make the father empty */
	btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
	/* btr_page_empty() is supposed to zero-initialize the field. */
	ut_ad(!page_get_instant(father_block->frame));

	if (index->is_instant()
	    && father_block->page.id.page_no() == root_page_no) {
		ut_ad(!father_page_zip);
		btr_set_instant(father_block, *index, mtr);
	}

	page_level++;

	/* Copy the records to the father page one by one. */
	if (0
#ifdef UNIV_ZIP_COPY
	    || father_page_zip
#endif /* UNIV_ZIP_COPY */
	    || !page_copy_rec_list_end(father_block, block,
				       page_get_infimum_rec(page),
				       index, mtr)) {
		const page_zip_des_t*	page_zip
			= buf_block_get_page_zip(block);
		ut_a(father_page_zip);
		ut_a(page_zip);

		/* Copy the page byte for byte. */
		page_zip_copy_recs(father_page_zip, father_page,
				   page_zip, page, index, mtr);

		/* Update the lock table and possible hash index. */

		lock_move_rec_list_end(father_block, block,
				       page_get_infimum_rec(page));

		/* Also update the predicate locks */
		if (dict_index_is_spatial(index)) {
			lock_prdt_rec_move(father_block, block);
		} else {
			btr_search_move_or_delete_hash_entries(
				father_block, block);
		}
	}

	if (!dict_table_is_locking_disabled(index->table)) {
		/* Free predicate page locks on the block */
		if (dict_index_is_spatial(index)) {
			lock_mutex_enter();
			lock_prdt_page_free_from_discard(
				block, lock_sys.prdt_page_hash);
			lock_mutex_exit();
		}
		lock_update_copy_and_discard(father_block, block);
	}

	/* Go upward to root page, decrementing levels by one. */
	for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
		page_t*		page	= buf_block_get_frame(blocks[i]);
		page_zip_des_t*	page_zip= buf_block_get_page_zip(blocks[i]);

		ut_ad(btr_page_get_level(page) == page_level + 1);

		btr_page_set_level(page, page_zip, page_level, mtr);
#ifdef UNIV_ZIP_DEBUG
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
	}

	if (dict_index_is_spatial(index)) {
		rtr_check_discard_page(index, NULL, block);
	}

	/* Free the file page */
	btr_page_free(index, block, mtr);

	/* We play it safe and reset the free bits for the father */
	if (!dict_index_is_clust(index)
	    && !index->table->is_temporary()) {
		ibuf_reset_free_bits(father_block);
	}
	ut_ad(page_validate(father_page, index));
	ut_ad(btr_check_node_ptr(index, father_block, mtr));

	return(lift_father_up ? block_orig : father_block);
}
3568
3569 /*************************************************************//**
3570 Tries to merge the page first to the left immediate brother if such a
3571 brother exists, and the node pointers to the current page and to the brother
3572 reside on the same page. If the left brother does not satisfy these
3573 conditions, looks at the right brother. If the page is the only one on that
3574 level lifts the records of the page to the father page, thus reducing the
3575 tree height. It is assumed that mtr holds an x-latch on the tree and on the
3576 page. If cursor is on the leaf level, mtr must also hold x-latches to the
3577 brothers, if they exist.
3578 @return TRUE on success */
3579 ibool
btr_compress(btr_cur_t * cursor,ibool adjust,mtr_t * mtr)3580 btr_compress(
3581 /*=========*/
3582 btr_cur_t* cursor, /*!< in/out: cursor on the page to merge
3583 or lift; the page must not be empty:
3584 when deleting records, use btr_discard_page()
3585 if the page would become empty */
3586 ibool adjust, /*!< in: TRUE if should adjust the
3587 cursor position even if compression occurs */
3588 mtr_t* mtr) /*!< in/out: mini-transaction */
3589 {
3590 dict_index_t* index;
3591 ulint left_page_no;
3592 ulint right_page_no;
3593 buf_block_t* merge_block;
3594 page_t* merge_page = NULL;
3595 page_zip_des_t* merge_page_zip;
3596 ibool is_left;
3597 buf_block_t* block;
3598 page_t* page;
3599 btr_cur_t father_cursor;
3600 mem_heap_t* heap;
3601 rec_offs* offsets;
3602 ulint nth_rec = 0; /* remove bogus warning */
3603 bool mbr_changed = false;
3604 #ifdef UNIV_DEBUG
3605 bool leftmost_child;
3606 #endif
3607 DBUG_ENTER("btr_compress");
3608
3609 block = btr_cur_get_block(cursor);
3610 page = btr_cur_get_page(cursor);
3611 index = btr_cur_get_index(cursor);
3612
3613 btr_assert_not_corrupted(block, index);
3614
3615 #ifdef UNIV_DEBUG
3616 if (dict_index_is_spatial(index)) {
3617 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3618 MTR_MEMO_X_LOCK));
3619 } else {
3620 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3621 MTR_MEMO_X_LOCK
3622 | MTR_MEMO_SX_LOCK));
3623 }
3624 #endif /* UNIV_DEBUG */
3625
3626 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
3627
3628 const ulint zip_size = index->table->space->zip_size();
3629
3630 MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3631
3632 left_page_no = btr_page_get_prev(page);
3633 right_page_no = btr_page_get_next(page);
3634
3635 #ifdef UNIV_DEBUG
3636 if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3637 ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3638 page_rec_get_next(page_get_infimum_rec(page)),
3639 page_is_comp(page)));
3640 }
3641 #endif /* UNIV_DEBUG */
3642
3643 heap = mem_heap_create(100);
3644
3645 if (dict_index_is_spatial(index)) {
3646 offsets = rtr_page_get_father_block(
3647 NULL, heap, index, block, mtr, cursor, &father_cursor);
3648 ut_ad(cursor->page_cur.block->page.id.page_no()
3649 == block->page.id.page_no());
3650 rec_t* my_rec = father_cursor.page_cur.rec;
3651
3652 ulint page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3653
3654 if (page_no != block->page.id.page_no()) {
3655 ib::info() << "father positioned on page "
3656 << page_no << "instead of "
3657 << block->page.id.page_no();
3658 offsets = btr_page_get_father_block(
3659 NULL, heap, index, block, mtr, &father_cursor);
3660 }
3661 } else {
3662 offsets = btr_page_get_father_block(
3663 NULL, heap, index, block, mtr, &father_cursor);
3664 }
3665
3666 if (adjust) {
3667 nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3668 ut_ad(nth_rec > 0);
3669 }
3670
3671 if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3672 /* The page is the only one on the level, lift the records
3673 to the father */
3674
3675 merge_block = btr_lift_page_up(index, block, mtr);
3676 goto func_exit;
3677 }
3678
3679 ut_d(leftmost_child =
3680 left_page_no != FIL_NULL
3681 && (page_rec_get_next(
3682 page_get_infimum_rec(
3683 btr_cur_get_page(&father_cursor)))
3684 == btr_cur_get_rec(&father_cursor)));
3685
3686 /* Decide the page to which we try to merge and which will inherit
3687 the locks */
3688
3689 is_left = btr_can_merge_with_page(cursor, left_page_no,
3690 &merge_block, mtr);
3691
3692 DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3693 retry:
3694 if (!is_left
3695 && !btr_can_merge_with_page(cursor, right_page_no, &merge_block,
3696 mtr)) {
3697 if (!merge_block) {
3698 merge_page = NULL;
3699 }
3700 goto err_exit;
3701 }
3702
3703 merge_page = buf_block_get_frame(merge_block);
3704
3705 #ifdef UNIV_BTR_DEBUG
3706 if (is_left) {
3707 ut_a(btr_page_get_next(merge_page)
3708 == block->page.id.page_no());
3709 } else {
3710 ut_a(btr_page_get_prev(merge_page)
3711 == block->page.id.page_no());
3712 }
3713 #endif /* UNIV_BTR_DEBUG */
3714
3715 #ifdef UNIV_GIS_DEBUG
3716 if (dict_index_is_spatial(index)) {
3717 if (is_left) {
3718 fprintf(stderr, "GIS_DIAG: merge left %ld to %ld \n",
3719 (long) block->page.id.page_no(), left_page_no);
3720 } else {
3721 fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3722 (long) block->page.id.page_no(), right_page_no);
3723 }
3724 }
3725 #endif /* UNIV_GIS_DEBUG */
3726
3727 ut_ad(page_validate(merge_page, index));
3728
3729 merge_page_zip = buf_block_get_page_zip(merge_block);
3730 #ifdef UNIV_ZIP_DEBUG
3731 if (merge_page_zip) {
3732 const page_zip_des_t* page_zip
3733 = buf_block_get_page_zip(block);
3734 ut_a(page_zip);
3735 ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3736 ut_a(page_zip_validate(page_zip, page, index));
3737 }
3738 #endif /* UNIV_ZIP_DEBUG */
3739
3740 /* Move records to the merge page */
3741 if (is_left) {
3742 btr_cur_t cursor2;
3743 rtr_mbr_t new_mbr;
3744 rec_offs* offsets2 = NULL;
3745
3746 /* For rtree, we need to update father's mbr. */
3747 if (index->is_spatial()) {
3748 /* We only support merge pages with the same parent
3749 page */
3750 if (!rtr_check_same_block(
3751 index, &cursor2,
3752 btr_cur_get_block(&father_cursor),
3753 merge_block, heap)) {
3754 is_left = false;
3755 goto retry;
3756 }
3757
3758 /* Set rtr_info for cursor2, since it is
3759 necessary in recursive page merge. */
3760 cursor2.rtr_info = cursor->rtr_info;
3761 cursor2.tree_height = cursor->tree_height;
3762
3763 offsets2 = rec_get_offsets(
3764 btr_cur_get_rec(&cursor2), index, NULL,
3765 page_is_leaf(cursor2.page_cur.block->frame)
3766 ? index->n_fields : 0,
3767 ULINT_UNDEFINED, &heap);
3768
3769 /* Check if parent entry needs to be updated */
3770 mbr_changed = rtr_merge_mbr_changed(
3771 &cursor2, &father_cursor,
3772 offsets2, offsets, &new_mbr);
3773 }
3774
3775 rec_t* orig_pred = page_copy_rec_list_start(
3776 merge_block, block, page_get_supremum_rec(page),
3777 index, mtr);
3778
3779 if (!orig_pred) {
3780 goto err_exit;
3781 }
3782
3783 btr_search_drop_page_hash_index(block);
3784
3785 /* Remove the page from the level list */
3786 if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3787 zip_size, page, index,
3788 mtr)) {
3789 goto err_exit;
3790 }
3791
3792 if (dict_index_is_spatial(index)) {
3793 rec_t* my_rec = father_cursor.page_cur.rec;
3794
3795 ulint page_no = btr_node_ptr_get_child_page_no(
3796 my_rec, offsets);
3797
3798 if (page_no != block->page.id.page_no()) {
3799
3800 ib::fatal() << "father positioned on "
3801 << page_no << " instead of "
3802 << block->page.id.page_no();
3803
3804 ut_ad(0);
3805 }
3806
3807 if (mbr_changed) {
3808 #ifdef UNIV_DEBUG
3809 bool success = rtr_update_mbr_field(
3810 &cursor2, offsets2, &father_cursor,
3811 merge_page, &new_mbr, NULL, mtr);
3812
3813 ut_ad(success);
3814 #else
3815 rtr_update_mbr_field(
3816 &cursor2, offsets2, &father_cursor,
3817 merge_page, &new_mbr, NULL, mtr);
3818 #endif
3819 } else {
3820 rtr_node_ptr_delete(&father_cursor, mtr);
3821 }
3822
3823 /* No GAP lock needs to be worrying about */
3824 lock_mutex_enter();
3825 lock_prdt_page_free_from_discard(
3826 block, lock_sys.prdt_page_hash);
3827 lock_rec_free_all_from_discard_page(block);
3828 lock_mutex_exit();
3829 } else {
3830 btr_cur_node_ptr_delete(&father_cursor, mtr);
3831 if (!dict_table_is_locking_disabled(index->table)) {
3832 lock_update_merge_left(
3833 merge_block, orig_pred, block);
3834 }
3835 }
3836
3837 if (adjust) {
3838 nth_rec += page_rec_get_n_recs_before(orig_pred);
3839 }
3840 } else {
3841 rec_t* orig_succ;
3842 ibool compressed;
3843 dberr_t err;
3844 btr_cur_t cursor2;
3845 /* father cursor pointing to node ptr
3846 of the right sibling */
3847 #ifdef UNIV_BTR_DEBUG
3848 byte fil_page_prev[4];
3849 #endif /* UNIV_BTR_DEBUG */
3850
3851 if (dict_index_is_spatial(index)) {
3852 cursor2.rtr_info = NULL;
3853
3854 /* For spatial index, we disallow merge of blocks
3855 with different parents, since the merge would need
3856 to update entry (for MBR and Primary key) in the
3857 parent of block being merged */
3858 if (!rtr_check_same_block(
3859 index, &cursor2,
3860 btr_cur_get_block(&father_cursor),
3861 merge_block, heap)) {
3862 goto err_exit;
3863 }
3864
3865 /* Set rtr_info for cursor2, since it is
3866 necessary in recursive page merge. */
3867 cursor2.rtr_info = cursor->rtr_info;
3868 cursor2.tree_height = cursor->tree_height;
3869 } else {
3870 btr_page_get_father(index, merge_block, mtr, &cursor2);
3871 }
3872
3873 if (merge_page_zip && left_page_no == FIL_NULL) {
3874
3875 /* The function page_zip_compress(), which will be
3876 invoked by page_copy_rec_list_end() below,
3877 requires that FIL_PAGE_PREV be FIL_NULL.
3878 Clear the field, but prepare to restore it. */
3879 #ifdef UNIV_BTR_DEBUG
3880 memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3881 #endif /* UNIV_BTR_DEBUG */
3882 compile_time_assert(FIL_NULL == 0xffffffffU);
3883 memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3884 }
3885
3886 orig_succ = page_copy_rec_list_end(merge_block, block,
3887 page_get_infimum_rec(page),
3888 cursor->index, mtr);
3889
3890 if (!orig_succ) {
3891 ut_a(merge_page_zip);
3892 #ifdef UNIV_BTR_DEBUG
3893 if (left_page_no == FIL_NULL) {
3894 /* FIL_PAGE_PREV was restored from
3895 merge_page_zip. */
3896 ut_a(!memcmp(fil_page_prev,
3897 merge_page + FIL_PAGE_PREV, 4));
3898 }
3899 #endif /* UNIV_BTR_DEBUG */
3900 goto err_exit;
3901 }
3902
3903 btr_search_drop_page_hash_index(block);
3904
3905 #ifdef UNIV_BTR_DEBUG
3906 if (merge_page_zip && left_page_no == FIL_NULL) {
3907
3908 /* Restore FIL_PAGE_PREV in order to avoid an assertion
3909 failure in btr_level_list_remove(), which will set
3910 the field again to FIL_NULL. Even though this makes
3911 merge_page and merge_page_zip inconsistent for a
3912 split second, it is harmless, because the pages
3913 are X-latched. */
3914 memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3915 }
3916 #endif /* UNIV_BTR_DEBUG */
3917
3918 /* Remove the page from the level list */
3919 if (DB_SUCCESS != btr_level_list_remove(index->table->space_id,
3920 zip_size, page, index,
3921 mtr)) {
3922 goto err_exit;
3923 }
3924
3925 ut_ad(btr_node_ptr_get_child_page_no(
3926 btr_cur_get_rec(&father_cursor), offsets)
3927 == block->page.id.page_no());
3928
3929 /* Replace the address of the old child node (= page) with the
3930 address of the merge page to the right */
3931 btr_node_ptr_set_child_page_no(
3932 btr_cur_get_rec(&father_cursor),
3933 btr_cur_get_page_zip(&father_cursor),
3934 offsets, right_page_no, mtr);
3935
3936 #ifdef UNIV_DEBUG
3937 if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3938 ut_ad(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
3939 page_rec_get_next(page_get_infimum_rec(
3940 buf_block_get_frame(merge_block))),
3941 page_is_comp(page)));
3942 }
3943 #endif /* UNIV_DEBUG */
3944
3945 /* For rtree, we need to update father's mbr. */
3946 if (index->is_spatial()) {
3947 rec_offs* offsets2;
3948 ulint rec_info;
3949
3950 offsets2 = rec_get_offsets(
3951 btr_cur_get_rec(&cursor2), index, NULL,
3952 page_is_leaf(cursor2.page_cur.block->frame)
3953 ? index->n_fields : 0,
3954 ULINT_UNDEFINED, &heap);
3955
3956 ut_ad(btr_node_ptr_get_child_page_no(
3957 btr_cur_get_rec(&cursor2), offsets2)
3958 == right_page_no);
3959
3960 rec_info = rec_get_info_bits(
3961 btr_cur_get_rec(&father_cursor),
3962 rec_offs_comp(offsets));
3963 if (rec_info & REC_INFO_MIN_REC_FLAG) {
3964 /* When the father node ptr is minimal rec,
3965 we will keep it and delete the node ptr of
3966 merge page. */
3967 rtr_merge_and_update_mbr(&father_cursor,
3968 &cursor2,
3969 offsets, offsets2,
3970 merge_page, mtr);
3971 } else {
3972 /* Otherwise, we will keep the node ptr of
3973 merge page and delete the father node ptr.
3974 This is for keeping the rec order in upper
3975 level. */
3976 rtr_merge_and_update_mbr(&cursor2,
3977 &father_cursor,
3978 offsets2, offsets,
3979 merge_page, mtr);
3980 }
3981 lock_mutex_enter();
3982 lock_prdt_page_free_from_discard(
3983 block, lock_sys.prdt_page_hash);
3984 lock_rec_free_all_from_discard_page(block);
3985 lock_mutex_exit();
3986 } else {
3987
3988 compressed = btr_cur_pessimistic_delete(&err, TRUE,
3989 &cursor2,
3990 BTR_CREATE_FLAG,
3991 false, mtr);
3992 ut_a(err == DB_SUCCESS);
3993
3994 if (!compressed) {
3995 btr_cur_compress_if_useful(&cursor2,
3996 FALSE,
3997 mtr);
3998 }
3999
4000 if (!dict_table_is_locking_disabled(index->table)) {
4001 lock_update_merge_right(
4002 merge_block, orig_succ, block);
4003 }
4004 }
4005 }
4006
4007 if (!dict_index_is_clust(index)
4008 && !index->table->is_temporary()
4009 && page_is_leaf(merge_page)) {
4010 /* Update the free bits of the B-tree page in the
4011 insert buffer bitmap. This has to be done in a
4012 separate mini-transaction that is committed before the
4013 main mini-transaction. We cannot update the insert
4014 buffer bitmap in this mini-transaction, because
4015 btr_compress() can be invoked recursively without
4016 committing the mini-transaction in between. Since
4017 insert buffer bitmap pages have a lower rank than
4018 B-tree pages, we must not access other pages in the
4019 same mini-transaction after accessing an insert buffer
4020 bitmap page. */
4021
4022 /* The free bits in the insert buffer bitmap must
4023 never exceed the free space on a page. It is safe to
4024 decrement or reset the bits in the bitmap in a
4025 mini-transaction that is committed before the
4026 mini-transaction that affects the free space. */
4027
4028 /* It is unsafe to increment the bits in a separately
4029 committed mini-transaction, because in crash recovery,
4030 the free bits could momentarily be set too high. */
4031
4032 if (zip_size) {
4033 /* Because the free bits may be incremented
4034 and we cannot update the insert buffer bitmap
4035 in the same mini-transaction, the only safe
4036 thing we can do here is the pessimistic
4037 approach: reset the free bits. */
4038 ibuf_reset_free_bits(merge_block);
4039 } else {
4040 /* On uncompressed pages, the free bits will
4041 never increase here. Thus, it is safe to
4042 write the bits accurately in a separate
4043 mini-transaction. */
4044 ibuf_update_free_bits_if_full(merge_block,
4045 srv_page_size,
4046 ULINT_UNDEFINED);
4047 }
4048 }
4049
4050 ut_ad(page_validate(merge_page, index));
4051 #ifdef UNIV_ZIP_DEBUG
4052 ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page,
4053 index));
4054 #endif /* UNIV_ZIP_DEBUG */
4055
4056 if (dict_index_is_spatial(index)) {
4057 #ifdef UNIV_GIS_DEBUG
4058 fprintf(stderr, "GIS_DIAG: compressed away %ld\n",
4059 (long) block->page.id.page_no());
4060 fprintf(stderr, "GIS_DIAG: merged to %ld\n",
4061 (long) merge_block->page.id.page_no());
4062 #endif
4063
4064 rtr_check_discard_page(index, NULL, block);
4065 }
4066
4067 /* Free the file page */
4068 btr_page_free(index, block, mtr);
4069
4070 /* btr_check_node_ptr() needs parent block latched.
4071 If the merge_block's parent block is not same,
4072 we cannot use btr_check_node_ptr() */
4073 ut_ad(leftmost_child
4074 || btr_check_node_ptr(index, merge_block, mtr));
4075 func_exit:
4076 mem_heap_free(heap);
4077
4078 if (adjust) {
4079 ut_ad(nth_rec > 0);
4080 btr_cur_position(
4081 index,
4082 page_rec_get_nth(merge_block->frame, nth_rec),
4083 merge_block, cursor);
4084 }
4085
4086 MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
4087
4088 DBUG_RETURN(TRUE);
4089
4090 err_exit:
4091 /* We play it safe and reset the free bits. */
4092 if (zip_size
4093 && merge_page
4094 && page_is_leaf(merge_page)
4095 && !dict_index_is_clust(index)) {
4096
4097 ibuf_reset_free_bits(merge_block);
4098 }
4099
4100 mem_heap_free(heap);
4101 DBUG_RETURN(FALSE);
4102 }
4103
/*************************************************************//**
Discards a page that is the only page on its level. This will empty
the whole B-tree, leaving just an empty root page. This function
should almost never be reached, because btr_compress(), which is invoked in
delete operations, calls btr_lift_page_up() to flatten the B-tree. */
ATTRIBUTE_COLD
static
void
btr_discard_only_page_on_level(
/*===========================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: page which is the only on its level */
	mtr_t*		mtr)	/*!< in: mtr */
{
	ulint		page_level = 0;

	ut_ad(!index->is_dummy);

	/* Save the PAGE_MAX_TRX_ID from the leaf page.  It is restored
	on the root page at the end for secondary indexes. */
	const trx_id_t	max_trx_id = page_get_max_trx_id(block->frame);
	/* First user record of the leaf; if the table carries instant
	ALTER metadata, this is the metadata record that must survive
	the emptying of the tree (copied and re-inserted below). */
	const rec_t*	r = page_rec_get_next(page_get_infimum_rec(block->frame));
	ut_ad(rec_is_metadata(r, *index) == index->is_instant());

	/* Walk upwards, freeing each single-record page until the
	root page is reached. */
	while (block->page.id.page_no() != dict_index_get_page(index)) {
		btr_cur_t	cursor;
		buf_block_t*	father;
		const page_t*	page	= buf_block_get_frame(block);

		ut_a(page_get_n_recs(page) == 1);
		ut_a(page_level == btr_page_get_level(page));
		ut_a(!page_has_siblings(page));
		ut_ad(fil_page_index_page_check(page));
		ut_ad(block->page.id.space() == index->table->space->id);
		ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
		btr_search_drop_page_hash_index(block);

		if (dict_index_is_spatial(index)) {
			/* Check any concurrent search having this page */
			rtr_check_discard_page(index, NULL, block);
			rtr_page_get_father(index, block, mtr, NULL, &cursor);
		} else {
			btr_page_get_father(index, block, mtr, &cursor);
		}
		father = btr_cur_get_block(&cursor);

		/* Transfer any record locks to the supremum of the
		father page before the child page is freed. */
		if (!dict_table_is_locking_disabled(index->table)) {
			lock_update_discard(
				father, PAGE_HEAP_NO_SUPREMUM, block);
		}

		/* Free the file page */
		btr_page_free(index, block, mtr);

		block = father;
		page_level++;
	}

	/* block is the root page, which must be empty, except
	for the node pointer to the (now discarded) block(s). */
	ut_ad(!page_has_siblings(block->frame));

#ifdef UNIV_BTR_DEBUG
	if (!dict_index_is_ibuf(index)) {
		const page_t*	root	= buf_block_get_frame(block);
		const ulint	space	= index->table->space_id;
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF
					    + root, space));
		ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP
					    + root, space));
	}
#endif /* UNIV_BTR_DEBUG */

	/* If the table has instant-ALTER metadata, copy the metadata
	record into a private heap: btr_page_empty() below would
	otherwise destroy it. */
	mem_heap_t* heap = nullptr;
	const rec_t* rec = nullptr;
	rec_offs* offsets = nullptr;
	if (index->table->instant) {
		if (rec_is_alter_metadata(r, *index)) {
			heap = mem_heap_create(srv_page_size);
			offsets = rec_get_offsets(r, index, nullptr,
						  index->n_core_fields,
						  ULINT_UNDEFINED, &heap);
			rec = rec_copy(mem_heap_alloc(heap,
						      rec_offs_size(offsets)),
				       r, offsets);
			rec_offs_make_valid(rec, index, true, offsets);
		}
	}

	/* Turn the root into an empty leaf page (level 0). */
	btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
	ut_ad(page_is_leaf(buf_block_get_frame(block)));
	/* btr_page_empty() is supposed to zero-initialize the field. */
	ut_ad(!page_get_instant(block->frame));

	if (index->is_primary()) {
		if (rec) {
			/* Re-insert the preserved metadata record as
			the first record of the now-empty root. */
			DBUG_ASSERT(index->table->instant);
			DBUG_ASSERT(rec_is_alter_metadata(rec, *index));
			btr_set_instant(block, *index, mtr);
			rec = page_cur_insert_rec_low(
				page_get_infimum_rec(block->frame),
				index, rec, offsets, mtr);
			ut_ad(rec);
			mem_heap_free(heap);
		} else if (index->is_instant()) {
			index->clear_instant_add();
		}
	} else if (!index->table->is_temporary()) {
		/* We play it safe and reset the free bits for the root */
		ibuf_reset_free_bits(block);

		ut_a(max_trx_id);
		page_set_max_trx_id(block,
				    buf_block_get_page_zip(block),
				    max_trx_id, mtr);
	}
}
4220
/*************************************************************//**
Discards a page from a B-tree. This is used to remove the last record from
a B-tree page: the whole page must be removed at the same time. This cannot
be used for the root page, which is allowed to be empty. */
void
btr_discard_page(
/*=============*/
	btr_cur_t*	cursor,	/*!< in: cursor on the page to discard: not on
				the root page */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index;
	ulint		left_page_no;
	ulint		right_page_no;
	buf_block_t*	merge_block;
	page_t*		merge_page;
	buf_block_t*	block;
	page_t*		page;
	rec_t*		node_ptr;
	btr_cur_t	parent_cursor;

	block = btr_cur_get_block(cursor);
	index = btr_cur_get_index(cursor);

	/* The caller must not position the cursor on the root page
	(the root is allowed to become empty instead). */
	ut_ad(dict_index_get_page(index) != block->page.id.page_no());

	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));

	MONITOR_INC(MONITOR_INDEX_DISCARD);

	/* Position parent_cursor on the node pointer referring to the
	page being discarded. */
	if (dict_index_is_spatial(index)) {
		rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
	} else {
		btr_page_get_father(index, block, mtr, &parent_cursor);
	}

	/* Decide the page which will inherit the locks */

	left_page_no = btr_page_get_prev(buf_block_get_frame(block));
	right_page_no = btr_page_get_next(buf_block_get_frame(block));

	const ulint	zip_size = index->table->space->zip_size();
	ut_d(bool parent_is_different = false);
	if (left_page_no != FIL_NULL) {
		/* Prefer the left sibling as the lock inheritor. */
		merge_block = btr_block_get(
			page_id_t(index->table->space_id, left_page_no),
			zip_size, RW_X_LATCH, index, mtr);

		merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
		ut_a(btr_page_get_next(merge_page)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */
		/* Record whether the father cursor is positioned on the
		first user record of its page; this guards the
		btr_check_node_ptr() debug check at the end. */
		ut_d(parent_is_different =
			(page_rec_get_next(
				page_get_infimum_rec(
					btr_cur_get_page(
						&parent_cursor)))
			 == btr_cur_get_rec(&parent_cursor)));
	} else if (right_page_no != FIL_NULL) {
		merge_block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			zip_size, RW_X_LATCH, index, mtr);

		merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
		ut_a(btr_page_get_prev(merge_page)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */
		ut_d(parent_is_different = page_rec_is_supremum(
			page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
	} else {
		/* No siblings: this page is the only one on its level. */
		btr_discard_only_page_on_level(index, block, mtr);

		return;
	}

	page = buf_block_get_frame(block);
	ut_a(page_is_comp(merge_page) == page_is_comp(page));
	btr_search_drop_page_hash_index(block);

	if (left_page_no == FIL_NULL && !page_is_leaf(page)) {

		/* We have to mark the leftmost node pointer on the right
		side page as the predefined minimum record */
		node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));

		ut_ad(page_rec_is_user_rec(node_ptr));

		/* This will make page_zip_validate() fail on merge_page
		until btr_level_list_remove() completes.  This is harmless,
		because everything will take place within a single
		mini-transaction and because writing to the redo log
		is an atomic operation (performed by mtr_commit()). */
		btr_set_min_rec_mark(node_ptr, mtr);
	}

	/* Delete the node pointer that referenced the discarded page. */
	if (dict_index_is_spatial(index)) {
		rtr_node_ptr_delete(&parent_cursor, mtr);
	} else {
		btr_cur_node_ptr_delete(&parent_cursor, mtr);
	}

	/* Remove the page from the level list */
	ut_a(DB_SUCCESS == btr_level_list_remove(index->table->space_id,
						 zip_size, page, index, mtr));

#ifdef UNIV_ZIP_DEBUG
	{
		page_zip_des_t*	merge_page_zip
			= buf_block_get_page_zip(merge_block);
		ut_a(!merge_page_zip
		     || page_zip_validate(merge_page_zip, merge_page, index));
	}
#endif /* UNIV_ZIP_DEBUG */

	/* Let the surviving sibling inherit the locks of the
	discarded page before the page is freed. */
	if (!dict_table_is_locking_disabled(index->table)) {
		if (left_page_no != FIL_NULL) {
			lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM,
					    block);
		} else {
			lock_update_discard(merge_block,
					    lock_get_min_heap_no(merge_block),
					    block);
		}
	}

	if (dict_index_is_spatial(index)) {
		rtr_check_discard_page(index, cursor, block);
	}

	/* Free the file page */
	btr_page_free(index, block, mtr);

	/* btr_check_node_ptr() needs parent block latched.
	If the merge_block's parent block is not same,
	we cannot use btr_check_node_ptr() */
	ut_ad(parent_is_different
	      || btr_check_node_ptr(index, merge_block, mtr));

	/* If the father is the root and now holds a single record
	with no siblings, lift the surviving page up to shrink the
	tree by one level. */
	if (btr_cur_get_block(&parent_cursor)->page.id.page_no() == index->page
	    && !page_has_siblings(btr_cur_get_page(&parent_cursor))
	    && page_get_n_recs(btr_cur_get_page(&parent_cursor)) == 1) {
		btr_lift_page_up(index, merge_block, mtr);
	}
}
4370
4371 #ifdef UNIV_BTR_PRINT
4372 /*************************************************************//**
4373 Prints size info of a B-tree. */
4374 void
btr_print_size(dict_index_t * index)4375 btr_print_size(
4376 /*===========*/
4377 dict_index_t* index) /*!< in: index tree */
4378 {
4379 page_t* root;
4380 fseg_header_t* seg;
4381 mtr_t mtr;
4382
4383 if (dict_index_is_ibuf(index)) {
4384 fputs("Sorry, cannot print info of an ibuf tree:"
4385 " use ibuf functions\n", stderr);
4386
4387 return;
4388 }
4389
4390 mtr_start(&mtr);
4391
4392 root = btr_root_get(index, &mtr);
4393
4394 seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4395
4396 fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
4397 fseg_print(seg, &mtr);
4398
4399 if (!dict_index_is_ibuf(index)) {
4400
4401 seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
4402
4403 fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
4404 fseg_print(seg, &mtr);
4405 }
4406
4407 mtr_commit(&mtr);
4408 }
4409
/************************************************************//**
Prints recursively index tree pages.  On each non-leaf page, only the
first and last 'width' node pointers are followed. */
static
void
btr_print_recursive(
/*================*/
	dict_index_t*	index,	/*!< in: index tree */
	buf_block_t*	block,	/*!< in: index page */
	ulint		width,	/*!< in: print this many entries from start
				and end */
	mem_heap_t**	heap,	/*!< in/out: heap for rec_get_offsets() */
	rec_offs**	offsets,/*!< in/out: buffer for rec_get_offsets() */
	mtr_t*		mtr)	/*!< in: mtr */
{
	const page_t*	page = buf_block_get_frame(block);
	page_cur_t	cursor;
	ulint		n_recs;
	ulint		i	= 0;
	mtr_t		mtr2;

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_SX_FIX));

	ib::info() << "NODE ON LEVEL " << btr_page_get_level(page)
		<< " page " << block->page.id;

	page_print(block, index, width, width);

	n_recs = page_get_n_recs(page);

	page_cur_set_before_first(block, &cursor);
	page_cur_move_to_next(&cursor);

	while (!page_cur_is_after_last(&cursor)) {

		if (page_is_leaf(page)) {

			/* If this is the leaf level, do nothing */

		} else if ((i <= width) || (i >= n_recs - width)) {
			/* Descend into this child.  Each child is
			visited in its own mini-transaction (mtr2) so
			that child page latches are released as soon
			as the subtree has been printed. */
			const rec_t*	node_ptr;

			mtr_start(&mtr2);

			node_ptr = page_cur_get_rec(&cursor);

			*offsets = rec_get_offsets(
				node_ptr, index, *offsets, 0,
				ULINT_UNDEFINED, heap);
			btr_print_recursive(index,
					    btr_node_ptr_get_child(node_ptr,
								   index,
								   *offsets,
								   &mtr2),
					    width, heap, offsets, &mtr2);
			mtr_commit(&mtr2);
		}

		page_cur_move_to_next(&cursor);
		i++;
	}
}
4472
4473 /**************************************************************//**
4474 Prints directories and other info of all nodes in the tree. */
4475 void
btr_print_index(dict_index_t * index,ulint width)4476 btr_print_index(
4477 /*============*/
4478 dict_index_t* index, /*!< in: index */
4479 ulint width) /*!< in: print this many entries from start
4480 and end */
4481 {
4482 mtr_t mtr;
4483 buf_block_t* root;
4484 mem_heap_t* heap = NULL;
4485 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
4486 rec_offs* offsets = offsets_;
4487 rec_offs_init(offsets_);
4488
4489 fputs("--------------------------\n"
4490 "INDEX TREE PRINT\n", stderr);
4491
4492 mtr_start(&mtr);
4493
4494 root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4495
4496 btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
4497 if (heap) {
4498 mem_heap_free(heap);
4499 }
4500
4501 mtr_commit(&mtr);
4502
4503 ut_ad(btr_validate_index(index, 0));
4504 }
4505 #endif /* UNIV_BTR_PRINT */
4506
4507 #ifdef UNIV_DEBUG
4508 /************************************************************//**
4509 Checks that the node pointer to a page is appropriate.
4510 @return TRUE */
4511 ibool
btr_check_node_ptr(dict_index_t * index,buf_block_t * block,mtr_t * mtr)4512 btr_check_node_ptr(
4513 /*===============*/
4514 dict_index_t* index, /*!< in: index tree */
4515 buf_block_t* block, /*!< in: index page */
4516 mtr_t* mtr) /*!< in: mtr */
4517 {
4518 mem_heap_t* heap;
4519 dtuple_t* tuple;
4520 rec_offs* offsets;
4521 btr_cur_t cursor;
4522 page_t* page = buf_block_get_frame(block);
4523
4524 ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
4525
4526 if (dict_index_get_page(index) == block->page.id.page_no()) {
4527
4528 return(TRUE);
4529 }
4530
4531 heap = mem_heap_create(256);
4532
4533 if (dict_index_is_spatial(index)) {
4534 offsets = rtr_page_get_father_block(NULL, heap, index, block, mtr,
4535 NULL, &cursor);
4536 } else {
4537 offsets = btr_page_get_father_block(NULL, heap, index, block, mtr,
4538 &cursor);
4539 }
4540
4541 if (page_is_leaf(page)) {
4542
4543 goto func_exit;
4544 }
4545
4546 tuple = dict_index_build_node_ptr(
4547 index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4548 btr_page_get_level(page));
4549
4550 /* For spatial index, the MBR in the parent rec could be different
4551 with that of first rec of child, their relationship should be
4552 "WITHIN" relationship */
4553 if (dict_index_is_spatial(index)) {
4554 ut_a(!cmp_dtuple_rec_with_gis(
4555 tuple, btr_cur_get_rec(&cursor),
4556 offsets, PAGE_CUR_WITHIN));
4557 } else {
4558 ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), offsets));
4559 }
4560 func_exit:
4561 mem_heap_free(heap);
4562
4563 return(TRUE);
4564 }
4565 #endif /* UNIV_DEBUG */
4566
4567 /************************************************************//**
4568 Display identification information for a record. */
4569 static
4570 void
btr_index_rec_validate_report(const page_t * page,const rec_t * rec,const dict_index_t * index)4571 btr_index_rec_validate_report(
4572 /*==========================*/
4573 const page_t* page, /*!< in: index page */
4574 const rec_t* rec, /*!< in: index record */
4575 const dict_index_t* index) /*!< in: index */
4576 {
4577 ib::info() << "Record in index " << index->name
4578 << " of table " << index->table->name
4579 << ", page " << page_id_t(page_get_space_id(page),
4580 page_get_page_no(page))
4581 << ", at offset " << page_offset(rec);
4582 }
4583
/************************************************************//**
Checks the size and number of fields in a record based on the definition of
the index.
@return TRUE if ok */
ibool
btr_index_rec_validate(
/*===================*/
	const rec_t*		rec,		/*!< in: index record */
	const dict_index_t*	index,		/*!< in: index */
	ibool			dump_on_error)	/*!< in: TRUE if the function
						should print hex dump of record
						and page on error */
{
	ulint		len;
	const page_t*	page;
	mem_heap_t*	heap	= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets	= offsets_;
	rec_offs_init(offsets_);

	page = page_align(rec);

	ut_ad(index->n_core_fields);

	if (index->is_ibuf()) {
		/* The insert buffer index tree can contain records from any
		other index: we cannot check the number of fields or
		their length */

		return(TRUE);
	}

#ifdef VIRTUAL_INDEX_DEBUG
	if (dict_index_has_virtual(index)) {
		fprintf(stderr, "index name is %s\n", index->name());
	}
#endif
	/* The ROW_FORMAT (compact flag) of the record must agree
	with the table definition. */
	if ((ibool)!!page_is_comp(page) != dict_table_is_comp(index->table)) {
		btr_index_rec_validate_report(page, rec, index);

		ib::error() << "Compact flag=" << !!page_is_comp(page)
			<< ", should be " << dict_table_is_comp(index->table);

		return(FALSE);
	}

	/* In a table with instant ALTER metadata, the first user record
	on the leftmost leaf page of the clustered index must be the
	hidden ALTER TABLE metadata record. */
	const bool is_alter_metadata = page_is_leaf(page)
		&& !page_has_prev(page)
		&& index->is_primary() && index->table->instant
		&& rec == page_rec_get_next_const(page_get_infimum_rec(page));

	if (is_alter_metadata
	    && !rec_is_alter_metadata(rec, page_is_comp(page))) {
		btr_index_rec_validate_report(page, rec, index);

		ib::error() << "First record is not ALTER TABLE metadata";
		return FALSE;
	}

	if (!page_is_comp(page)) {
		/* Old-style (redundant) records carry an explicit field
		count; validate it against the index definition. */
		const ulint n_rec_fields = rec_get_n_fields_old(rec);
		if (n_rec_fields == DICT_FLD__SYS_INDEXES__MERGE_THRESHOLD
		    && index->id == DICT_INDEXES_ID) {
			/* A record for older SYS_INDEXES table
			(missing merge_threshold column) is acceptable. */
		} else if (is_alter_metadata) {
			/* The metadata record has one extra field
			(the metadata BLOB pointer). */
			if (n_rec_fields != ulint(index->n_fields) + 1) {
				goto n_field_mismatch;
			}
		} else if (n_rec_fields < index->n_core_fields
			   || n_rec_fields > index->n_fields) {
n_field_mismatch:
			btr_index_rec_validate_report(page, rec, index);

			ib::error() << "Has " << rec_get_n_fields_old(rec)
				<< " fields, should have "
				<< index->n_core_fields << ".."
				<< index->n_fields;

			if (dump_on_error) {
				fputs("InnoDB: corrupt record ", stderr);
				rec_print_old(stderr, rec);
				putc('\n', stderr);
			}
			return(FALSE);
		}
	}

	offsets = rec_get_offsets(rec, index, offsets, page_is_leaf(page)
				  ? index->n_core_fields : 0,
				  ULINT_UNDEFINED, &heap);
	const dict_field_t* field = index->fields;
	ut_ad(rec_offs_n_fields(offsets)
	      == ulint(index->n_fields) + is_alter_metadata);

	/* Validate the stored length of every field against the
	column definition. */
	for (unsigned i = 0; i < rec_offs_n_fields(offsets); i++) {
		rec_get_nth_field_offs(offsets, i, &len);

		ulint fixed_size;

		if (is_alter_metadata && i == index->first_user_field()) {
			/* The extra metadata field must be an
			externally stored BLOB pointer of exactly
			FIELD_REF_SIZE bytes. */
			fixed_size = FIELD_REF_SIZE;
			if (len != FIELD_REF_SIZE
			    || !rec_offs_nth_extern(offsets, i)) {
				goto len_mismatch;
			}

			continue;
		} else {
			fixed_size = dict_col_get_fixed_size(
				field->col, page_is_comp(page));
			if (rec_offs_nth_extern(offsets, i)) {
				/* Externally stored column: compare
				fixed_size against the length word read
				from the BLOB reference (presumably the
				least significant word of the external
				length — confirm against btr0cur). */
				const byte* data = rec_get_nth_field(
					rec, offsets, i, &len);
				len -= BTR_EXTERN_FIELD_REF_SIZE;
				ulint extern_len = mach_read_from_4(
					data + len + BTR_EXTERN_LEN + 4);
				if (fixed_size == extern_len) {
					goto next_field;
				}
			}
		}

		/* Note that if fixed_size != 0, it equals the
		length of a fixed-size column in the clustered index.
		We should adjust it here.
		A prefix index of the column is of fixed, but different
		length.  When fixed_size == 0, prefix_len is the maximum
		length of the prefix index column. */

		if (len_is_stored(len)
		    && (field->prefix_len
			? len > field->prefix_len
			: (fixed_size && len != fixed_size))) {
len_mismatch:
			btr_index_rec_validate_report(page, rec, index);
			ib::error	error;

			error << "Field " << i << " len is " << len
				<< ", should be " << fixed_size;

			if (dump_on_error) {
				error << "; ";
				rec_print(error.m_oss, rec,
					  rec_get_info_bits(
						  rec, rec_offs_comp(offsets)),
					  offsets);
			}
			if (heap) {
				mem_heap_free(heap);
			}
			return(FALSE);
		}
next_field:
		field++;
	}

#ifdef VIRTUAL_INDEX_DEBUG
	if (dict_index_has_virtual(index)) {
		rec_print_new(stderr, rec, offsets);
	}
#endif

	if (heap) {
		mem_heap_free(heap);
	}
	return(TRUE);
}
4752
4753 /************************************************************//**
4754 Checks the size and number of fields in records based on the definition of
4755 the index.
4756 @return TRUE if ok */
4757 static
4758 ibool
btr_index_page_validate(buf_block_t * block,dict_index_t * index)4759 btr_index_page_validate(
4760 /*====================*/
4761 buf_block_t* block, /*!< in: index page */
4762 dict_index_t* index) /*!< in: index */
4763 {
4764 page_cur_t cur;
4765 ibool ret = TRUE;
4766 #ifndef DBUG_OFF
4767 ulint nth = 1;
4768 #endif /* !DBUG_OFF */
4769
4770 page_cur_set_before_first(block, &cur);
4771
4772 /* Directory slot 0 should only contain the infimum record. */
4773 DBUG_EXECUTE_IF("check_table_rec_next",
4774 ut_a(page_rec_get_nth_const(
4775 page_cur_get_page(&cur), 0)
4776 == cur.rec);
4777 ut_a(page_dir_slot_get_n_owned(
4778 page_dir_get_nth_slot(
4779 page_cur_get_page(&cur), 0))
4780 == 1););
4781
4782 page_cur_move_to_next(&cur);
4783
4784 for (;;) {
4785 if (page_cur_is_after_last(&cur)) {
4786
4787 break;
4788 }
4789
4790 if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4791
4792 return(FALSE);
4793 }
4794
4795 /* Verify that page_rec_get_nth_const() is correctly
4796 retrieving each record. */
4797 DBUG_EXECUTE_IF("check_table_rec_next",
4798 ut_a(cur.rec == page_rec_get_nth_const(
4799 page_cur_get_page(&cur),
4800 page_rec_get_n_recs_before(
4801 cur.rec)));
4802 ut_a(nth++ == page_rec_get_n_recs_before(
4803 cur.rec)););
4804
4805 page_cur_move_to_next(&cur);
4806 }
4807
4808 return(ret);
4809 }
4810
4811 /************************************************************//**
4812 Report an error on one page of an index tree. */
4813 static
4814 void
btr_validate_report1(dict_index_t * index,ulint level,const buf_block_t * block)4815 btr_validate_report1(
4816 /*=================*/
4817 dict_index_t* index, /*!< in: index */
4818 ulint level, /*!< in: B-tree level */
4819 const buf_block_t* block) /*!< in: index page */
4820 {
4821 ib::error error;
4822 error << "In page " << block->page.id.page_no()
4823 << " of index " << index->name
4824 << " of table " << index->table->name;
4825
4826 if (level > 0) {
4827 error << ", index tree level " << level;
4828 }
4829 }
4830
4831 /************************************************************//**
4832 Report an error on two pages of an index tree. */
4833 static
4834 void
btr_validate_report2(const dict_index_t * index,ulint level,const buf_block_t * block1,const buf_block_t * block2)4835 btr_validate_report2(
4836 /*=================*/
4837 const dict_index_t* index, /*!< in: index */
4838 ulint level, /*!< in: B-tree level */
4839 const buf_block_t* block1, /*!< in: first index page */
4840 const buf_block_t* block2) /*!< in: second index page */
4841 {
4842 ib::error error;
4843 error << "In pages " << block1->page.id
4844 << " and " << block2->page.id << " of index " << index->name
4845 << " of table " << index->table->name;
4846
4847 if (level > 0) {
4848 error << ", index tree level " << level;
4849 }
4850 }
4851
/************************************************************//**
Validates index tree level.

Descends from the root to the requested level, then walks the whole
level left to right through the FIL_PAGE_NEXT links. For each page it
checks the page itself, its ordering relative to the right sibling,
and the consistency of the parent node pointer(s). Errors are reported
but the walk continues where safely possible, so that one pass can
report multiple problems.
@return TRUE if ok */
static
bool
btr_validate_level(
/*===============*/
	dict_index_t*	index,	/*!< in: index tree */
	const trx_t*	trx,	/*!< in: transaction or NULL */
	ulint		level,	/*!< in: level number */
	bool		lockout)/*!< in: true if X-latch index is intended */
{
	buf_block_t*	block;
	page_t*		page;
	buf_block_t*	right_block = 0; /* remove warning */
	page_t*		right_page = 0; /* remove warning */
	page_t*		father_page;
	btr_cur_t	node_cur;
	btr_cur_t	right_node_cur;
	rec_t*		rec;
	ulint		right_page_no;
	ulint		left_page_no;
	page_cur_t	cursor;
	dtuple_t*	node_ptr_tuple;
	bool		ret = true;
	mtr_t		mtr;
	mem_heap_t*	heap = mem_heap_create(256);
	rec_offs*	offsets = NULL;
	rec_offs*	offsets2= NULL;
#ifdef UNIV_ZIP_DEBUG
	page_zip_des_t*	page_zip;
#endif /* UNIV_ZIP_DEBUG */
	ulint		savepoint = 0;
	ulint		savepoint2 = 0;
	ulint		parent_page_no = FIL_NULL;
	ulint		parent_right_page_no = FIL_NULL;
	bool		rightmost_child = false;

	mtr.start();

	/* Latch the whole index so the tree shape cannot change while
	we descend. Spatial indexes require an exclusive latch (lockout);
	otherwise a shared-exclusive latch suffices. */
	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

	block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
	page = buf_block_get_frame(block);

	fil_space_t*	space	= index->table->space;
	const ulint	zip_size = space->zip_size();

	/* Phase 1: descend from the root, always through the first
	(leftmost) node pointer, until we reach the requested level. */
	while (level != btr_page_get_level(page)) {
		const rec_t*	node_ptr;

		/* A page that is part of the tree must not be marked free
		in the file segment. */
		if (fseg_page_is_free(space, block->page.id.page_no())) {

			btr_validate_report1(index, level, block);

			ib::warn() << "Page is free";

			ret = false;
		}

		ut_a(index->table->space_id == block->page.id.space());
		ut_a(block->page.id.space() == page_get_space_id(page));
#ifdef UNIV_ZIP_DEBUG
		page_zip = buf_block_get_page_zip(block);
		ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
		ut_a(!page_is_leaf(page));

		page_cur_set_before_first(block, &cursor);
		page_cur_move_to_next(&cursor);

		node_ptr = page_cur_get_rec(&cursor);
		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
					  ULINT_UNDEFINED, &heap);

		/* Remember a savepoint so the child latch can be released
		when we need to re-latch a different block below. */
		savepoint2 = mtr_set_savepoint(&mtr);
		block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
		page = buf_block_get_frame(block);

		/* For R-Tree, since record order might not be the same as
		linked index page in the lower level, we need to travers
		backwards to get the first page rec in this level.
		This is only used for index validation. Spatial index
		does not use such scan for any of its DML or query
		operations  */
		if (dict_index_is_spatial(index)) {
			left_page_no = btr_page_get_prev(page);

			while (left_page_no != FIL_NULL) {
				/* To obey latch order of tree blocks,
				we should release the right_block once to
				obtain lock of the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint2, block);

				savepoint2 = mtr_set_savepoint(&mtr);
				block = btr_block_get(
					page_id_t(index->table->space_id,
						  left_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
				page = buf_block_get_frame(block);
				left_page_no = btr_page_get_prev(page);
			}
		}
	}

	/* Now we are on the desired level. Loop through the pages on that
	level. */

loop:
	/* Phase 2: validate one page per iteration; at the bottom we
	commit the mtr, re-latch the right sibling, and jump back here. */
	mem_heap_empty(heap);
	offsets = offsets2 = NULL;
	if (!srv_read_only_mode) {
		if (lockout) {
			mtr_x_lock_index(index, &mtr);
		} else {
			mtr_sx_lock_index(index, &mtr);
		}
	}

#ifdef UNIV_ZIP_DEBUG
	page_zip = buf_block_get_page_zip(block);
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_a(block->page.id.space() == index->table->space_id);

	if (fseg_page_is_free(space, block->page.id.page_no())) {

		btr_validate_report1(index, level, block);

		ib::warn() << "Page is marked as free";
		ret = false;

	} else if (btr_page_get_index_id(page) != index->id) {

		ib::error() << "Page index id " << btr_page_get_index_id(page)
			<< " != data dictionary index id " << index->id;

		ret = false;

	} else if (!page_validate(page, index)) {

		btr_validate_report1(index, level, block);
		ret = false;

	} else if (level == 0 && !btr_index_page_validate(block, index)) {

		/* We are on level 0. Check that the records have the right
		number of fields, and field lengths are right. */

		ret = false;
	}

	ut_a(btr_page_get_level(page) == level);

	right_page_no = btr_page_get_next(page);
	left_page_no = btr_page_get_prev(page);

	/* Only the root page of an empty index may be empty. */
	ut_a(!page_is_empty(page)
	     || (level == 0
		 && page_get_page_no(page) == dict_index_get_page(index)));

	/* Check consistency with the right sibling: the prev/next links
	must agree, the 'compact' format must match, and the last record
	here must sort below the first record there. */
	if (right_page_no != FIL_NULL) {
		const rec_t*	right_rec;
		savepoint = mtr_set_savepoint(&mtr);

		right_block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			zip_size,
			RW_SX_LATCH, index, &mtr);

		right_page = buf_block_get_frame(right_block);

		if (btr_page_get_prev(right_page) != page_get_page_no(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: broken FIL_PAGE_NEXT"
			      " or FIL_PAGE_PREV links\n", stderr);

			ret = false;
		}

		if (page_is_comp(right_page) != page_is_comp(page)) {
			btr_validate_report2(index, level, block, right_block);
			fputs("InnoDB: 'compact' flag mismatch\n", stderr);

			ret = false;

			goto node_ptr_fails;
		}

		rec = page_rec_get_prev(page_get_supremum_rec(page));
		right_rec = page_rec_get_next(page_get_infimum_rec(
						      right_page));
		offsets = rec_get_offsets(rec, index, offsets,
					  page_is_leaf(page)
					  ? index->n_core_fields : 0,
					  ULINT_UNDEFINED, &heap);
		offsets2 = rec_get_offsets(right_rec, index, offsets2,
					   page_is_leaf(right_page)
					   ? index->n_core_fields : 0,
					   ULINT_UNDEFINED, &heap);

		/* For spatial index, we cannot guarantee the key ordering
		across pages, so skip the record compare verification for
		now. Will enhanced in special R-Tree index validation scheme */
		if (!dict_index_is_spatial(index)
		    && cmp_rec_rec(rec, right_rec,
				   offsets, offsets2, index) >= 0) {

			btr_validate_report2(index, level, block, right_block);

			fputs("InnoDB: records in wrong order"
			      " on adjacent pages\n", stderr);

			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_prev(page_get_supremum_rec(page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);
			fputs("InnoDB: record ", stderr);
			rec = page_rec_get_next(
				page_get_infimum_rec(right_page));
			rec_print(stderr, rec, index);
			putc('\n', stderr);

			ret = false;
		}
	}

	/* The leftmost page of a non-leaf level must start with the
	minimum record. */
	if (level > 0 && left_page_no == FIL_NULL) {
		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
			     page_rec_get_next(page_get_infimum_rec(page)),
			     page_is_comp(page)));
	}

	/* Similarly skip the father node check for spatial index for now,
	for a couple of reasons:
	1) As mentioned, there is no ordering relationship between records
	in parent level and linked pages in the child level.
	2) Search parent from root is very costly for R-tree.
	We will add special validation mechanism for R-tree later (WL #7520) */
	if (!dict_index_is_spatial(index)
	    && block->page.id.page_no() != dict_index_get_page(index)) {

		/* Check father node pointers */
		rec_t*	node_ptr;

		/* The node pointer found via the first record and the one
		found via the last record must be the same record, and it
		must point back to this page. */
		btr_cur_position(
			index, page_rec_get_next(page_get_infimum_rec(page)),
			block, &node_cur);
		offsets = btr_page_get_father_node_ptr_for_validate(
			offsets, heap, &node_cur, &mtr);

		father_page = btr_cur_get_page(&node_cur);
		node_ptr = btr_cur_get_rec(&node_cur);

		parent_page_no = page_get_page_no(father_page);
		parent_right_page_no = btr_page_get_next(father_page);
		rightmost_child = page_rec_is_supremum(
			page_rec_get_next(node_ptr));

		btr_cur_position(
			index,
			page_rec_get_prev(page_get_supremum_rec(page)),
			block, &node_cur);

		offsets = btr_page_get_father_node_ptr_for_validate(
			offsets, heap, &node_cur, &mtr);

		if (node_ptr != btr_cur_get_rec(&node_cur)
		    || btr_node_ptr_get_child_page_no(node_ptr, offsets)
		    != block->page.id.page_no()) {

			btr_validate_report1(index, level, block);

			fputs("InnoDB: node pointer to the page is wrong\n",
			      stderr);

			fputs("InnoDB: node ptr ", stderr);
			rec_print(stderr, node_ptr, index);

			rec = btr_cur_get_rec(&node_cur);
			fprintf(stderr, "\n"
				"InnoDB: node ptr child page n:o "
				ULINTPF "\n",
				btr_node_ptr_get_child_page_no(rec, offsets));

			fputs("InnoDB: record on page ", stderr);
			rec_print_new(stderr, rec, offsets);
			putc('\n', stderr);
			ret = false;

			goto node_ptr_fails;
		}

		/* On non-leaf levels the node pointer built from the first
		record of this page must equal the stored node pointer. */
		if (!page_is_leaf(page)) {
			node_ptr_tuple = dict_index_build_node_ptr(
				index,
				page_rec_get_next(page_get_infimum_rec(page)),
				0, heap, btr_page_get_level(page));

			if (cmp_dtuple_rec(node_ptr_tuple, node_ptr,
					   offsets)) {
				const rec_t* first_rec = page_rec_get_next(
					page_get_infimum_rec(page));

				btr_validate_report1(index, level, block);

				ib::error() << "Node ptrs differ on levels > 0";

				fputs("InnoDB: node ptr ",stderr);
				rec_print_new(stderr, node_ptr, offsets);
				fputs("InnoDB: first rec ", stderr);
				rec_print(stderr, first_rec, index);
				putc('\n', stderr);
				ret = false;

				goto node_ptr_fails;
			}
		}

		if (left_page_no == FIL_NULL) {
			/* Leftmost child: its node pointer must be the first
			record of a father page that has no left sibling. */
			ut_a(node_ptr == page_rec_get_next(
				     page_get_infimum_rec(father_page)));
			ut_a(!page_has_prev(father_page));
		}

		if (right_page_no == FIL_NULL) {
			/* Rightmost child: its node pointer must be the last
			record of a father page that has no right sibling. */
			ut_a(node_ptr == page_rec_get_prev(
				     page_get_supremum_rec(father_page)));
			ut_a(!page_has_next(father_page));
		} else {
			const rec_t*	right_node_ptr;

			right_node_ptr = page_rec_get_next(node_ptr);

			if (!lockout && rightmost_child) {

				/* To obey latch order of tree blocks,
				we should release the right_block once to
				obtain lock of the uncle block. */
				mtr_release_block_at_savepoint(
					&mtr, savepoint, right_block);

				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(index->table
							  ->space_id,
							  parent_right_page_no),
						zip_size,
						RW_SX_LATCH, index, &mtr);
				}

				right_block = btr_block_get(
					page_id_t(index->table->space_id,
						  right_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
			}

			btr_cur_position(
				index, page_rec_get_next(
					page_get_infimum_rec(
						buf_block_get_frame(
							right_block))),
				right_block, &right_node_cur);

			offsets = btr_page_get_father_node_ptr_for_validate(
					offsets, heap, &right_node_cur, &mtr);

			if (right_node_ptr
			    != page_get_supremum_rec(father_page)) {

				if (btr_cur_get_rec(&right_node_cur)
				    != right_node_ptr) {
					ret = false;
					fputs("InnoDB: node pointer to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			} else {
				page_t*	right_father_page
					= btr_cur_get_page(&right_node_cur);

				if (btr_cur_get_rec(&right_node_cur)
				    != page_rec_get_next(
					    page_get_infimum_rec(
						    right_father_page))) {
					ret = false;
					fputs("InnoDB: node pointer 2 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}

				if (page_get_page_no(right_father_page)
				    != btr_page_get_next(father_page)) {

					ret = false;
					fputs("InnoDB: node pointer 3 to"
					      " the right page is wrong\n",
					      stderr);

					btr_validate_report1(index, level,
							     block);
				}
			}
		}
	}

node_ptr_fails:
	/* Commit the mini-transaction to release the latch on 'page'.
	Re-acquire the latch on right_page, which will become 'page'
	on the next loop.  The page has already been checked. */
	mtr.commit();

	if (trx_is_interrupted(trx)) {
		/* On interrupt, return the current status. */
	} else if (right_page_no != FIL_NULL) {

		mtr.start();

		/* Re-latch the parent (or its right sibling) first, to keep
		the top-down latch order when re-latching the next page. */
		if (!lockout) {
			if (rightmost_child) {
				if (parent_right_page_no != FIL_NULL) {
					btr_block_get(
						page_id_t(
							index->table->space_id,
							parent_right_page_no),
						zip_size,
						RW_SX_LATCH, index, &mtr);
				}
			} else if (parent_page_no != FIL_NULL) {
				btr_block_get(
					page_id_t(index->table->space_id,
						  parent_page_no),
					zip_size,
					RW_SX_LATCH, index, &mtr);
			}
		}

		block = btr_block_get(
			page_id_t(index->table->space_id, right_page_no),
			zip_size,
			RW_SX_LATCH, index, &mtr);

		page = buf_block_get_frame(block);

		goto loop;
	}

	mem_heap_free(heap);

	return(ret);
}
5319
5320 /**************************************************************//**
5321 Checks the consistency of an index tree.
5322 @return DB_SUCCESS if ok, error code if not */
5323 dberr_t
btr_validate_index(dict_index_t * index,const trx_t * trx)5324 btr_validate_index(
5325 /*===============*/
5326 dict_index_t* index, /*!< in: index */
5327 const trx_t* trx) /*!< in: transaction or NULL */
5328 {
5329 dberr_t err = DB_SUCCESS;
5330 bool lockout = dict_index_is_spatial(index);
5331
5332 /* Full Text index are implemented by auxiliary tables,
5333 not the B-tree */
5334 if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
5335 return(err);
5336 }
5337
5338 mtr_t mtr;
5339
5340 mtr_start(&mtr);
5341
5342 if (!srv_read_only_mode) {
5343 if (lockout) {
5344 mtr_x_lock_index(index, &mtr);
5345 } else {
5346 mtr_sx_lock_index(index, &mtr);
5347 }
5348 }
5349
5350 page_t* root = btr_root_get(index, &mtr);
5351
5352 if (!root) {
5353 mtr_commit(&mtr);
5354 return DB_CORRUPTION;
5355 }
5356
5357 ulint n = btr_page_get_level(root);
5358
5359 btr_validate_index_running++;
5360 for (ulint i = 0; i <= n; ++i) {
5361
5362 if (!btr_validate_level(index, trx, n - i, lockout)) {
5363 err = DB_CORRUPTION;
5364 }
5365 }
5366
5367 mtr_commit(&mtr);
5368 /* In theory we need release barrier here, so that
5369 btr_validate_index_running decrement is guaranteed to
5370 happen after latches are released.
5371
5372 Original code issued SEQ_CST on update and non-atomic
5373 access on load. Which means it had broken synchronisation
5374 as well. */
5375 btr_validate_index_running--;
5376
5377 return(err);
5378 }
5379
5380 /**************************************************************//**
5381 Checks if the page in the cursor can be merged with given page.
5382 If necessary, re-organize the merge_page.
5383 @return true if possible to merge. */
5384 static
5385 bool
btr_can_merge_with_page(btr_cur_t * cursor,ulint page_no,buf_block_t ** merge_block,mtr_t * mtr)5386 btr_can_merge_with_page(
5387 /*====================*/
5388 btr_cur_t* cursor, /*!< in: cursor on the page to merge */
5389 ulint page_no, /*!< in: a sibling page */
5390 buf_block_t** merge_block, /*!< out: the merge block */
5391 mtr_t* mtr) /*!< in: mini-transaction */
5392 {
5393 dict_index_t* index;
5394 page_t* page;
5395 ulint n_recs;
5396 ulint data_size;
5397 ulint max_ins_size_reorg;
5398 ulint max_ins_size;
5399 buf_block_t* mblock;
5400 page_t* mpage;
5401 DBUG_ENTER("btr_can_merge_with_page");
5402
5403 if (page_no == FIL_NULL) {
5404 *merge_block = NULL;
5405 DBUG_RETURN(false);
5406 }
5407
5408 index = btr_cur_get_index(cursor);
5409 page = btr_cur_get_page(cursor);
5410
5411 const page_id_t page_id(index->table->space_id, page_no);
5412 const ulint zip_size = index->table->space->zip_size();
5413
5414 mblock = btr_block_get(page_id, zip_size, RW_X_LATCH, index, mtr);
5415 mpage = buf_block_get_frame(mblock);
5416
5417 n_recs = page_get_n_recs(page);
5418 data_size = page_get_data_size(page);
5419
5420 max_ins_size_reorg = page_get_max_insert_size_after_reorganize(
5421 mpage, n_recs);
5422
5423 if (data_size > max_ins_size_reorg) {
5424 goto error;
5425 }
5426
5427 /* If compression padding tells us that merging will result in
5428 too packed up page i.e.: which is likely to cause compression
5429 failure then don't merge the pages. */
5430 if (zip_size && page_is_leaf(mpage)
5431 && (page_get_data_size(mpage) + data_size
5432 >= dict_index_zip_pad_optimal_page_size(index))) {
5433
5434 goto error;
5435 }
5436
5437 max_ins_size = page_get_max_insert_size(mpage, n_recs);
5438
5439 if (data_size > max_ins_size) {
5440
5441 /* We have to reorganize mpage */
5442
5443 if (!btr_page_reorganize_block(
5444 false, page_zip_level, mblock, index, mtr)) {
5445
5446 goto error;
5447 }
5448
5449 max_ins_size = page_get_max_insert_size(mpage, n_recs);
5450
5451 ut_ad(page_validate(mpage, index));
5452 ut_ad(max_ins_size == max_ins_size_reorg);
5453
5454 if (data_size > max_ins_size) {
5455
5456 /* Add fault tolerance, though this should
5457 never happen */
5458
5459 goto error;
5460 }
5461 }
5462
5463 *merge_block = mblock;
5464 DBUG_RETURN(true);
5465
5466 error:
5467 *merge_block = NULL;
5468 DBUG_RETURN(false);
5469 }
5470