/*****************************************************************************

Copyright (c) 1994, 2020, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

*****************************************************************************/

/** @file btr/btr0btr.cc
The B-tree

Created 6/2/1994 Heikki Tuuri
*******************************************************/

#include "btr0btr.h"

#include <sys/types.h>

#ifndef UNIV_HOTBACKUP
#include "btr0cur.h"
#include "btr0pcur.h"
#include "btr0sea.h"
#include "buf0stats.h"
#include "dict0boot.h"
#include "fsp0sysspace.h"
#include "gis0geo.h"
#include "gis0rtree.h"
#include "ibuf0ibuf.h"
#include "lock0lock.h"
#include "my_dbug.h"
#include "page0page.h"
#include "page0zip.h"
#include "rem0cmp.h"
#include "srv0mon.h"
#include "trx0trx.h"
#include "ut0new.h"
#endif /* !UNIV_HOTBACKUP */
#ifndef UNIV_HOTBACKUP
/** Checks if the page in the cursor can be merged with the given page.
If necessary, re-organizes the merge_page.
@return true if possible to merge. */
static bool btr_can_merge_with_page(
    btr_cur_t *cursor,         /*!< in: cursor on the page to merge */
    page_no_t page_no,         /*!< in: a sibling page */
    buf_block_t **merge_block, /*!< out: the merge block */
    mtr_t *mtr);               /*!< in: mini-transaction */
#endif /* !UNIV_HOTBACKUP */

/** Report that an index page is corrupted. */
void btr_corruption_report(const buf_block_t *block,  /*!< in: corrupted block */
                           const dict_index_t *index) /*!< in: index tree */
{
  ib::error(ER_IB_MSG_27) << "Flag mismatch in page " << block->page.id
                          << " index " << index->name << " of table "
                          << index->table->name;
}

#ifndef UNIV_HOTBACKUP
/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search; those pages are only
buffer-fixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the newly allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough so that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we may store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
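
/* Illustration only (not InnoDB code): the node-pointer invariant described
above, written out with hypothetical types and helpers. A node pointer with
prefix P, whose successor on the same level has prefix P1, bounds the keys
of its child page:

  // Every key in the child must satisfy P <= key, and key < P1 whenever a
  // successor node pointer exists on this level.
  bool child_keys_in_range(const Key &P, const Key *P1, const Page &child) {
    for (const Key &key : child.keys()) {
      if (key < P) return false;                        // below lower bound
      if (P1 != nullptr && !(key < *P1)) return false;  // at/above upper bound
    }
    return true;
  }

For a leaf child, no record is required to actually equal P, which is what
allows arbitrary deletions in a leaf without touching its parent. */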

#ifdef UNIV_BTR_DEBUG
/** Checks a file segment header within a B-tree root page.
@return true if valid */
static ibool btr_root_fseg_validate(
    const fseg_header_t *seg_header, /*!< in: segment header */
    space_id_t space)                /*!< in: tablespace identifier */
{
  ulint offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

  ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
  ut_a(offset >= FIL_PAGE_DATA);
  ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
  return (TRUE);
}
#endif /* UNIV_BTR_DEBUG */

/** Gets the root node of a tree and x- or s-latches it.
@return root page, x- or s-latched */
buf_block_t *btr_root_block_get(
    const dict_index_t *index, /*!< in: index tree */
    ulint mode,                /*!< in: either RW_S_LATCH
                               or RW_X_LATCH */
    mtr_t *mtr)                /*!< in: mtr */
{
  const space_id_t space_id = dict_index_get_space(index);
  const page_id_t page_id(space_id, dict_index_get_page(index));
  const page_size_t page_size(dict_table_page_size(index->table));

  buf_block_t *block = btr_block_get(page_id, page_size, mode, index, mtr);

  btr_assert_not_corrupted(block, index);
#ifdef UNIV_BTR_DEBUG
  if (!dict_index_is_ibuf(index)) {
    const page_t *root = buf_block_get_frame(block);

    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                                space_id));
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                                space_id));
  }
#endif /* UNIV_BTR_DEBUG */

  return (block);
}

/** Gets the root node of a tree and sx-latches it for segment access.
@return root page, sx-latched */
page_t *btr_root_get(const dict_index_t *index, /*!< in: index tree */
                     mtr_t *mtr)                /*!< in: mtr */
{
  /* Intended to be used for segment list access.
  An SX latch does not block other threads from reading user data,
  but it does block them from accessing the segment lists. */
  return (buf_block_get_frame(btr_root_block_get(index, RW_SX_LATCH, mtr)));
}

/** Gets the height of the B-tree (the level of the root, when the leaf
level is assumed to be 0). The caller must hold an S, SX or X latch on
the index.
@return tree height (level of the root) */
ulint btr_height_get(dict_index_t *index, /*!< in: index tree */
                     mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  ulint height;
  buf_block_t *root_block;

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains_flagged(
            mtr, dict_index_get_lock(index),
            MTR_MEMO_S_LOCK | MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());

  /* S latches the page */
  root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

  height = btr_page_get_level(buf_block_get_frame(root_block), mtr);

  /* Release the S latch on the root page. */
  mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

  ut_d(sync_check_unlock(&root_block->lock));

  return (height);
}
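
/* Usage sketch (illustration only): per the assertion above, the caller
must already hold an S, SX or X latch on the index lock, e.g. via
mtr_s_lock():

  mtr_t mtr;
  mtr_start(&mtr);
  mtr_s_lock(dict_index_get_lock(index), &mtr);
  ulint height = btr_height_get(index, &mtr);  // 0 means the root is a leaf
  mtr_commit(&mtr);
*/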

/** Checks a file segment header within a B-tree root page and updates
the segment header space id.
@return true if valid */
static bool btr_root_fseg_adjust_on_import(
    fseg_header_t *seg_header, /*!< in/out: segment header */
    page_zip_des_t *page_zip,  /*!< in/out: compressed page,
                               or NULL */
    space_id_t space,          /*!< in: tablespace identifier */
    mtr_t *mtr)                /*!< in/out: mini-transaction */
{
  page_no_t offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

  if (offset < FIL_PAGE_DATA || offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) {
    return (FALSE);

  } else if (page_zip) {
    mach_write_to_4(seg_header + FSEG_HDR_SPACE, space);
    page_zip_write_header(page_zip, seg_header + FSEG_HDR_SPACE, 4, mtr);
  } else {
    mlog_write_ulint(seg_header + FSEG_HDR_SPACE, space, MLOG_4BYTES, mtr);
  }

  return (TRUE);
}

/** Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
@return error code, or DB_SUCCESS */
dberr_t btr_root_adjust_on_import(
    const dict_index_t *index) /*!< in: index tree */
{
  dberr_t err;
  mtr_t mtr;
  page_t *page;
  buf_block_t *block;
  page_zip_des_t *page_zip;
  dict_table_t *table = index->table;
  const space_id_t space_id = dict_index_get_space(index);
  const page_id_t page_id(space_id, dict_index_get_page(index));
  const page_size_t page_size(dict_table_page_size(table));

  DBUG_EXECUTE_IF("ib_import_trigger_corruption_3", return (DB_CORRUPTION););

  mtr_start(&mtr);

  mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

  block = btr_block_get(page_id, page_size, RW_X_LATCH, index, &mtr);

  page = buf_block_get_frame(block);
  page_zip = buf_block_get_page_zip(block);

  if (!page_is_root(page)) {
    err = DB_CORRUPTION;

  } else if (index->is_clustered()) {
    bool page_is_compact_format;

    page_is_compact_format = page_is_comp(page) > 0;

    /* Check if the page format and table format agree. */
    if (page_is_compact_format != !!dict_table_is_comp(table)) {
      err = DB_CORRUPTION;
    } else {
      /* Check that the table flags and the tablespace
      flags match. */
      uint32_t flags = dict_tf_to_fsp_flags(table->flags);
      uint32_t fsp_flags = fil_space_get_flags(table->space);

      /* We temporarily remove the SDI flag from the space flags
      for the comparison, because the space flags derived from the
      table flags will not have the SDI flag. */
      fsp_flags_unset_sdi(fsp_flags);

      /* As encryption is not a table property, we don't keep
      any encryption-related flag in the table. Thus exclude
      the encryption flag as well. */
      fsp_flags_unset_encryption(fsp_flags);

      err = fsp_flags_are_equal(flags, fsp_flags) ? DB_SUCCESS : DB_CORRUPTION;
    }
  } else {
    err = DB_SUCCESS;
  }

  /* Check and adjust the file segment headers, if all OK so far. */
  if (err == DB_SUCCESS &&
      (!btr_root_fseg_adjust_on_import(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + page,
                                       page_zip, space_id, &mtr) ||
       !btr_root_fseg_adjust_on_import(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + page,
                                       page_zip, space_id, &mtr))) {
    err = DB_CORRUPTION;
  }

  mtr_commit(&mtr);

  return (err);
}

/** Creates a new index page (not the root, and also not
used in page reorganization). @see btr_page_empty(). */
void btr_page_create(
    buf_block_t *block,       /*!< in/out: page to be created */
    page_zip_des_t *page_zip, /*!< in/out: compressed page, or NULL */
    dict_index_t *index,      /*!< in: index */
    ulint level,              /*!< in: the B-tree level of the page */
    mtr_t *mtr)               /*!< in: mtr */
{
  page_t *page = buf_block_get_frame(block);

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  uint16_t page_create_type;
  if (dict_index_is_spatial(index)) {
    page_create_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_create_type = FIL_PAGE_SDI;
  } else {
    page_create_type = FIL_PAGE_INDEX;
  }

  if (page_zip) {
    page_create_zip(block, index, level, 0, mtr, page_create_type);
  } else {
    page_create(block, mtr, dict_table_is_comp(index->table), page_create_type);
    /* Set the level of the new index page */
    btr_page_set_level(page, nullptr, level, mtr);
  }

  /* For a spatial index, initialize the split sequence number */
  if (dict_index_is_spatial(index)) {
    page_set_ssn_id(block, page_zip, 0, mtr);
  }

  btr_page_set_index_id(page, page_zip, index->id, mtr);

  if (level == 0) {
    buf_stat_per_index->inc(index_id_t(index->space, index->id));
  }
}

/** Allocates a new file page to be used in an ibuf tree. Takes the page from
the free list of the tree, which must contain pages!
@return new allocated block, x-latched */
static buf_block_t *btr_page_alloc_for_ibuf(
    dict_index_t *index, /*!< in: index tree */
    mtr_t *mtr)          /*!< in: mtr */
{
  fil_addr_t node_addr;
  page_t *root;
  page_t *new_page;
  buf_block_t *new_block;

  root = btr_root_get(index, mtr);

  node_addr = flst_get_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
  ut_a(node_addr.page != FIL_NULL);

  new_block =
      buf_page_get(page_id_t(dict_index_get_space(index), node_addr.page),
                   dict_table_page_size(index->table), RW_X_LATCH, mtr);

  new_page = buf_block_get_frame(new_block);
  buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

  flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
              new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
  ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));

  return (new_block);
}

/** Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((warn_unused_result)) buf_block_t *btr_page_alloc_low(
    dict_index_t *index,    /*!< in: index */
    page_no_t hint_page_no, /*!< in: hint of a good page */
    byte file_direction,    /*!< in: direction where a possible
                            page split is made */
    ulint level,            /*!< in: level where the page is placed
                            in the tree */
    mtr_t *mtr,             /*!< in/out: mini-transaction
                            for the allocation */
    mtr_t *init_mtr)        /*!< in/out: mtr or another
                            mini-transaction in which the
                            page should be initialized.
                            If init_mtr!=mtr, but the page
                            is already X-latched in mtr, do
                            not initialize the page. */
{
  fseg_header_t *seg_header;
  page_t *root;

  root = btr_root_get(index, mtr);

  if (level == 0) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
  } else {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
  }

  /* Parameter TRUE below states that the caller has made the
  reservation for free extents, and thus we know that a page can
  be allocated: */

  return (fseg_alloc_free_page_general(seg_header, hint_page_no, file_direction,
                                       TRUE, mtr, init_mtr));
}

/** Allocates a new file page to be used in an index tree. NOTE: we assume
that the caller has made the reservation for free extents!
@retval NULL if no page could be allocated
@retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
(init_mtr == mtr, or the page was not previously freed in mtr)
@retval block (not allocated or initialized) otherwise */
buf_block_t *btr_page_alloc(
    dict_index_t *index,    /*!< in: index */
    page_no_t hint_page_no, /*!< in: hint of a good page */
    byte file_direction,    /*!< in: direction where a possible
                            page split is made */
    ulint level,            /*!< in: level where the page is placed
                            in the tree */
    mtr_t *mtr,             /*!< in/out: mini-transaction
                            for the allocation */
    mtr_t *init_mtr)        /*!< in/out: mini-transaction
                            for x-latching and initializing
                            the page */
{
  buf_block_t *new_block;

  if (dict_index_is_ibuf(index)) {
    return (btr_page_alloc_for_ibuf(index, mtr));
  }

  new_block = btr_page_alloc_low(index, hint_page_no, file_direction, level,
                                 mtr, init_mtr);

  if (new_block) {
    buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
  }

  return (new_block);
}
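
/* Usage sketch (illustration only; based on the NOTE above, not on any one
caller): the free-extent reservation must be made before allocating, and
released afterwards. Parameter values here are hypothetical.

  ulint n_reserved = 0;
  if (fsp_reserve_free_extents(&n_reserved, index->space, 1, FSP_NORMAL,
                               mtr)) {
    buf_block_t *new_block =
        btr_page_alloc(index, hint_page_no, FSP_UP, level, mtr, mtr);
    // ... use new_block (may still be NULL on failure) ...
    fil_space_release_free_extents(index->space, n_reserved);
  }
*/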

/** Gets the number of pages in a B-tree.
@return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint btr_get_size(dict_index_t *index, /*!< in: index */
                   ulint flag, /*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
                   mtr_t *mtr) /*!< in/out: mini-transaction where index
                               is s-latched */
{
  fseg_header_t *seg_header;
  page_t *root;
  ulint n;
  ulint dummy;

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK) ||
        index->table->is_intrinsic());
  ut_ad(index->page >= FSP_FIRST_INODE_PAGE_NO);

  if (index->page == FIL_NULL || dict_index_is_online_ddl(index) ||
      !index->is_committed()) {
    return (ULINT_UNDEFINED);
  }

  root = btr_root_get(index, mtr);

  if (flag == BTR_N_LEAF_PAGES) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

    fseg_n_reserved_pages(seg_header, &n, mtr);

  } else if (flag == BTR_TOTAL_SIZE) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

    n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

    n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
  } else {
    ut_error;
  }

  return (n);
}
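
/* Usage sketch (illustration only): both counters come from the same
s-latched root, so the number of non-leaf pages can be derived by
subtraction.

  mtr_t mtr;
  mtr_start(&mtr);
  mtr_s_lock(dict_index_get_lock(index), &mtr);
  ulint leaf = btr_get_size(index, BTR_N_LEAF_PAGES, &mtr);
  ulint total = btr_get_size(index, BTR_TOTAL_SIZE, &mtr);
  mtr_commit(&mtr);
  // If neither value is ULINT_UNDEFINED, total - leaf is the number of
  // non-leaf (node pointer) pages.
*/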

/** Frees a page used in an ibuf tree. Puts the page to the free list of the
ibuf tree. */
static void btr_page_free_for_ibuf(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: block to be freed, x-latched */
    mtr_t *mtr)          /*!< in: mtr */
{
  page_t *root;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  root = btr_root_get(index, mtr);

  flst_add_first(
      root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
      buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
      mtr);

  ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
}

/** Frees a file page used in an index tree. Can also be used for (BLOB)
external storage pages. */
void btr_page_free_low(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: block to be freed, x-latched */
    ulint level,         /*!< in: page level (ULINT_UNDEFINED=BLOB) */
    mtr_t *mtr)          /*!< in: mtr */
{
  fseg_header_t *seg_header;
  page_t *root;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  /* The page becomes invalid for optimistic searches: increment the frame
  modify clock */

  buf_block_modify_clock_inc(block);

  if (dict_index_is_ibuf(index)) {
    btr_page_free_for_ibuf(index, block, mtr);

    return;
  }

  root = btr_root_get(index, mtr);

  if (level == 0 || level == ULINT_UNDEFINED) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
  } else {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
  }

#ifdef UNIV_GIS_DEBUG
  if (dict_index_is_spatial(index)) {
    fprintf(stderr, "GIS_DIAG: Freed %ld\n", (long)block->page.id.page_no());
  }
#endif

  fseg_free_page(seg_header, block->page.id.space(), block->page.id.page_no(),
                 level != ULINT_UNDEFINED, mtr);

  /* The page was marked free in the allocation bitmap, but it
  should remain buffer-fixed until mtr_commit(mtr) or until it
  is explicitly freed from the mini-transaction. */
  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  /* TODO: Discard any operations on the page from the redo log
  and remove the block from the flush list and the buffer pool.
  This would free up the buffer pool earlier and reduce writes to
  both the tablespace and the redo log. */
}

/** Frees a file page used in an index tree. NOTE: cannot free field external
storage pages because the page must contain info on its level. */
void btr_page_free(dict_index_t *index, /*!< in: index tree */
                   buf_block_t *block,  /*!< in: block to be freed, x-latched */
                   mtr_t *mtr)          /*!< in: mtr */
{
  const page_t *page = buf_block_get_frame(block);
  ulint level = btr_page_get_level(page, mtr);

  ut_ad(fil_page_index_page_check(block->frame));
  ut_ad(level != ULINT_UNDEFINED);
  btr_page_free_low(index, block, level, mtr);
}

/** Sets the child node file address in a node pointer. */
UNIV_INLINE
void btr_node_ptr_set_child_page_no(
    rec_t *rec,               /*!< in: node pointer record */
    page_zip_des_t *page_zip, /*!< in/out: compressed page whose uncompressed
                              part will be updated, or NULL */
    const ulint *offsets,     /*!< in: array returned by rec_get_offsets() */
    page_no_t page_no,        /*!< in: child node address */
    mtr_t *mtr)               /*!< in: mtr */
{
  byte *field;
  ulint len;

  ut_ad(rec_offs_validate(rec, nullptr, offsets));
  ut_ad(!page_is_leaf(page_align(rec)));
  ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

  /* The child address is in the last field */
  field = const_cast<byte *>(
      rec_get_nth_field(rec, offsets, rec_offs_n_fields(offsets) - 1, &len));

  ut_ad(len == REC_NODE_PTR_SIZE);

  if (page_zip) {
    page_zip_write_node_ptr(page_zip, rec, rec_offs_data_size(offsets), page_no,
                            mtr);
  } else {
    mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
  }
}

buf_block_t *btr_node_ptr_get_child(const rec_t *node_ptr, dict_index_t *index,
                                    const ulint *offsets, mtr_t *mtr,
                                    rw_lock_type_t type) {
  ut_ad(rec_offs_validate(node_ptr, index, offsets));

  const page_id_t page_id(page_get_space_id(page_align(node_ptr)),
                          btr_node_ptr_get_child_page_no(node_ptr, offsets));

  return (btr_block_get(page_id, dict_table_page_size(index->table), type,
                        index, mtr));
}

/** Returns the upper level node pointer to a page. It is assumed that mtr
holds an sx-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static ulint *btr_page_get_father_node_ptr_func(
    ulint *offsets,    /*!< in: work area for the return value */
    mem_heap_t *heap,  /*!< in: memory heap to use */
    btr_cur_t *cursor, /*!< in: cursor pointing to user record,
                       out: cursor on node pointer record,
                       its page x-latched */
    ulint latch_mode,  /*!< in: BTR_CONT_MODIFY_TREE
                       or BTR_CONT_SEARCH_TREE */
    const char *file,  /*!< in: file name */
    ulint line,        /*!< in: line where called */
    mtr_t *mtr)        /*!< in: mtr */
{
  dtuple_t *tuple;
  rec_t *user_rec;
  rec_t *node_ptr;
  ulint level;
  page_no_t page_no;
  dict_index_t *index;

  ut_ad(latch_mode == BTR_CONT_MODIFY_TREE ||
        latch_mode == BTR_CONT_SEARCH_TREE);

  page_no = btr_cur_get_block(cursor)->page.id.page_no();
  index = btr_cur_get_index(cursor);
  ut_ad(!dict_index_is_spatial(index));

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());

  ut_ad(dict_index_get_page(index) != page_no);

  level = btr_page_get_level(btr_cur_get_page(cursor), mtr);

  user_rec = btr_cur_get_rec(cursor);
  ut_a(page_rec_is_user_rec(user_rec));

  tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
  if (index->table->is_intrinsic()) {
    btr_cur_search_to_nth_level_with_no_latch(
        index, level + 1, tuple, PAGE_CUR_LE, cursor, file, line, mtr);
  } else {
    btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
                                latch_mode, cursor, 0, file, line, mtr);
  }

  node_ptr = btr_cur_get_rec(cursor);
  ut_ad(!page_rec_is_comp(node_ptr) ||
        rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
  offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);

  if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
    rec_t *print_rec;

    ib::error(ER_IB_MSG_28)
        << "Corruption of an index tree: table " << index->table->name
        << " index " << index->name << ", father ptr page no "
        << btr_node_ptr_get_child_page_no(node_ptr, offsets)
        << ", child page no " << page_no;

    print_rec = page_rec_get_next(page_get_infimum_rec(page_align(user_rec)));
    offsets =
        rec_get_offsets(print_rec, index, offsets, ULINT_UNDEFINED, &heap);
    page_rec_print(print_rec, offsets);
    offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
    page_rec_print(node_ptr, offsets);

    ib::fatal(ER_IB_MSG_29) << "You should dump + drop + reimport the table to"
                            << " fix the corruption. If the crash happens at"
                            << " database startup, " << FORCE_RECOVERY_MSG
                            << " Then dump + drop + reimport.";
  }

  return (offsets);
}

#define btr_page_get_father_node_ptr(of, heap, cur, mtr) \
  btr_page_get_father_node_ptr_func(of, heap, cur, BTR_CONT_MODIFY_TREE, \
                                    __FILE__, __LINE__, mtr)

#define btr_page_get_father_node_ptr_for_validate(of, heap, cur, mtr) \
  btr_page_get_father_node_ptr_func(of, heap, cur, BTR_CONT_SEARCH_TREE, \
                                    __FILE__, __LINE__, mtr)

/** Returns the upper level node pointer to a page. It is assumed that mtr
holds an x-latch on the tree.
@return rec_get_offsets() of the node pointer record */
static ulint *btr_page_get_father_block(
    ulint *offsets,      /*!< in: work area for the return value */
    mem_heap_t *heap,    /*!< in: memory heap to use */
    dict_index_t *index, /*!< in: b-tree index */
    buf_block_t *block,  /*!< in: child page in the index */
    mtr_t *mtr,          /*!< in: mtr */
    btr_cur_t *cursor)   /*!< out: cursor on node pointer record,
                         its page x-latched */
{
  rec_t *rec =
      page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
  btr_cur_position(index, rec, block, cursor);
  return (btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/** Seeks to the upper level node pointer to a page.
It is assumed that mtr holds an x-latch on the tree. */
static void btr_page_get_father(
    dict_index_t *index, /*!< in: b-tree index */
    buf_block_t *block,  /*!< in: child page in the index */
    mtr_t *mtr,          /*!< in: mtr */
    btr_cur_t *cursor)   /*!< out: cursor on node pointer record,
                         its page x-latched */
{
  mem_heap_t *heap;
  rec_t *rec =
      page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
  btr_cur_position(index, rec, block, cursor);

  heap = mem_heap_create(100);
  btr_page_get_father_node_ptr(nullptr, heap, cursor, mtr);
  mem_heap_free(heap);
}

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out] block index root page
@param[in,out] mtr mini-transaction */
static void btr_free_root(buf_block_t *block, mtr_t *mtr) {
  fseg_header_t *header;

  ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX));

  btr_search_drop_page_hash_index(block);

  header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  while (!fseg_free_step(header, true, mtr)) {
    /* Free the entire segment in small steps. */
  }
}

/** PAGE_INDEX_ID value for freed index B-trees */
static const space_index_t BTR_FREED_INDEX_ID = 0;

/** Invalidate an index root page so that btr_free_root_check()
will not find it.
@param[in,out] block index root page
@param[in,out] mtr mini-transaction */
static void btr_free_root_invalidate(buf_block_t *block, mtr_t *mtr) {
  ut_ad(page_is_root(block->frame));

  btr_page_set_index_id(buf_block_get_frame(block),
                        buf_block_get_page_zip(block), BTR_FREED_INDEX_ID, mtr);
}

/** Prepare to free a B-tree.
@param[in] page_id page id
@param[in] page_size page size
@param[in] index_id PAGE_INDEX_ID contents
@param[in,out] mtr mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result)) buf_block_t *btr_free_root_check(
    const page_id_t &page_id, const page_size_t &page_size,
    space_index_t index_id, mtr_t *mtr) {
  ut_ad(!fsp_is_system_temporary(page_id.space()));
  ut_ad(index_id != BTR_FREED_INDEX_ID);

  buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, mtr);
  buf_block_dbg_add_level(block, SYNC_TREE_NODE);

  if (fil_page_index_page_check(block->frame) &&
      index_id == btr_page_get_index_id(block->frame)) {
    /* This should be a root page.
    It should not be possible to reassign the same
    index_id for some other index in the tablespace. */
    ut_ad(page_is_root(block->frame));
  } else {
    block = nullptr;
  }

  return (block);
}

/** Create the root node for a new index tree.
@param[in] type type of the index
@param[in] space space where created
@param[in] page_size page size
@param[in] index_id index id
@param[in] index index tree
@param[in,out] mtr mini-transaction
@return page number of the created root
@retval FIL_NULL if did not succeed */
ulint btr_create(ulint type, space_id_t space, const page_size_t &page_size,
                 space_index_t index_id, dict_index_t *index, mtr_t *mtr) {
  page_no_t page_no;
  buf_block_t *block;
  buf_frame_t *frame;
  page_t *page;
  page_zip_des_t *page_zip;

  ut_ad(index_id != BTR_FREED_INDEX_ID);

  /* Create the two new segments (one, in the case of an ibuf tree) for
  the index tree; the segment headers are put on the allocated root page
  (for an ibuf tree, not in the root, but on a separate ibuf header
  page) */

  if (type & DICT_IBUF) {
    /* Allocate first the ibuf header page */
    buf_block_t *ibuf_hdr_block =
        fseg_create(space, 0, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

    if (ibuf_hdr_block == nullptr) {
      return (FIL_NULL);
    }

    buf_block_dbg_add_level(ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

    ut_ad(ibuf_hdr_block->page.id.page_no() == IBUF_HEADER_PAGE_NO);
    /* Allocate then the next page to the segment: it will be the
    tree root page */

    block = fseg_alloc_free_page(buf_block_get_frame(ibuf_hdr_block) +
                                     IBUF_HEADER + IBUF_TREE_SEG_HEADER,
                                 IBUF_TREE_ROOT_PAGE_NO, FSP_UP, mtr);
    ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);
  } else {
    block = fseg_create(space, 0, PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
  }

  if (block == nullptr) {
    return (FIL_NULL);
  }

  page_no = block->page.id.page_no();
  frame = buf_block_get_frame(block);

  if (type & DICT_IBUF) {
    /* It is an insert buffer tree: initialize the free list */
    buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

    ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

    flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
  } else {
    /* It is a non-ibuf tree: create a file segment for leaf
    pages */
    buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

    if (!fseg_create(space, page_no, PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
      /* Not enough space for the new segment; free the root
      segment before returning. */
      btr_free_root(block, mtr);
      if (!index->table->is_temporary()) {
        btr_free_root_invalidate(block, mtr);
      }

      return (FIL_NULL);
    }

    /* The fseg create acquires a second latch on the page,
    therefore we must declare it: */
    buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
  }

  uint16_t page_create_type;
  if (dict_index_is_spatial(index)) {
    page_create_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_create_type = FIL_PAGE_SDI;
  } else {
    page_create_type = FIL_PAGE_INDEX;
  }

  /* Create a new index page on the allocated segment page */
  page_zip = buf_block_get_page_zip(block);

  if (page_zip) {
    page = page_create_zip(block, index, 0, 0, mtr, page_create_type);
  } else {
    page = page_create(block, mtr, dict_table_is_comp(index->table),
                       page_create_type);
    /* Set the level of the new index page */
    btr_page_set_level(page, nullptr, 0, mtr);
  }

  /* Set the index id of the page */
  btr_page_set_index_id(page, page_zip, index_id, mtr);

  /* Set the next node and previous node fields */
  btr_page_set_next(page, page_zip, FIL_NULL, mtr);
  btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

  /* We reset the free bits for the page to allow creation of several
  trees in the same mtr, otherwise the latch on a bitmap page would
  prevent it because of the latching order.

  Note: Insert Buffering is disabled for temporary tables given that
  most temporary tables are smaller in size and short-lived. */
  if (!(type & DICT_CLUSTERED) && !index->table->is_temporary()) {
    ibuf_reset_free_bits(block);
  }

  /* In the following assertion we test that two records of maximum
  allowed size fit on the root page: this fact is needed to ensure
  correctness of split algorithms */

  ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

  buf_stat_per_index->inc(index_id_t(space, index_id));

  return (page_no);
}

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out] block root page
@param[in] log_mode mtr logging mode */
static void btr_free_but_not_root(buf_block_t *block, mtr_log_t log_mode) {
  ibool finished;
  mtr_t mtr;

  ut_ad(page_is_root(block->frame));
leaf_loop:
  mtr_start(&mtr);
  mtr_set_log_mode(&mtr, log_mode);

  page_t *root = block->frame;

#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                              block->page.id.space()));
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                              block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  /* NOTE: page hash indexes are dropped when a page is freed inside
  fsp0fsp. */

  finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF, true, &mtr);
  mtr_commit(&mtr);

  if (!finished) {
    goto leaf_loop;
  }
top_loop:
  mtr_start(&mtr);
  mtr_set_log_mode(&mtr, log_mode);

  root = block->frame;

#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                              block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  finished = fseg_free_step_not_header(root + PAGE_HEADER + PAGE_BTR_SEG_TOP,
                                       true, &mtr);
  mtr_commit(&mtr);

  if (!finished) {
    goto top_loop;
  }
}

/** Free a persistent index tree if it exists.
@param[in] page_id root page id
@param[in] page_size page size
@param[in] index_id PAGE_INDEX_ID contents
@param[in,out] mtr mini-transaction */
void btr_free_if_exists(const page_id_t &page_id, const page_size_t &page_size,
                        space_index_t index_id, mtr_t *mtr) {
  buf_block_t *root = btr_free_root_check(page_id, page_size, index_id, mtr);

  if (root == nullptr) {
    return;
  }

  btr_free_but_not_root(root, mtr->get_log_mode());
  btr_free_root(root, mtr);
  btr_free_root_invalidate(root, mtr);
}

/** Free an index tree in a temporary tablespace.
@param[in] page_id root page id
@param[in] page_size page size */
void btr_free(const page_id_t &page_id, const page_size_t &page_size) {
  mtr_t mtr;
  mtr.start();
  mtr.set_log_mode(MTR_LOG_NO_REDO);

  buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  ut_ad(page_is_root(block->frame));

  btr_free_but_not_root(block, MTR_LOG_NO_REDO);
  btr_free_root(block, &mtr);
  mtr.commit();
}

/** Truncate an index tree. We just free all except the root.
Currently, this function is specific to clustered indexes, and the only
caller is DDTableBuffer, which manages a table with only a clustered index.
It is up to the caller to ensure atomicity and to ensure correct recovery by
calling btr_truncate_recover().
@param[in] index clustered index */
void btr_truncate(const dict_index_t *index) {
  ut_ad(index->is_clustered());
  ut_ad(index->next() == nullptr);

  page_no_t root_page_no = index->page;
  space_id_t space_id = index->space;
  fil_space_t *space = fil_space_acquire(space_id);

  if (space == nullptr) {
    return;
  }

  page_size_t page_size(space->flags);
  const page_id_t page_id(space_id, root_page_no);
  mtr_t mtr;
  buf_block_t *block;

  mtr.start();

  mtr_x_lock(&space->latch, &mtr);

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  page_t *page = buf_block_get_frame(block);
  ut_ad(page_is_root(page));

  /* Mark that we are going to truncate the index tree.
  We use PAGE_MAX_TRX_ID as it should always be 0 for a clustered
  index. */
  mlog_write_ull(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), IB_ID_MAX, &mtr);

  mtr.commit();

  mtr.start();

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  /* Free all except the root, we don't want to change it. */
  btr_free_but_not_root(block, MTR_LOG_ALL);

  /* Reset the mark saying that we have finished the truncate.
  PAGE_MAX_TRX_ID is reset to 0 by page_create() here. */
  page_create(block, &mtr, dict_table_is_comp(index->table), false);
  ut_ad(page_get_max_trx_id(buf_block_get_frame(block)) == 0);

  mtr.commit();

  rw_lock_x_unlock(&space->latch);

  fil_space_release(space);
}

/** Recovery function for btr_truncate. We check whether there was a
crash during btr_truncate; if so, we recover from it, otherwise we do
nothing.
@param[in] index clustered index */
void btr_truncate_recover(const dict_index_t *index) {
  ut_ad(index->is_clustered());
  ut_ad(index->next() == nullptr);

  page_no_t root_page_no = index->page;
  space_id_t space_id = index->space;
  fil_space_t *space = fil_space_acquire(space_id);

  if (space == nullptr) {
    return;
  }

  page_size_t page_size(space->flags);
  const page_id_t page_id(space_id, root_page_no);
  mtr_t mtr;
  buf_block_t *block;

  mtr.start();

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  page_t *page = buf_block_get_frame(block);

  trx_id_t trx_id = page_get_max_trx_id(page);
  ut_ad(trx_id == 0 || trx_id == IB_ID_MAX);

  mtr.commit();

  fil_space_release(space);

  if (trx_id == IB_ID_MAX) {
    /* IB_ID_MAX (all bits set, i.e. -1) means there is a half-done
    btr_truncate; we can simply redo it again. */
    btr_truncate(index);
  }
}
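
/* The truncate marker protocol, summarized (an illustration of the two
functions above): PAGE_MAX_TRX_ID on the clustered-index root acts as a
crash flag, since it is otherwise always 0 for a clustered index.

  btr_truncate():         root.PAGE_MAX_TRX_ID = IB_ID_MAX   (first mtr)
                          free all pages except the root     (second mtr)
                          re-create the root; marker -> 0    (second mtr)

  btr_truncate_recover(): marker == 0         -> truncate completed; no-op
                          marker == IB_ID_MAX -> truncate was interrupted;
                                                 rerun btr_truncate()

Rerunning is safe because btr_truncate() is idempotent: freeing the non-root
pages again and re-creating the empty root reaches the same final state. */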
#endif /* !UNIV_HOTBACKUP */

/** Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool btr_page_reorganize_low(
    bool recovery,       /*!< in: true if called in recovery:
                         locks should not be updated, i.e.,
                         there cannot exist locks on the
                         page, and a hash index should not be
                         dropped: it cannot exist */
    ulint z_level,       /*!< in: compression level to be used
                         if dealing with a compressed page */
    page_cur_t *cursor,  /*!< in/out: page cursor */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  buf_block_t *block = page_cur_get_block(cursor);
#ifndef UNIV_HOTBACKUP
  buf_pool_t *buf_pool = buf_pool_from_bpage(&block->page);
#endif /* !UNIV_HOTBACKUP */
  page_t *page = buf_block_get_frame(block);
  page_zip_des_t *page_zip = buf_block_get_page_zip(block);
  buf_block_t *temp_block;
  page_t *temp_page;
  ulint data_size1;
  ulint data_size2;
  ulint max_ins_size1;
  ulint max_ins_size2;
  bool success = false;
  ulint pos;
  bool log_compressed;

#ifndef UNIV_HOTBACKUP
  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
#endif /* !UNIV_HOTBACKUP */
  btr_assert_not_corrupted(block, index);
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
  data_size1 = page_get_data_size(page);
  max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

  /* Turn logging off */
  mtr_log_t log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

#ifndef UNIV_HOTBACKUP
  temp_block = buf_block_alloc(buf_pool);
#else  /* !UNIV_HOTBACKUP */
  temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
  temp_page = temp_block->frame;

#ifndef UNIV_HOTBACKUP
  MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);
#endif /* !UNIV_HOTBACKUP */

  /* Copy the old page to temporary space */
  buf_frame_copy(temp_page, page);

#ifndef UNIV_HOTBACKUP
  if (!recovery) {
    btr_search_drop_page_hash_index(block);
  }
#endif /* !UNIV_HOTBACKUP */

  /* Save the cursor position. */
  pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

  /* Recreate the page: note that global data on page (possible
  segment headers, next page-field, etc.) is preserved intact */

  page_create(block, mtr, dict_table_is_comp(index->table),
              fil_page_get_type(page));

  /* Copy the records from the temporary space to the recreated page;
  do not copy the lock bits yet */

  page_copy_rec_list_end_no_locks(block, temp_block,
                                  page_get_infimum_rec(temp_page), index, mtr);

  /* Multiple transactions cannot simultaneously operate on the
  same temp-table in parallel.
  max_trx_id is ignored for temp tables because it is not required
  for MVCC. */
  if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page) &&
      !index->table->is_temporary()) {
    /* Copy max trx id to recreated page */
    trx_id_t max_trx_id = page_get_max_trx_id(temp_page);
    page_set_max_trx_id(block, nullptr, max_trx_id, mtr);
    /* In crash recovery, dict_index_is_sec_or_ibuf() always
    holds, even for clustered indexes. max_trx_id is
    unused in clustered index pages. */
    ut_ad(max_trx_id != 0 || recovery);
  }

  /* If innodb_log_compressed_pages is ON, page reorganize should log the
  compressed page image. */
  log_compressed = page_zip && page_zip_log_pages;

  if (log_compressed) {
    mtr_set_log_mode(mtr, log_mode);
  }

  if (page_zip && !page_zip_compress(page_zip, page, index, z_level, mtr)) {
    /* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
    /* Check that the bytes that we skip are identical. */
    ut_a(!memcmp(page, temp_page, PAGE_HEADER));
    ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
                 PAGE_HEADER + PAGE_N_RECS + temp_page,
                 PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
    ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
                 UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
                 FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

    memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
           PAGE_N_RECS - PAGE_N_DIR_SLOTS);
    memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
           UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
    ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

    goto func_exit;
  }

#ifndef UNIV_HOTBACKUP
  /* No locks are acquired for intrinsic tables. */
  if (!recovery && !dict_table_is_locking_disabled(index->table)) {
    /* Update the record lock bitmaps */
    lock_move_reorganize_page(block, temp_block);
  }
#endif /* !UNIV_HOTBACKUP */

  data_size2 = page_get_data_size(page);
  max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

  if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
    ib::error(ER_IB_MSG_30)
        << "Page old data size " << data_size1 << " new data size "
        << data_size2 << ", page old max ins size " << max_ins_size1
        << " new max ins size " << max_ins_size2;

    ib::error(ER_IB_MSG_31) << BUG_REPORT_MSG;
    ut_ad(0);
  } else {
    success = true;
  }

  /* Restore the cursor position. */
  if (pos > 0) {
    cursor->rec = page_rec_get_nth(page, pos);
  } else {
    ut_ad(cursor->rec == page_get_infimum_rec(page));
  }

func_exit:
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
#ifndef UNIV_HOTBACKUP
  buf_block_free(temp_block);
#endif /* !UNIV_HOTBACKUP */

  /* Restore logging mode */
  mtr_set_log_mode(mtr, log_mode);

#ifndef UNIV_HOTBACKUP
  if (success) {
    mlog_id_t type;
    byte *log_ptr = nullptr;

    /* Write the log record */
    if (page_zip) {
      ut_ad(page_is_comp(page));
      type = MLOG_ZIP_PAGE_REORGANIZE;
    } else if (page_is_comp(page)) {
      type = MLOG_COMP_PAGE_REORGANIZE;
    } else {
      type = MLOG_PAGE_REORGANIZE;
    }

    bool opened = false;

    if (!log_compressed) {
      opened = mlog_open_and_write_index(mtr, page, index, type,
                                         page_zip ? 1 : 0, log_ptr);
    }

    /* For compressed pages write the compression level. */
    if (opened && page_zip) {
      mach_write_to_1(log_ptr, z_level);
      mlog_close(mtr, log_ptr + 1);
    }

    MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
  }
#endif /* !UNIV_HOTBACKUP */

  return (success);
}

/** Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
static bool btr_page_reorganize_block(
    bool recovery,       /*!< in: true if called in recovery:
                         locks should not be updated, i.e.,
                         there cannot exist locks on the
                         page, and a hash index should not be
                         dropped: it cannot exist */
    ulint z_level,       /*!< in: compression level to be used
                         if dealing with a compressed page */
    buf_block_t *block,  /*!< in/out: B-tree page */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  page_cur_t cur;
  page_cur_set_before_first(block, &cur);

  return (btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
}

/** Reorganizes an index page.

IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index. This has to
be done either within the same mini-transaction, or by invoking
ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
IBUF_BITMAP_FREE is unaffected by reorganization.

@retval true if the operation was successful
@retval false if it is a compressed page, and recompression failed */
bool btr_page_reorganize(
    page_cur_t *cursor,  /*!< in/out: page cursor */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  return (btr_page_reorganize_low(false, page_zip_level, cursor, index, mtr));
}
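
/* Usage sketch (illustration only): per the IMPORTANT note above, a caller
that reorganizes a compressed leaf page of a secondary index must refresh
the insert buffer free bits itself, for example:

  if (btr_page_reorganize(&cursor, index, mtr)) {
    buf_block_t *block = page_cur_get_block(&cursor);
    if (buf_block_get_page_zip(block) != nullptr &&
        !dict_index_is_clust(index) &&
        page_is_leaf(buf_block_get_frame(block))) {
      ibuf_reset_free_bits(block);  // or update IBUF_BITMAP_FREE in this mtr
    }
  }
*/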

/** Parses a redo log record of reorganizing a page.
@return end of log record or NULL */
byte *btr_parse_page_reorganize(
    byte *ptr,           /*!< in: buffer */
    byte *end_ptr,       /*!< in: buffer end */
    dict_index_t *index, /*!< in: record descriptor */
    bool compressed,     /*!< in: true if compressed page */
    buf_block_t *block,  /*!< in: page to be reorganized, or NULL */
    mtr_t *mtr)          /*!< in: mtr or NULL */
{
  ulint level;

  ut_ad(ptr != nullptr);
  ut_ad(end_ptr != nullptr);
  ut_ad(index != nullptr);

  /* If dealing with a compressed page, the record has the
  compression level used during the original compression written in
  one byte. Otherwise the record is empty. */
  if (compressed) {
    if (ptr == end_ptr) {
      return (nullptr);
    }

    level = mach_read_from_1(ptr);

    ut_a(level <= 9);
    ++ptr;
  } else {
    level = page_zip_level;
  }

  if (block != nullptr) {
    btr_page_reorganize_block(true, level, block, index, mtr);
  }

  return (ptr);
}

#ifndef UNIV_HOTBACKUP
/** Empties an index page. @see btr_page_create(). */
static void btr_page_empty(
    buf_block_t *block,       /*!< in: page to be emptied */
    page_zip_des_t *page_zip, /*!< out: compressed page, or NULL */
    dict_index_t *index,      /*!< in: index of the page */
    ulint level,              /*!< in: the B-tree level of the page */
    mtr_t *mtr)               /*!< in: mtr */
{
  page_t *page = buf_block_get_frame(block);

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  ut_ad(page_zip == buf_block_get_page_zip(block));
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

  btr_search_drop_page_hash_index(block);

  uint16_t page_type;
  if (dict_index_is_spatial(index)) {
    page_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_type = FIL_PAGE_SDI;
  } else {
    page_type = FIL_PAGE_INDEX;
  }

  /* Recreate the page: note that global data on page (possible
  segment headers, next page-field, etc.) is preserved intact */

  if (page_zip) {
    page_create_zip(block, index, level, 0, mtr, page_type);
  } else {
    page_create(block, mtr, dict_table_is_comp(index->table), page_type);
    btr_page_set_level(page, nullptr, level, mtr);
  }
}

/** Makes the tree one level higher by splitting the root, and inserts
the tuple. It is assumed that mtr contains an x-latch on the tree.
NOTE that the operation of this function must always succeed,
we cannot reverse it: therefore enough free disk space must be
guaranteed to be available before this function is called.
@return inserted record */
rec_t *btr_root_raise_and_insert(
    uint32_t flags,        /*!< in: undo logging and locking flags */
    btr_cur_t *cursor,     /*!< in: cursor at which to insert: must be
                           on the root page; when the function returns,
                           the cursor is positioned on the predecessor
                           of the inserted record */
    ulint **offsets,       /*!< out: offsets on inserted record */
    mem_heap_t **heap,     /*!< in/out: pointer to memory heap, or NULL */
    const dtuple_t *tuple, /*!< in: tuple to insert */
    mtr_t *mtr)            /*!< in: mtr */
{
  dict_index_t *index;
  page_t *root;
  page_t *new_page;
  page_no_t new_page_no;
  rec_t *rec;
  dtuple_t *node_ptr;
  ulint level;
  rec_t *node_ptr_rec;
  page_cur_t *page_cursor;
  page_zip_des_t *root_page_zip;
  page_zip_des_t *new_page_zip;
  buf_block_t *root_block;
  buf_block_t *new_block;

  root = btr_cur_get_page(cursor);
  root_block = btr_cur_get_block(cursor);
  root_page_zip = buf_block_get_page_zip(root_block);
  ut_ad(!page_is_empty(root));
  index = btr_cur_get_index(cursor);
#ifdef UNIV_ZIP_DEBUG
  ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
#endif /* UNIV_ZIP_DEBUG */
#ifdef UNIV_BTR_DEBUG
  if (!dict_index_is_ibuf(index)) {
    space_id_t space_id = dict_index_get_space(index);

    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                                space_id));
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                                space_id));
  }

  ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
  ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());
  ut_ad(mtr_is_block_fix(mtr, root_block, MTR_MEMO_PAGE_X_FIX, index->table));

  /* Allocate a new page to the tree. Root splitting is done by first
  moving the root records to the new page, emptying the root, putting
  a node pointer to the new page, and then splitting the new page. */

  level = btr_page_get_level(root, mtr);

  new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);

  new_page = buf_block_get_frame(new_block);
  new_page_zip = buf_block_get_page_zip(new_block);
  ut_a(!new_page_zip == !root_page_zip);
  ut_a(!new_page_zip ||
       page_zip_get_size(new_page_zip) == page_zip_get_size(root_page_zip));

  btr_page_create(new_block, new_page_zip, index, level, mtr);

  /* Set the next node and previous node fields of new page */
  btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
  btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);

  /* Copy the records from root to the new page one by one. */

  if (false
#ifdef UNIV_ZIP_COPY
      || new_page_zip
#endif /* UNIV_ZIP_COPY */
      || !page_copy_rec_list_end(new_block, root_block,
                                 page_get_infimum_rec(root), index, mtr)) {
    ut_a(new_page_zip);

    /* Copy the page byte for byte. */
    page_zip_copy_recs(new_page_zip, new_page, root_page_zip, root, index, mtr);

    /* Update the lock table and possible hash index. */

    if (!dict_table_is_locking_disabled(index->table)) {
      lock_move_rec_list_end(new_block, root_block, page_get_infimum_rec(root));
    }

    /* Move any existing predicate locks */
    if (dict_index_is_spatial(index)) {
      lock_prdt_rec_move(new_block, root_block);
    }

    btr_search_move_or_delete_hash_entries(new_block, root_block, index);
  }

  /* If this is a pessimistic insert which is actually done to
  perform a pessimistic update then we have stored the lock
  information of the record to be inserted on the infimum of the
  root page: we cannot discard the lock structs on the root page */

  if (!dict_table_is_locking_disabled(index->table)) {
    lock_update_root_raise(new_block, root_block);
  }

  /* Create a memory heap where the node pointer is stored */
  if (!*heap) {
    *heap = mem_heap_create(1000);
  }

  rec = page_rec_get_next(page_get_infimum_rec(new_page));
  new_page_no = new_block->page.id.page_no();

  /* Build the node pointer (= node key and page address) for the
  child */
  if (dict_index_is_spatial(index)) {
    rtr_mbr_t new_mbr;

    rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
    node_ptr = rtr_index_build_node_ptr(index, &new_mbr, rec, new_page_no,
                                        *heap, level);
  } else {
    node_ptr = dict_index_build_node_ptr(index, rec, new_page_no, *heap, level);
  }
  /* The node pointer must be marked as the predefined minimum record,
  as there is no lower alphabetical limit to records in the leftmost
  node of a level: */
1613 dtuple_set_info_bits(node_ptr,
1614 dtuple_get_info_bits(node_ptr) | REC_INFO_MIN_REC_FLAG);
1615
1616 /* Rebuild the root page to get free space */
1617 btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);
1618
1619 /* Set the next node and previous node fields, although
1620 they should already have been set. The previous node field
1621 must be FIL_NULL if root_page_zip != NULL, because the
1622 REC_INFO_MIN_REC_FLAG (of the first user record) will be
1623 set if and only if btr_page_get_prev() == FIL_NULL. */
1624 btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
1625 btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);
1626
1627 page_cursor = btr_cur_get_page_cur(cursor);
1628
1629 /* Insert node pointer to the root */
1630
1631 page_cur_set_before_first(root_block, page_cursor);
1632
1633 node_ptr_rec =
1634 page_cur_tuple_insert(page_cursor, node_ptr, index, offsets, heap, mtr);
1635
1636 /* The root page should only contain the node pointer
1637 to new_page at this point. Thus, the data should fit. */
1638 ut_a(node_ptr_rec);
1639
1640 /* We play safe and reset the free bits for the new page */
1641
1642 if (!index->is_clustered() && !index->table->is_temporary()) {
1643 ibuf_reset_free_bits(new_block);
1644 }
1645
1646 /* Reposition the cursor to the child node */
1647 page_cur_search(new_block, index, tuple, page_cursor);
1648
1649 /* Split the child and insert tuple */
1650 if (dict_index_is_spatial(index)) {
1651 /* Split rtree page and insert tuple */
1652 return (
1653 rtr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
1654 } else {
1655 return (
1656 btr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
1657 }
1658 }
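/* Illustrative sketch (added for exposition, not part of the original
source): a root raise on a root page R holding records r1, r2. The
records are first moved to a fresh page N, R is emptied, and a single
node pointer to N, marked with REC_INFO_MIN_REC_FLAG, is inserted;
N is then split like any ordinary page:

        R [r1 r2]                  R [min-ptr -> N]
                      ------>      |
                                   N [r1 r2]   (then split N, insert)

Because the root keeps its page number, dict_index_get_page() stays
valid; the tree always grows by one level at the top, never at the
leaves. */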
1659
1660 /** Decides if the page should be split at the convergence point of inserts
1661 converging to the left.
1662 @return true if split recommended */
1663 ibool btr_page_get_split_rec_to_left(
1664 btr_cur_t *cursor, /*!< in: cursor at which to insert */
1665 rec_t **split_rec) /*!< out: if split recommended,
1666 the first record on upper half page,
1667 or NULL if tuple to be inserted should
1668 be first */
1669 {
1670 page_t *page;
1671 rec_t *insert_point;
1672 rec_t *infimum;
1673
1674 page = btr_cur_get_page(cursor);
1675 insert_point = btr_cur_get_rec(cursor);
1676
1677 if (page_header_get_ptr(page, PAGE_LAST_INSERT) ==
1678 page_rec_get_next(insert_point)) {
1679 infimum = page_get_infimum_rec(page);
1680
1681 /* If the convergence is in the middle of a page, include also
1682 the record immediately before the new insert to the upper
1683 page. Otherwise, we could repeatedly move from page to page
1684 lots of records smaller than the convergence point. */
1685
1686 if (infimum != insert_point && page_rec_get_next(infimum) != insert_point) {
1687 *split_rec = insert_point;
1688 } else {
1689 *split_rec = page_rec_get_next(insert_point);
1690 }
1691
1692 return (TRUE);
1693 }
1694
1695 return (FALSE);
1696 }
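/* Illustrative example (an assumption about a typical workload, not
from the original source): with descending inserts of keys 9, 8, 7,
PAGE_LAST_INSERT always ends up immediately after the next insert
point, so the function above detects the pattern and recommends a
split at the insert point. Including the record just before the new
insert in the upper page prevents repeatedly shipping the many records
smaller than the convergence point from page to page. */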
1697
1698 /** Decides if the page should be split at the convergence point of inserts
1699 converging to the right.
1700 @return true if split recommended */
1701 ibool btr_page_get_split_rec_to_right(
1702 btr_cur_t *cursor, /*!< in: cursor at which to insert */
1703 rec_t **split_rec) /*!< out: if split recommended,
1704 the first record on upper half page,
1705 or NULL if tuple to be inserted should
1706 be first */
1707 {
1708 page_t *page;
1709 rec_t *insert_point;
1710
1711 page = btr_cur_get_page(cursor);
1712 insert_point = btr_cur_get_rec(cursor);
1713
1714 /* We use eager heuristics: if the new insert would be right after
1715 the previous insert on the same page, we assume that there is a
1716 pattern of sequential inserts here. */
1717
1718 if (page_header_get_ptr(page, PAGE_LAST_INSERT) == insert_point) {
1719 rec_t *next_rec;
1720
1721 next_rec = page_rec_get_next(insert_point);
1722
1723 if (page_rec_is_supremum(next_rec)) {
1724 split_at_new:
1725 /* Split at the new record to insert */
1726 *split_rec = nullptr;
1727 } else {
1728 rec_t *next_next_rec = page_rec_get_next(next_rec);
1729 if (page_rec_is_supremum(next_next_rec)) {
1730 goto split_at_new;
1731 }
1732
1733 /* If there are >= 2 user records up from the insert
1734 point, split all but 1 off. We want to keep one because
1735 then sequential inserts can use the adaptive hash
1736 index, as they can do the necessary checks of the right
1737 search position just by looking at the records on this
1738 page. */
1739
1740 *split_rec = next_next_rec;
1741 }
1742
1743 return (TRUE);
1744 }
1745
1746 return (FALSE);
1747 }
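/* A minimal sketch (guarded out; toy_* names are hypothetical) of the
sequential-insert heuristic above, modelling the user records of a page
as a vector of keys. It returns the index of the first record to move
to the upper half-page, or -1 for "split at the record to be
inserted". */
#if 0
#include <cstddef>
#include <vector>

static int toy_split_to_right(const std::vector<int> &recs,
                              std::size_t insert_point) {
  /* 0 or 1 user records above the insert point: split at the new
  record, i.e. the tuple becomes the first record on the new page */
  if (insert_point + 2 >= recs.size()) {
    return (-1);
  }

  /* >= 2 records above the insert point: split all but one off, so
  that sequential inserts can keep using the adaptive hash index on
  the original page */
  return (static_cast<int>(insert_point + 2));
}
#endif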
1748
1749 /** Calculates a split record such that the tuple will certainly fit on
1750 its half-page when the split is performed. We assume in this function
1751 only that the cursor page has at least one user record.
1752 @return split record, or NULL if tuple will be the first record on
1753 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
1754 static rec_t *btr_page_get_split_rec(
1755 btr_cur_t *cursor, /*!< in: cursor at which insert should be made */
1756 const dtuple_t *tuple) /*!< in: tuple to insert */
1757 {
1758 page_t *page;
1759 page_zip_des_t *page_zip;
1760 ulint insert_size;
1761 ulint free_space;
1762 ulint total_data;
1763 ulint total_n_recs;
1764 ulint total_space;
1765 ulint incl_data;
1766 rec_t *ins_rec;
1767 rec_t *rec;
1768 rec_t *next_rec;
1769 ulint n;
1770 mem_heap_t *heap;
1771 ulint *offsets;
1772
1773 page = btr_cur_get_page(cursor);
1774
1775 insert_size = rec_get_converted_size(cursor->index, tuple);
1776 free_space = page_get_free_space_of_empty(page_is_comp(page));
1777
1778 page_zip = btr_cur_get_page_zip(cursor);
1779 if (page_zip) {
1780 /* Estimate the free space of an empty compressed page. */
1781 ulint free_space_zip = page_zip_empty_size(cursor->index->n_fields,
1782 page_zip_get_size(page_zip));
1783
1784 if (free_space > (ulint)free_space_zip) {
1785 free_space = (ulint)free_space_zip;
1786 }
1787 }
1788
1789 /* free_space is now the free space of a created new page */
1790
1791 total_data = page_get_data_size(page) + insert_size;
1792 total_n_recs = page_get_n_recs(page) + 1;
1793 ut_ad(total_n_recs >= 2);
1794 total_space = total_data + page_dir_calc_reserved_space(total_n_recs);
1795
1796 n = 0;
1797 incl_data = 0;
1798 ins_rec = btr_cur_get_rec(cursor);
1799 rec = page_get_infimum_rec(page);
1800
1801 heap = nullptr;
1802 offsets = nullptr;
1803
1804 /* We start including records in the left half, and stop once the
1805 space they reserve exceeds half of total_space. If the included
1806 records fit on the left page and something is also left over
1807 for the right page, they are put on the left page; otherwise
1808 the last included record will be the first record on the right
1809 half-page */
1810
1811 do {
1812 /* Decide the next record to include */
1813 if (rec == ins_rec) {
1814 rec = nullptr; /* NULL denotes that tuple is
1815 now included */
1816 } else if (rec == nullptr) {
1817 rec = page_rec_get_next(ins_rec);
1818 } else {
1819 rec = page_rec_get_next(rec);
1820 }
1821
1822 if (rec == nullptr) {
1823 /* Include tuple */
1824 incl_data += insert_size;
1825 } else {
1826 offsets =
1827 rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);
1828 incl_data += rec_offs_size(offsets);
1829 }
1830
1831 n++;
1832 } while (incl_data + page_dir_calc_reserved_space(n) < total_space / 2);
1833
1834 if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
1835 /* The next record will be the first on
1836 the right half page if it is not the
1837 supremum record of page */
1838
1839 if (rec == ins_rec) {
1840 rec = nullptr;
1841
1842 goto func_exit;
1843 } else if (rec == nullptr) {
1844 next_rec = page_rec_get_next(ins_rec);
1845 } else {
1846 next_rec = page_rec_get_next(rec);
1847 }
1848 ut_ad(next_rec);
1849 if (!page_rec_is_supremum(next_rec)) {
1850 rec = next_rec;
1851 }
1852 }
1853
1854 func_exit:
1855 if (heap) {
1856 mem_heap_free(heap);
1857 }
1858 return (rec);
1859 }
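/* A self-contained sketch (guarded out; toy types, not InnoDB's record
format) of the accumulation loop above: walk the records in key order,
with the inserted tuple's size already spliced in at its cursor
position, until the included data plus directory overhead reaches half
of the total reserved space. */
#if 0
#include <cstddef>
#include <vector>

/* sizes[] must be non-empty, mirroring the assumption that the page
has at least one user record; dir_overhead_per_rec approximates
page_dir_calc_reserved_space() as a per-record constant. Returns the
number of records that stay on the lower half-page. */
static std::size_t toy_split_point(const std::vector<std::size_t> &sizes,
                                   std::size_t dir_overhead_per_rec) {
  std::size_t total_space = 0;
  for (std::size_t s : sizes) {
    total_space += s + dir_overhead_per_rec;
  }

  std::size_t incl = 0;
  std::size_t n = 0;
  do {
    incl += sizes[n];
    ++n;
  } while (n < sizes.size() &&
           incl + dir_overhead_per_rec * n < total_space / 2);

  return (n);
}
#endif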
1860
1861 /** Returns TRUE if the insert fits on the appropriate half-page with the
1862 chosen split_rec.
1863 @return true if fits */
1864 static MY_ATTRIBUTE((warn_unused_result)) bool btr_page_insert_fits(
1865 btr_cur_t *cursor, /*!< in: cursor at which insert
1866 should be made */
1867 const rec_t *split_rec, /*!< in: suggestion for first record
1868 on upper half-page, or NULL if
1869 tuple to be inserted should be first */
1870 ulint **offsets, /*!< in: rec_get_offsets(
1871 split_rec, cursor->index); out: garbage */
1872 const dtuple_t *tuple, /*!< in: tuple to insert */
1873 mem_heap_t **heap) /*!< in: temporary memory heap */
1874 {
1875 page_t *page;
1876 ulint insert_size;
1877 ulint free_space;
1878 ulint total_data;
1879 ulint total_n_recs;
1880 const rec_t *rec;
1881 const rec_t *end_rec;
1882
1883 page = btr_cur_get_page(cursor);
1884
1885 ut_ad(!split_rec || !page_is_comp(page) == !rec_offs_comp(*offsets));
1886 ut_ad(!split_rec || rec_offs_validate(split_rec, cursor->index, *offsets));
1887
1888 insert_size = rec_get_converted_size(cursor->index, tuple);
1889 free_space = page_get_free_space_of_empty(page_is_comp(page));
1890
1891 /* free_space is now the free space of a created new page */
1892
1893 total_data = page_get_data_size(page) + insert_size;
1894 total_n_recs = page_get_n_recs(page) + 1;
1895
1896 /* We determine which records (from rec to end_rec, not including
1897 end_rec) will end up on the other half page from tuple when it is
1898 inserted. */
1899
1900 if (split_rec == nullptr) {
1901 rec = page_rec_get_next(page_get_infimum_rec(page));
1902 end_rec = page_rec_get_next(btr_cur_get_rec(cursor));
1903
1904 } else if (cmp_dtuple_rec(tuple, split_rec, cursor->index, *offsets) >= 0) {
1905 rec = page_rec_get_next(page_get_infimum_rec(page));
1906 end_rec = split_rec;
1907 } else {
1908 rec = split_rec;
1909 end_rec = page_get_supremum_rec(page);
1910 }
1911
1912 if (total_data + page_dir_calc_reserved_space(total_n_recs) <= free_space) {
1913 /* Ok, there will be enough available space on the
1914 half page where the tuple is inserted */
1915
1916 return (true);
1917 }
1918
1919 while (rec != end_rec) {
1920 /* In this loop we calculate the amount of reserved
1921 space after rec is removed from page. */
1922
1923 *offsets =
1924 rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);
1925
1926 total_data -= rec_offs_size(*offsets);
1927 total_n_recs--;
1928
1929 if (total_data + page_dir_calc_reserved_space(total_n_recs) <= free_space) {
1930 /* Ok, there will be enough available space on the
1931 half page where the tuple is inserted */
1932
1933 return (true);
1934 }
1935
1936 rec = page_rec_get_next_const(rec);
1937 }
1938
1939 return (false);
1940 }
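/* Note (added for exposition): the check above is cheap first and
exact later. If the whole page contents including the tuple fit in an
empty page, no per-record work is needed at all. Otherwise the loop
subtracts, one record at a time, exactly the records that will move to
the *other* half-page; as soon as the remainder fits, the insert is
known to fit, so the loop usually stops well before end_rec. */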
1941
1942 /** Inserts a data tuple to a tree on a non-leaf level. It is assumed
1943 that mtr holds an x-latch on the tree. */
1944 void btr_insert_on_non_leaf_level_func(
1945 uint32_t flags, /*!< in: undo logging and locking flags */
1946 dict_index_t *index, /*!< in: index */
1947 ulint level, /*!< in: level, must be > 0 */
1948 dtuple_t *tuple, /*!< in: the record to be inserted */
1949 const char *file, /*!< in: file name */
1950 ulint line, /*!< in: line where called */
1951 mtr_t *mtr) /*!< in: mtr */
1952 {
1953 big_rec_t *dummy_big_rec;
1954 btr_cur_t cursor;
1955 dberr_t err;
1956 rec_t *rec;
1957 mem_heap_t *heap = nullptr;
1958 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1959 ulint *offsets = offsets_;
1960 rec_offs_init(offsets_);
1961 rtr_info_t rtr_info;
1962
1963 ut_ad(level > 0);
1964
1965 if (!dict_index_is_spatial(index)) {
1966 if (index->table->is_intrinsic()) {
1967 btr_cur_search_to_nth_level_with_no_latch(
1968 index, level, tuple, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, mtr);
1969 } else {
1970 btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
1971 BTR_CONT_MODIFY_TREE, &cursor, 0, file, line,
1972 mtr);
1973 }
1974 } else {
1975 /* For spatial index, initialize structures to track
1976 its parents etc. */
1977 rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);
1978
1979 rtr_info_update_btr(&cursor, &rtr_info);
1980
1981 btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_INSERT,
1982 BTR_CONT_MODIFY_TREE, &cursor, 0, file, line,
1983 mtr);
1984 }
1985
1986 ut_ad(cursor.flag == BTR_CUR_BINARY);
1987
1988 err = btr_cur_optimistic_insert(
1989 flags | BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG,
1990 &cursor, &offsets, &heap, tuple, &rec, &dummy_big_rec, nullptr, mtr);
1991
1992 if (err == DB_FAIL) {
1993 err = btr_cur_pessimistic_insert(
1994 flags | BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG,
1995 &cursor, &offsets, &heap, tuple, &rec, &dummy_big_rec, nullptr, mtr);
1996 ut_a(err == DB_SUCCESS);
1997 }
1998
1999 if (heap != nullptr) {
2000 mem_heap_free(heap);
2001 }
2002
2003 if (dict_index_is_spatial(index)) {
2004 ut_ad(cursor.rtr_info);
2005
2006 rtr_clean_rtr_info(&rtr_info, true);
2007 }
2008 }
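/* Note (added for exposition): the optimistic-then-pessimistic pair
above is the usual InnoDB insert pattern. btr_cur_optimistic_insert()
succeeds only if the record fits on the current page (possibly after a
reorganization); on DB_FAIL the pessimistic variant may split pages,
which is why the caller must hold the tree x-latch with enough free
space guaranteed, and why the second attempt is asserted to succeed. */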
2009
2010 /** Attaches the halves of an index page on the appropriate level in an
2011 index tree. */
2012 static void btr_attach_half_pages(
2013 uint32_t flags, /*!< in: undo logging and
2014 locking flags */
2015 dict_index_t *index, /*!< in: the index tree */
2016 buf_block_t *block, /*!< in/out: page to be split */
2017 const rec_t *split_rec, /*!< in: first record on upper
2018 half page */
2019 buf_block_t *new_block, /*!< in/out: the new half page */
2020 ulint direction, /*!< in: FSP_UP or FSP_DOWN */
2021 mtr_t *mtr) /*!< in: mtr */
2022 {
2023 page_no_t prev_page_no;
2024 page_no_t next_page_no;
2025 ulint level;
2026 page_t *page = buf_block_get_frame(block);
2027 page_t *lower_page;
2028 page_t *upper_page;
2029 page_no_t lower_page_no;
2030 page_no_t upper_page_no;
2031 page_zip_des_t *lower_page_zip;
2032 page_zip_des_t *upper_page_zip;
2033 dtuple_t *node_ptr_upper;
2034 mem_heap_t *heap;
2035 buf_block_t *prev_block = nullptr;
2036 buf_block_t *next_block = nullptr;
2037
2038 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2039 ut_ad(mtr_is_block_fix(mtr, new_block, MTR_MEMO_PAGE_X_FIX, index->table));
2040
2041 /* Create a memory heap where the data tuple is stored */
2042 heap = mem_heap_create(1024);
2043
2044 /* Based on split direction, decide upper and lower pages */
2045 if (direction == FSP_DOWN) {
2046 btr_cur_t cursor;
2047 ulint *offsets;
2048
2049 lower_page = buf_block_get_frame(new_block);
2050 lower_page_no = new_block->page.id.page_no();
2051 lower_page_zip = buf_block_get_page_zip(new_block);
2052 upper_page = buf_block_get_frame(block);
2053 upper_page_no = block->page.id.page_no();
2054 upper_page_zip = buf_block_get_page_zip(block);
2055
2056 /* Look up the index for the node pointer to page */
2057 offsets =
2058 btr_page_get_father_block(nullptr, heap, index, block, mtr, &cursor);
2059
2060 /* Replace the address of the old child node (= page) with the
2061 address of the new lower half */
2062
2063 btr_node_ptr_set_child_page_no(btr_cur_get_rec(&cursor),
2064 btr_cur_get_page_zip(&cursor), offsets,
2065 lower_page_no, mtr);
2066 mem_heap_empty(heap);
2067 } else {
2068 lower_page = buf_block_get_frame(block);
2069 lower_page_no = block->page.id.page_no();
2070 lower_page_zip = buf_block_get_page_zip(block);
2071 upper_page = buf_block_get_frame(new_block);
2072 upper_page_no = new_block->page.id.page_no();
2073 upper_page_zip = buf_block_get_page_zip(new_block);
2074 }
2075
2076 /* Get the previous and next pages of page */
2077 prev_page_no = btr_page_get_prev(page, mtr);
2078 next_page_no = btr_page_get_next(page, mtr);
2079
2080 const space_id_t space = block->page.id.space();
2081
2082 /* For consistency, both sibling blocks should be locked before the change */
2083 if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
2084 prev_block = btr_block_get(page_id_t(space, prev_page_no), block->page.size,
2085 RW_X_LATCH, index, mtr);
2086 }
2087 if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
2088 next_block = btr_block_get(page_id_t(space, next_page_no), block->page.size,
2089 RW_X_LATCH, index, mtr);
2090 }
2091
2092 /* Get the level of the split pages */
2093 level = btr_page_get_level(buf_block_get_frame(block), mtr);
2094 ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block), mtr));
2095
2096 /* Build the node pointer (= node key and page address) for the upper
2097 half */
2098
2099 node_ptr_upper =
2100 dict_index_build_node_ptr(index, split_rec, upper_page_no, heap, level);
2101
2102 /* Insert it next to the pointer to the lower half. Note that this
2103 may generate recursion leading to a split on the higher level. */
2104
2105 btr_insert_on_non_leaf_level(flags, index, level + 1, node_ptr_upper, mtr);
2106
2107 /* Free the memory heap */
2108 mem_heap_free(heap);
2109
2110 /* Update page links of the level */
2111
2112 if (prev_block) {
2113 #ifdef UNIV_BTR_DEBUG
2114 ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
2115 ut_a(btr_page_get_next(prev_block->frame, mtr) == block->page.id.page_no());
2116 #endif /* UNIV_BTR_DEBUG */
2117
2118 btr_page_set_next(buf_block_get_frame(prev_block),
2119 buf_block_get_page_zip(prev_block), lower_page_no, mtr);
2120 }
2121
2122 if (next_block) {
2123 #ifdef UNIV_BTR_DEBUG
2124 ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
2125 ut_a(btr_page_get_prev(next_block->frame, mtr) == page_get_page_no(page));
2126 #endif /* UNIV_BTR_DEBUG */
2127
2128 btr_page_set_prev(buf_block_get_frame(next_block),
2129 buf_block_get_page_zip(next_block), upper_page_no, mtr);
2130 }
2131
2132 if (direction == FSP_DOWN) {
2133 /* lower_page is new */
2134 btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
2135 } else {
2136 ut_ad(btr_page_get_prev(lower_page, mtr) == prev_page_no);
2137 }
2138
2139 btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
2140 btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);
2141
2142 if (direction != FSP_DOWN) {
2143 /* upper_page is new */
2144 btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
2145 } else {
2146 ut_ad(btr_page_get_next(upper_page, mtr) == next_page_no);
2147 }
2148 }
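/* Illustrative diagram (added for exposition) of the link updates
above for an FSP_UP split, where the new page becomes the upper half:

    before:  prev <-> block <-> next
    after:   prev <-> lower(block) <-> upper(new) <-> next

For FSP_DOWN the new page becomes the lower half instead: the parent's
existing node pointer to block is repointed at the lower page, and the
node pointer built from split_rec for the upper page is inserted next
to it on the level above. */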
2149
2150 /** Determine if a tuple is smaller than any record on the page.
2151 @return true if smaller */
2152 static MY_ATTRIBUTE((warn_unused_result)) bool btr_page_tuple_smaller(
2153 btr_cur_t *cursor, /*!< in: b-tree cursor */
2154 const dtuple_t *tuple, /*!< in: tuple to consider */
2155 ulint **offsets, /*!< in/out: temporary storage */
2156 ulint n_uniq, /*!< in: number of unique fields
2157 in the index page records */
2158 mem_heap_t **heap) /*!< in/out: heap for offsets */
2159 {
2160 buf_block_t *block;
2161 const rec_t *first_rec;
2162 page_cur_t pcur;
2163
2164 /* Read the first user record in the page. */
2165 block = btr_cur_get_block(cursor);
2166 page_cur_set_before_first(block, &pcur);
2167 page_cur_move_to_next(&pcur);
2168 first_rec = page_cur_get_rec(&pcur);
2169
2170 *offsets = rec_get_offsets(first_rec, cursor->index, *offsets, n_uniq, heap);
2171
2172 return (cmp_dtuple_rec(tuple, first_rec, cursor->index, *offsets) < 0);
2173 }
2174
2175 /** Insert the tuple into the right sibling page, if the cursor is at the end
2176 of a page.
2177 @param[in] flags undo logging and locking flags
2178 @param[in,out] cursor cursor at which to insert; when the function succeeds,
2179 the cursor is positioned before the insert point.
2180 @param[out] offsets offsets on inserted record
2181 @param[in,out] heap memory heap for allocating offsets
2182 @param[in] tuple tuple to insert
2183 @param[in,out] mtr mini-transaction
2184 @return inserted record (first record on the right sibling page);
2185 the cursor will be positioned on the page infimum
2186 @retval NULL if the operation was not performed */
2187 static rec_t *btr_insert_into_right_sibling(uint32_t flags, btr_cur_t *cursor,
2188 ulint **offsets, mem_heap_t *heap,
2189 const dtuple_t *tuple, mtr_t *mtr) {
2190 buf_block_t *block = btr_cur_get_block(cursor);
2191 page_t *page = buf_block_get_frame(block);
2192 page_no_t next_page_no = btr_page_get_next(page, mtr);
2193
2194 ut_ad(cursor->index->table->is_intrinsic() ||
2195 mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
2196 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
2197 ut_ad(
2198 mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
2199 ut_ad(heap);
2200
2201 if (next_page_no == FIL_NULL ||
2202 !page_rec_is_supremum(page_rec_get_next(btr_cur_get_rec(cursor)))) {
2203 return (nullptr);
2204 }
2205
2206 page_cur_t next_page_cursor;
2207 buf_block_t *next_block;
2208 page_t *next_page;
2209 btr_cur_t next_father_cursor;
2210 rec_t *rec = nullptr;
2211 ulint max_size;
2212
2213 const space_id_t space = block->page.id.space();
2214
2215 next_block = btr_block_get(page_id_t(space, next_page_no), block->page.size,
2216 RW_X_LATCH, cursor->index, mtr);
2217 next_page = buf_block_get_frame(next_block);
2218
2219 bool is_leaf = page_is_leaf(next_page);
2220
2221 btr_page_get_father(cursor->index, next_block, mtr, &next_father_cursor);
2222
2223 page_cur_search(next_block, cursor->index, tuple, PAGE_CUR_LE,
2224 &next_page_cursor);
2225
2226 max_size = page_get_max_insert_size_after_reorganize(next_page, 1);
2227
2228 /* Extends gap lock for the next page */
2229 if (!dict_table_is_locking_disabled(cursor->index->table)) {
2230 lock_update_split_left(next_block, block);
2231 }
2232
2233 rec = page_cur_tuple_insert(&next_page_cursor, tuple, cursor->index, offsets,
2234 &heap, mtr);
2235
2236 if (rec == nullptr) {
2237 if (is_leaf && next_block->page.size.is_compressed() &&
2238 !cursor->index->is_clustered() &&
2239 !cursor->index->table->is_temporary()) {
2240 /* Reset the IBUF_BITMAP_FREE bits, because
2241 page_cur_tuple_insert() will have attempted page
2242 reorganize before failing. */
2243 ibuf_reset_free_bits(next_block);
2244 }
2245 return (nullptr);
2246 }
2247
2248 ibool compressed;
2249 dberr_t err;
2250 ulint level = btr_page_get_level(next_page, mtr);
2251
2252 /* adjust cursor position */
2253 *btr_cur_get_page_cur(cursor) = next_page_cursor;
2254
2255 ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
2256 ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);
2257
2258 /* We have to change the parent node pointer */
2259
2260 compressed = btr_cur_pessimistic_delete(&err, TRUE, &next_father_cursor,
2261 BTR_CREATE_FLAG, false, 0, 0, 0, mtr);
2262
2263 ut_a(err == DB_SUCCESS);
2264
2265 if (!compressed) {
2266 btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
2267 }
2268
2269 dtuple_t *node_ptr = dict_index_build_node_ptr(
2270 cursor->index, rec, next_block->page.id.page_no(), heap, level);
2271
2272 btr_insert_on_non_leaf_level(flags, cursor->index, level + 1, node_ptr, mtr);
2273
2274 ut_ad(rec_offs_validate(rec, cursor->index, *offsets));
2275
2276 if (is_leaf && !cursor->index->is_clustered() &&
2277 !cursor->index->table->is_temporary()) {
2278 /* Update the free bits of the B-tree page in the
2279 insert buffer bitmap. */
2280
2281 if (next_block->page.size.is_compressed()) {
2282 ibuf_update_free_bits_zip(next_block, mtr);
2283 } else {
2284 ibuf_update_free_bits_if_full(
2285 next_block, max_size, rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
2286 }
2287 }
2288
2289 return (rec);
2290 }
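/* Note (added for exposition): this end-of-page shortcut avoids a full
split for rightward-growing insert patterns. It applies only when the
cursor sits just before the supremum and a right sibling exists; the
tuple then becomes the first record of the sibling, so the sibling's
node pointer in the parent is deleted and rebuilt from that new first
record before the function returns. */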
2291
2292 /** Splits an index page to halves and inserts the tuple. It is assumed
2293 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
2294 released within this function! NOTE that the operation of this
2295 function must always succeed; we cannot reverse it: therefore enough
2296 free disk space (2 pages) must be guaranteed to be available before
2297 this function is called.
2298 @return inserted record */
2299 rec_t *btr_page_split_and_insert(
2300 uint32_t flags, /*!< in: undo logging and locking flags */
2301 btr_cur_t *cursor, /*!< in: cursor at which to insert; when the
2302 function returns, the cursor is positioned
2303 on the predecessor of the inserted record */
2304 ulint **offsets, /*!< out: offsets on inserted record */
2305 mem_heap_t **heap, /*!< in/out: pointer to memory heap, or NULL */
2306 const dtuple_t *tuple, /*!< in: tuple to insert */
2307 mtr_t *mtr) /*!< in: mtr */
2308 {
2309 buf_block_t *block;
2310 page_t *page;
2311 page_zip_des_t *page_zip;
2312 page_no_t page_no;
2313 byte direction;
2314 page_no_t hint_page_no;
2315 buf_block_t *new_block;
2316 page_t *new_page;
2317 page_zip_des_t *new_page_zip;
2318 rec_t *split_rec;
2319 buf_block_t *left_block;
2320 buf_block_t *right_block;
2321 buf_block_t *insert_block;
2322 page_cur_t *page_cursor;
2323 rec_t *first_rec;
2324 byte *buf = nullptr; /* remove warning */
2325 rec_t *move_limit;
2326 ibool insert_will_fit;
2327 ibool insert_left;
2328 ulint n_iterations = 0;
2329 rec_t *rec;
2330 ulint n_uniq;
2331 dict_index_t *index;
2332
2333 index = btr_cur_get_index(cursor);
2334
2335 if (dict_index_is_spatial(index)) {
2336 /* Split rtree page and update parent */
2337 return (
2338 rtr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
2339 }
2340
2341 if (!*heap) {
2342 *heap = mem_heap_create(1024);
2343 }
2344 n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
2345 func_start:
2346 ut_ad(tuple->m_heap != *heap);
2347 mem_heap_empty(*heap);
2348 *offsets = nullptr;
2349
2350 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
2351 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
2352 cursor->index->table->is_intrinsic());
2353 ut_ad(!dict_index_is_online_ddl(cursor->index) || (flags & BTR_CREATE_FLAG) ||
2354 cursor->index->is_clustered());
2355 ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
2356 RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX) ||
2357 cursor->index->table->is_intrinsic());
2358
2359 block = btr_cur_get_block(cursor);
2360 page = buf_block_get_frame(block);
2361 page_zip = buf_block_get_page_zip(block);
2362
2363 ut_ad(
2364 mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
2365 ut_ad(!page_is_empty(page));
2366
2367 /* try to insert to the next page if possible before split */
2368 rec =
2369 btr_insert_into_right_sibling(flags, cursor, offsets, *heap, tuple, mtr);
2370
2371 if (rec != nullptr) {
2372 return (rec);
2373 }
2374
2375 page_no = block->page.id.page_no();
2376
2377 /* 1. Decide the split record; split_rec == NULL means that the
2378 tuple to be inserted should be the first record on the upper
2379 half-page */
2380 insert_left = FALSE;
2381
2382 if (n_iterations > 0) {
2383 direction = FSP_UP;
2384 hint_page_no = page_no + 1;
2385 split_rec = btr_page_get_split_rec(cursor, tuple);
2386
2387 if (split_rec == nullptr) {
2388 insert_left =
2389 btr_page_tuple_smaller(cursor, tuple, offsets, n_uniq, heap);
2390 }
2391 } else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
2392 direction = FSP_UP;
2393 hint_page_no = page_no + 1;
2394
2395 } else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
2396 direction = FSP_DOWN;
2397 hint_page_no = page_no - 1;
2398 ut_ad(split_rec);
2399 } else {
2400 direction = FSP_UP;
2401 hint_page_no = page_no + 1;
2402
2403 /* If there is only one record in the index page, we
2404 can't split the node in the middle by default. We need
2405 to determine whether the new record will be inserted
2406 to the left or right. */
2407
2408 if (page_get_n_recs(page) > 1) {
2409 split_rec = page_get_middle_rec(page);
2410 } else if (btr_page_tuple_smaller(cursor, tuple, offsets, n_uniq, heap)) {
2411 split_rec = page_rec_get_next(page_get_infimum_rec(page));
2412 } else {
2413 split_rec = nullptr;
2414 }
2415 }
2416
2417 /* 2. Allocate a new page to the index */
2418 new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
2419 btr_page_get_level(page, mtr), mtr, mtr);
2420
2421 new_page = buf_block_get_frame(new_block);
2422 new_page_zip = buf_block_get_page_zip(new_block);
2423 btr_page_create(new_block, new_page_zip, cursor->index,
2424 btr_page_get_level(page, mtr), mtr);
2425
2426 /* 3. Calculate the first record on the upper half-page, and the
2427 first record (move_limit) on original page which ends up on the
2428 upper half */
2429
2430 if (split_rec) {
2431 first_rec = move_limit = split_rec;
2432
2433 *offsets =
2434 rec_get_offsets(split_rec, cursor->index, *offsets, n_uniq, heap);
2435
2436 insert_left = cmp_dtuple_rec(tuple, split_rec, cursor->index, *offsets) < 0;
2437
2438 if (!insert_left && new_page_zip && n_iterations > 0) {
2439 /* If a compressed page has already been split,
2440 avoid further splits by inserting the record
2441 to an empty page. */
2442 split_rec = nullptr;
2443 goto insert_empty;
2444 }
2445 } else if (insert_left) {
2446 ut_a(n_iterations > 0);
2447 first_rec = page_rec_get_next(page_get_infimum_rec(page));
2448 move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2449 } else {
2450 insert_empty:
2451 ut_ad(!split_rec);
2452 ut_ad(!insert_left);
2453 buf =
2454 UT_NEW_ARRAY_NOKEY(byte, rec_get_converted_size(cursor->index, tuple));
2455
2456 first_rec = rec_convert_dtuple_to_rec(buf, cursor->index, tuple);
2457 move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
2458 }
2459
2460 /* 4. Do first the modifications in the tree structure */
2461
2462 btr_attach_half_pages(flags, cursor->index, block, first_rec, new_block,
2463 direction, mtr);
2464
2465 /* If the split is made on the leaf level and the insert will fit
2466 on the appropriate half-page, we may release the tree x-latch.
2467 We can then move the records after releasing the tree latch,
2468 thus reducing the tree latch contention. */
2469
2470 if (split_rec) {
2471 insert_will_fit =
2472 !new_page_zip &&
2473 btr_page_insert_fits(cursor, split_rec, offsets, tuple, heap);
2474 } else {
2475 if (!insert_left) {
2476 UT_DELETE_ARRAY(buf);
2477 buf = nullptr;
2478 }
2479
2480 insert_will_fit =
2481 !new_page_zip &&
2482 btr_page_insert_fits(cursor, nullptr, offsets, tuple, heap);
2483 }
2484
2485 if (!srv_read_only_mode && !cursor->index->table->is_intrinsic() &&
2486 insert_will_fit && page_is_leaf(page) &&
2487 !dict_index_is_online_ddl(cursor->index)) {
2488 mtr->memo_release(dict_index_get_lock(cursor->index),
2489 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);
2490
2491 /* NOTE: We cannot release the root block latch here, because it
2492 holds the segment headers and has already been modified in most cases. */
2493 }
2494
2495 /* 5. Move then the records to the new page */
2496 if (direction == FSP_DOWN) {
2497 /* fputs("Split left\n", stderr); */
2498
2499 if (false
2500 #ifdef UNIV_ZIP_COPY
2501 || page_zip
2502 #endif /* UNIV_ZIP_COPY */
2503 || !page_move_rec_list_start(new_block, block, move_limit,
2504 cursor->index, mtr)) {
2505 /* For some reason, compressing new_page failed,
2506 even though it should contain fewer records than
2507 the original page. Copy the page byte for byte
2508 and then delete the records from both pages
2509 as appropriate. Deleting will always succeed. */
2510 ut_a(new_page_zip);
2511
2512 page_zip_copy_recs(new_page_zip, new_page, page_zip, page, cursor->index,
2513 mtr);
2514 page_delete_rec_list_end(move_limit - page + new_page, new_block,
2515 cursor->index, ULINT_UNDEFINED, ULINT_UNDEFINED,
2516 mtr);
2517
2518 /* Update the lock table and possible hash index. */
2519
2520 if (!dict_table_is_locking_disabled(cursor->index->table)) {
2521 lock_move_rec_list_start(new_block, block, move_limit,
2522 new_page + PAGE_NEW_INFIMUM);
2523 }
2524
2525 btr_search_move_or_delete_hash_entries(new_block, block, cursor->index);
2526
2527 /* Delete the records from the source page. */
2528
2529 page_delete_rec_list_start(move_limit, block, cursor->index, mtr);
2530 }
2531
2532 left_block = new_block;
2533 right_block = block;
2534
2535 if (!dict_table_is_locking_disabled(cursor->index->table)) {
2536 lock_update_split_left(right_block, left_block);
2537 }
2538 } else {
2539 /* fputs("Split right\n", stderr); */
2540
2541 if (false
2542 #ifdef UNIV_ZIP_COPY
2543 || page_zip
2544 #endif /* UNIV_ZIP_COPY */
2545 || !page_move_rec_list_end(new_block, block, move_limit, cursor->index,
2546 mtr)) {
2547 /* For some reason, compressing new_page failed,
2548 even though it should contain fewer records than
2549 the original page. Copy the page byte for byte
2550 and then delete the records from both pages
2551 as appropriate. Deleting will always succeed. */
2552 ut_a(new_page_zip);
2553
2554 page_zip_copy_recs(new_page_zip, new_page, page_zip, page, cursor->index,
2555 mtr);
2556 page_delete_rec_list_start(move_limit - page + new_page, new_block,
2557 cursor->index, mtr);
2558
2559 /* Update the lock table and possible hash index. */
2560 if (!dict_table_is_locking_disabled(cursor->index->table)) {
2561 lock_move_rec_list_end(new_block, block, move_limit);
2562 }
2563
2564 ut_ad(!dict_index_is_spatial(index));
2565
2566 btr_search_move_or_delete_hash_entries(new_block, block, cursor->index);
2567
2568 /* Delete the records from the source page. */
2569
2570 page_delete_rec_list_end(move_limit, block, cursor->index,
2571 ULINT_UNDEFINED, ULINT_UNDEFINED, mtr);
2572 }
2573
2574 left_block = block;
2575 right_block = new_block;
2576
2577 if (!dict_table_is_locking_disabled(cursor->index->table)) {
2578 lock_update_split_right(right_block, left_block);
2579 }
2580 }
2581
2582 #ifdef UNIV_ZIP_DEBUG
2583 if (page_zip) {
2584 ut_a(page_zip_validate(page_zip, page, cursor->index));
2585 ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
2586 }
2587 #endif /* UNIV_ZIP_DEBUG */
2588
2589 /* At this point, split_rec, move_limit and first_rec may point
2590 to garbage on the old page. */
2591
2592 /* 6. The split and the tree modification is now completed. Decide the
2593 page where the tuple should be inserted */
2594
2595 if (insert_left) {
2596 insert_block = left_block;
2597 } else {
2598 insert_block = right_block;
2599 }
2600
2601 /* 7. Reposition the cursor for insert and try insertion */
2602 page_cursor = btr_cur_get_page_cur(cursor);
2603
2604 page_cur_search(insert_block, cursor->index, tuple, page_cursor);
2605
2606 rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, offsets, heap,
2607 mtr);
2608
2609 #ifdef UNIV_ZIP_DEBUG
2610 {
2611 page_t *insert_page = buf_block_get_frame(insert_block);
2612
2613 page_zip_des_t *insert_page_zip = buf_block_get_page_zip(insert_block);
2614
2615 ut_a(!insert_page_zip ||
2616 page_zip_validate(insert_page_zip, insert_page, cursor->index));
2617 }
2618 #endif /* UNIV_ZIP_DEBUG */
2619
2620 if (rec != nullptr) {
2621 goto func_exit;
2622 }
2623
2624 /* 8. If insert did not fit, try page reorganization.
2625 For compressed pages, page_cur_tuple_insert() will have
2626 attempted this already. */
2627
2628 if (page_cur_get_page_zip(page_cursor) ||
2629 !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
2630 goto insert_failed;
2631 }
2632
2633 rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, offsets, heap,
2634 mtr);
2635
2636 if (rec == nullptr) {
2637 /* The insert did not fit on the page: loop back to the
2638 start of the function for a new split */
2639 insert_failed:
2640 /* We play safe and reset the free bits for new_page */
2641 if (!cursor->index->is_clustered() &&
2642 !cursor->index->table->is_temporary()) {
2643 ibuf_reset_free_bits(new_block);
2644 ibuf_reset_free_bits(block);
2645 }
2646
2647 n_iterations++;
2648 ut_ad(n_iterations < 2 || buf_block_get_page_zip(insert_block));
2649 ut_ad(!insert_will_fit);
2650
2651 goto func_start;
2652 }
2653
2654 func_exit:
2655 /* Insert fit on the page: update the free bits for the
2656 left and right pages in the same mtr */
2657
2658 if (!cursor->index->is_clustered() && !cursor->index->table->is_temporary() &&
2659 page_is_leaf(page)) {
2660 ibuf_update_free_bits_for_two_pages_low(left_block, right_block, mtr);
2661 }
2662
2663 MONITOR_INC(MONITOR_INDEX_SPLIT);
2664
2665 ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
2666 ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));
2667
2668 ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
2669 return (rec);
2670 }
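/* Illustrative summary (added for exposition) of the split loop above,
keyed on n_iterations, which counts restarts through func_start:

    pass 0:  pick a heuristic split point (to the right for ascending
             inserts, to the left for descending ones, otherwise the
             middle record), then attempt the insert;
    pass 1+: split at the half-space record computed by
             btr_page_get_split_rec(); on compressed pages the record
             may instead be directed to an empty page to stop the
             splitting from repeating.

Every pass allocates one new page, which is why the caller must
guarantee free disk space up front (see the function comment). */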
2671
2672 /** Removes a page from the level list of pages.
2673 @param[in] space space where removed
2674 @param[in] page_size page size
2675 @param[in,out] page page to remove
2676 @param[in] index index tree
2677 @param[in,out] mtr mini-transaction */
2678 #define btr_level_list_remove(space, page_size, page, index, mtr) \
2679 btr_level_list_remove_func(space, page_size, page, index, mtr)
2680
2681 /** Removes a page from the level list of pages.
2682 @param[in] space space where removed
2683 @param[in] page_size page size
2684 @param[in,out] page page to remove
2685 @param[in] index index tree
2686 @param[in,out] mtr mini-transaction */
2687 static void btr_level_list_remove_func(space_id_t space,
2688 const page_size_t &page_size,
2689 page_t *page, const dict_index_t *index,
2690 mtr_t *mtr) {
2691 ut_ad(page != nullptr);
2692 ut_ad(mtr != nullptr);
2693 ut_ad(mtr_is_page_fix(mtr, page, MTR_MEMO_PAGE_X_FIX, index->table));
2694 ut_ad(space == page_get_space_id(page));
2695 /* Get the previous and next page numbers of page */
2696
2697 const page_no_t prev_page_no = btr_page_get_prev(page, mtr);
2698 const page_no_t next_page_no = btr_page_get_next(page, mtr);
2699
2700 /* Update page links of the level */
2701
2702 if (prev_page_no != FIL_NULL) {
2703 buf_block_t *prev_block = btr_block_get(page_id_t(space, prev_page_no),
2704 page_size, RW_X_LATCH, index, mtr);
2705
2706 page_t *prev_page = buf_block_get_frame(prev_block);
2707 #ifdef UNIV_BTR_DEBUG
2708 ut_a(page_is_comp(prev_page) == page_is_comp(page));
2709 ut_a(btr_page_get_next(prev_page, mtr) == page_get_page_no(page));
2710 #endif /* UNIV_BTR_DEBUG */
2711
2712 btr_page_set_next(prev_page, buf_block_get_page_zip(prev_block),
2713 next_page_no, mtr);
2714 }
2715
2716 if (next_page_no != FIL_NULL) {
2717 buf_block_t *next_block = btr_block_get(page_id_t(space, next_page_no),
2718 page_size, RW_X_LATCH, index, mtr);
2719
2720 page_t *next_page = buf_block_get_frame(next_block);
2721 #ifdef UNIV_BTR_DEBUG
2722 ut_a(page_is_comp(next_page) == page_is_comp(page));
2723 ut_a(btr_page_get_prev(next_page, mtr) == page_get_page_no(page));
2724 #endif /* UNIV_BTR_DEBUG */
2725
2726 btr_page_set_prev(next_page, buf_block_get_page_zip(next_block),
2727 prev_page_no, mtr);
2728 }
2729 }
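/* A minimal sketch (guarded out; toy types) of the level-list unlink
above, in plain doubly-linked-list terms: prev/next stand in for the
FIL_PAGE_PREV and FIL_PAGE_NEXT fields, nullptr for FIL_NULL. */
#if 0
struct toy_page {
  toy_page *prev;
  toy_page *next;
};

static void toy_level_list_remove(toy_page *page) {
  if (page->prev != nullptr) {
    page->prev->next = page->next; /* btr_page_set_next() on prev */
  }
  if (page->next != nullptr) {
    page->next->prev = page->prev; /* btr_page_set_prev() on next */
  }
}
#endif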
2730
2731 /** Writes the redo log record for setting an index record as the predefined
2732 minimum record. */
2733 UNIV_INLINE
2734 void btr_set_min_rec_mark_log(rec_t *rec, /*!< in: record */
2735 mlog_id_t type, /*!< in: MLOG_COMP_REC_MIN_MARK or
2736 MLOG_REC_MIN_MARK */
2737 mtr_t *mtr) /*!< in: mtr */
2738 {
2739 mlog_write_initial_log_record(rec, type, mtr);
2740
2741 /* Write rec offset as a 2-byte ulint */
2742 mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
2743 }
2744 #else /* !UNIV_HOTBACKUP */
2745 #define btr_set_min_rec_mark_log(rec, comp, mtr) ((void)0)
2746 #endif /* !UNIV_HOTBACKUP */
2747
2748 /** Parses the redo log record for setting an index record as the predefined
2749 minimum record.
2750 @return end of log record or NULL */
2751 byte *btr_parse_set_min_rec_mark(
2752 byte *ptr, /*!< in: buffer */
2753 byte *end_ptr, /*!< in: buffer end */
2754 ulint comp, /*!< in: nonzero=compact page format */
2755 page_t *page, /*!< in: page or NULL */
2756 mtr_t *mtr) /*!< in: mtr or NULL */
2757 {
2758 rec_t *rec;
2759
2760 if (end_ptr < ptr + 2) {
2761 return (nullptr);
2762 }
2763
2764 if (page) {
2765 ut_a(!page_is_comp(page) == !comp);
2766
2767 rec = page + mach_read_from_2(ptr);
2768
2769 btr_set_min_rec_mark(rec, mtr);
2770 }
2771
2772 return (ptr + 2);
2773 }
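/* Note (added for exposition): the body of an MLOG_REC_MIN_MARK or
MLOG_COMP_REC_MIN_MARK record is just the 2-byte page offset written by
btr_set_min_rec_mark_log() above. That is why the parser only checks
for two available bytes, and when applying the record it simply adds
the offset to the page frame to find the record to mark. */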
2774
2775 /** Sets a record as the predefined minimum record. */
2776 void btr_set_min_rec_mark(rec_t *rec, /*!< in: record */
2777 mtr_t *mtr) /*!< in: mtr */
2778 {
2779 ulint info_bits;
2780
2781 if (page_rec_is_comp(rec)) {
2782 info_bits = rec_get_info_bits(rec, TRUE);
2783
2784 rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);
2785
2786 btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
2787 } else {
2788 info_bits = rec_get_info_bits(rec, FALSE);
2789
2790 rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);
2791
2792 btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
2793 }
2794 }
2795
2796 #ifndef UNIV_HOTBACKUP
2797 /** Deletes on the upper level the node pointer to a page. */
2798 void btr_node_ptr_delete(
2799 dict_index_t *index, /*!< in: index tree */
2800 buf_block_t *block, /*!< in: page whose node pointer is deleted */
2801 mtr_t *mtr) /*!< in: mtr */
2802 {
2803 btr_cur_t cursor;
2804 ibool compressed;
2805 dberr_t err;
2806
2807 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2808
2809 /* Delete node pointer on father page */
2810 btr_page_get_father(index, block, mtr, &cursor);
2811
2812 compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor, BTR_CREATE_FLAG,
2813 false, 0, 0, 0, mtr);
2814 ut_a(err == DB_SUCCESS);
2815
2816 if (!compressed) {
2817 btr_cur_compress_if_useful(&cursor, FALSE, mtr);
2818 }
2819 }
2820
2821 /** If the page is the only one on its level, this function moves its records
2822 to the father page, thus reducing the tree height.
2823 @return father block */
2824 static buf_block_t *btr_lift_page_up(
2825 dict_index_t *index, /*!< in: index tree */
2826 buf_block_t *block, /*!< in: page which is the only one on its level;
2827 must not be empty: use
2828 btr_discard_only_page_on_level if the last
2829 record from the page should be removed */
2830 mtr_t *mtr) /*!< in: mtr */
2831 {
2832 buf_block_t *father_block;
2833 page_t *father_page;
2834 ulint page_level;
2835 page_zip_des_t *father_page_zip;
2836 page_t *page = buf_block_get_frame(block);
2837 page_no_t root_page_no;
2838 buf_block_t *blocks[BTR_MAX_LEVELS];
2839 ulint n_blocks; /*!< last used index in blocks[] */
2840 ulint i;
2841 bool lift_father_up;
2842 buf_block_t *block_orig = block;
2843
2844 ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
2845 ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
2846 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2847
2848 page_level = btr_page_get_level(page, mtr);
2849 root_page_no = dict_index_get_page(index);
2850
2851 {
2852 btr_cur_t cursor;
2853 ulint *offsets = nullptr;
2854 mem_heap_t *heap = mem_heap_create(
2855 sizeof(*offsets) * (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
2856 buf_block_t *b;
2857
2858 if (dict_index_is_spatial(index)) {
2859 offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
2860 nullptr, &cursor);
2861 } else {
2862 offsets =
2863 btr_page_get_father_block(offsets, heap, index, block, mtr, &cursor);
2864 }
2865 father_block = btr_cur_get_block(&cursor);
2866 father_page_zip = buf_block_get_page_zip(father_block);
2867 father_page = buf_block_get_frame(father_block);
2868
2869 n_blocks = 0;
2870
2871 /* Store all ancestor pages so we can reset their
2872 levels later on. We have to do all the searches on
2873 the tree now because later on, after we've replaced
2874 the first level, the tree is in an inconsistent state
2875 and can not be searched. */
2876 for (b = father_block; b->page.id.page_no() != root_page_no;) {
2877 ut_a(n_blocks < BTR_MAX_LEVELS);
2878
2879 if (dict_index_is_spatial(index)) {
2880 offsets = rtr_page_get_father_block(nullptr, heap, index, b, mtr,
2881 nullptr, &cursor);
2882 } else {
2883 offsets =
2884 btr_page_get_father_block(offsets, heap, index, b, mtr, &cursor);
2885 }
2886
2887 blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
2888 }
2889
2890 lift_father_up = (n_blocks && page_level == 0);
2891 if (lift_father_up) {
2892 /* The father page should also be the only one on its level (and
2893 not the root); we lift up the father page first, because a leaf
2894 page may be lifted up directly only to the root page. Freeing a
2895 page chooses the file segment based on page_level (==0 or !=0);
2896 if page_level changed from !=0 to ==0, the later freeing of the
2897 page would not find the page allocation that needs
2898 to be freed. */
2899
2900 block = father_block;
2901 page = buf_block_get_frame(block);
2902 page_level = btr_page_get_level(page, mtr);
2903
2904 ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
2905 ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
2906 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2907
2908 father_block = blocks[0];
2909 father_page_zip = buf_block_get_page_zip(father_block);
2910 father_page = buf_block_get_frame(father_block);
2911 }
2912
2913 mem_heap_free(heap);
2914 }
2915
2916 btr_search_drop_page_hash_index(block);
2917
2918 /* Make the father empty */
2919 btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
2920 page_level++;
2921
2922 /* Copy the records to the father page one by one. */
2923 if (false
2924 #ifdef UNIV_ZIP_COPY
2925 || father_page_zip
2926 #endif /* UNIV_ZIP_COPY */
2927 || !page_copy_rec_list_end(father_block, block,
2928 page_get_infimum_rec(page), index, mtr)) {
2929 const page_zip_des_t *page_zip = buf_block_get_page_zip(block);
2930 ut_a(father_page_zip);
2931 ut_a(page_zip);
2932
2933 /* Copy the page byte for byte. */
2934 page_zip_copy_recs(father_page_zip, father_page, page_zip, page, index,
2935 mtr);
2936
2937 /* Update the lock table and possible hash index. */
2938
2939 if (!dict_table_is_locking_disabled(index->table)) {
2940 lock_move_rec_list_end(father_block, block, page_get_infimum_rec(page));
2941 }
2942
2943 /* Also update the predicate locks */
2944 if (dict_index_is_spatial(index)) {
2945 lock_prdt_rec_move(father_block, block);
2946 }
2947
2948 btr_search_move_or_delete_hash_entries(father_block, block, index);
2949 }
2950
2951 if (!dict_table_is_locking_disabled(index->table)) {
2952 /* Free predicate page locks on the block */
2953 if (dict_index_is_spatial(index)) {
2954 locksys::Shard_latch_guard guard{block->get_page_id()};
2955 lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
2956 }
2957 lock_update_copy_and_discard(father_block, block);
2958 }
2959
2960 /* Go upward to root page, decrementing levels by one. */
2961 for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
2962 page_t *page = buf_block_get_frame(blocks[i]);
2963 page_zip_des_t *page_zip = buf_block_get_page_zip(blocks[i]);
2964
2965 ut_ad(btr_page_get_level(page, mtr) == page_level + 1);
2966
2967 btr_page_set_level(page, page_zip, page_level, mtr);
2968 #ifdef UNIV_ZIP_DEBUG
2969 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
2970 #endif /* UNIV_ZIP_DEBUG */
2971 }
2972
2973 if (dict_index_is_spatial(index)) {
2974 rtr_check_discard_page(index, nullptr, block);
2975 }
2976
2977 /* Free the file page */
2978 btr_page_free(index, block, mtr);
2979
2980 /* We play it safe and reset the free bits for the father */
2981 if (!index->is_clustered() && !index->table->is_temporary()) {
2982 ibuf_reset_free_bits(father_block);
2983 }
2984 ut_ad(page_validate(father_page, index));
2985 ut_ad(btr_check_node_ptr(index, father_block, mtr));
2986
2987 return (lift_father_up ? block_orig : father_block);
2988 }
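/* Illustrative diagram (added for exposition) of a page lift where B
is the only page on its level below father F:

        F [ptr(B)]                 F [records of B]
        |              ------>
        B [records]                (B freed, ancestor levels reduced)

The ancestor pages are collected up front because once the first level
has been replaced the tree is temporarily inconsistent and cannot be
searched; their level fields are then decremented on the way back up
to the root. */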
2989
2990 /** Tries to merge the page first to the left immediate brother if such a
2991 brother exists, and the node pointers to the current page and to the brother
2992 reside on the same page. If the left brother does not satisfy these
2993 conditions, looks at the right brother. If the page is the only one on that
2994 level, lifts the records of the page to the father page, thus reducing the
2995 tree height. It is assumed that mtr holds an x-latch on the tree and on the
2996 page. If cursor is on the leaf level, mtr must also hold x-latches to the
2997 brothers, if they exist.
2998 @return true on success */
2999 ibool btr_compress(
3000 btr_cur_t *cursor, /*!< in/out: cursor on the page to merge
3001 or lift; the page must not be empty:
3002 when deleting records, use btr_discard_page()
3003 if the page would become empty */
3004 ibool adjust, /*!< in: TRUE if should adjust the
3005 cursor position even if compression occurs */
3006 mtr_t *mtr) /*!< in/out: mini-transaction */
3007 {
3008 dict_index_t *index;
3009 space_id_t space;
3010 page_no_t left_page_no;
3011 page_no_t right_page_no;
3012 buf_block_t *merge_block;
3013 page_t *merge_page = nullptr;
3014 page_zip_des_t *merge_page_zip;
3015 ibool is_left;
3016 buf_block_t *block;
3017 page_t *page;
3018 btr_cur_t father_cursor;
3019 mem_heap_t *heap;
3020 ulint *offsets;
3021 ulint nth_rec = 0; /* remove bogus warning */
3022 bool mbr_changed = false;
3023 #ifdef UNIV_DEBUG
3024 bool leftmost_child;
3025 #endif
3026 DBUG_TRACE;
3027
3028 block = btr_cur_get_block(cursor);
3029 page = btr_cur_get_page(cursor);
3030 index = btr_cur_get_index(cursor);
3031
3032 btr_assert_not_corrupted(block, index);
3033
3034 #ifdef UNIV_DEBUG
3035 if (dict_index_is_spatial(index)) {
3036 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3037 MTR_MEMO_X_LOCK));
3038 } else {
3039 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3040 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
3041 index->table->is_intrinsic());
3042 }
3043 #endif /* UNIV_DEBUG */
3044
3045 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3046 space = dict_index_get_space(index);
3047
3048 const page_size_t page_size(dict_table_page_size(index->table));
3049
3050 MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);
3051
3052 left_page_no = btr_page_get_prev(page, mtr);
3053 right_page_no = btr_page_get_next(page, mtr);
3054
3055 #ifdef UNIV_DEBUG
3056 if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3057 ut_a(REC_INFO_MIN_REC_FLAG &
3058 rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(page)),
3059 page_is_comp(page)));
3060 }
3061 #endif /* UNIV_DEBUG */
3062
3063 heap = mem_heap_create(100);
3064
3065 if (dict_index_is_spatial(index)) {
3066 offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
3067 cursor, &father_cursor);
3068 ut_ad(cursor->page_cur.block->page.id.page_no() ==
3069 block->page.id.page_no());
3070 rec_t *my_rec = father_cursor.page_cur.rec;
3071
3072 page_no_t page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3073
3074 if (page_no != block->page.id.page_no()) {
3075 ib::info(ER_IB_MSG_32) << "father positioned on page " << page_no
3076 << "instead of " << block->page.id.page_no();
3077 offsets = btr_page_get_father_block(nullptr, heap, index, block, mtr,
3078 &father_cursor);
3079 }
3080 } else {
3081 offsets = btr_page_get_father_block(nullptr, heap, index, block, mtr,
3082 &father_cursor);
3083 }
3084
3085 if (adjust) {
3086 nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
3087 ut_ad(nth_rec > 0);
3088 }
3089
3090 if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
3091 /* The page is the only one on the level, lift the records
3092 to the father */
3093
3094 merge_block = btr_lift_page_up(index, block, mtr);
3095 goto func_exit;
3096 }
3097
3098 ut_d(leftmost_child =
3099 left_page_no != FIL_NULL &&
3100 (page_rec_get_next(page_get_infimum_rec(btr_cur_get_page(
3101 &father_cursor))) == btr_cur_get_rec(&father_cursor)));
3102
3103 /* Decide the page to which we try to merge and which will inherit
3104 the locks */
3105
3106 is_left = btr_can_merge_with_page(cursor, left_page_no, &merge_block, mtr);
3107
3108 DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
3109 retry:
3110 if (!is_left &&
3111 !btr_can_merge_with_page(cursor, right_page_no, &merge_block, mtr)) {
3112 if (!merge_block) {
3113 merge_page = nullptr;
3114 }
3115 goto err_exit;
3116 }
3117
3118 merge_page = buf_block_get_frame(merge_block);
3119
3120 #ifdef UNIV_BTR_DEBUG
3121 if (is_left) {
3122 ut_a(btr_page_get_next(merge_page, mtr) == block->page.id.page_no());
3123 } else {
3124 ut_a(btr_page_get_prev(merge_page, mtr) == block->page.id.page_no());
3125 }
3126 #endif /* UNIV_BTR_DEBUG */
3127
3128 #ifdef UNIV_GIS_DEBUG
3129 if (dict_index_is_spatial(index)) {
3130 if (is_left) {
3131 fprintf(stderr, "GIS_DIAG: merge left %ld to %ld \n",
3132 (long)block->page.id.page_no(), left_page_no);
3133 } else {
3134 fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
3135 (long)block->page.id.page_no(), right_page_no);
3136 }
3137 }
3138 #endif /* UNIV_GIS_DEBUG */
3139
3140 ut_ad(page_validate(merge_page, index));
3141
3142 merge_page_zip = buf_block_get_page_zip(merge_block);
3143 #ifdef UNIV_ZIP_DEBUG
3144 if (merge_page_zip) {
3145 const page_zip_des_t *page_zip = buf_block_get_page_zip(block);
3146 ut_a(page_zip);
3147 ut_a(page_zip_validate(merge_page_zip, merge_page, index));
3148 ut_a(page_zip_validate(page_zip, page, index));
3149 }
3150 #endif /* UNIV_ZIP_DEBUG */
3151
3152 /* Move records to the merge page */
3153 if (is_left) {
3154 btr_cur_t cursor2;
3155 rtr_mbr_t new_mbr;
3156 ulint *offsets2 = nullptr;
3157
3158 /* For rtree, we need to update father's mbr. */
3159 if (dict_index_is_spatial(index)) {
3160 /* We only support merging pages that share the same parent
3161 page */
3162 if (!rtr_check_same_block(index, &cursor2,
3163 btr_cur_get_block(&father_cursor), merge_block,
3164 heap)) {
3165 is_left = false;
3166 goto retry;
3167 }
3168
3169 /* Set rtr_info for cursor2, since it is
3170 needed in a recursive page merge. */
3171 cursor2.rtr_info = cursor->rtr_info;
3172 cursor2.tree_height = cursor->tree_height;
3173
3174 offsets2 = rec_get_offsets(btr_cur_get_rec(&cursor2), index, nullptr,
3175 ULINT_UNDEFINED, &heap);
3176
3177 /* Check if parent entry needs to be updated */
3178 mbr_changed =
3179 rtr_merge_mbr_changed(&cursor2, &father_cursor, offsets2, offsets,
3180 &new_mbr, merge_block, block, index);
3181 }
3182
3183 rec_t *orig_pred = page_copy_rec_list_start(
3184 merge_block, block, page_get_supremum_rec(page), index, mtr);
3185
3186 if (!orig_pred) {
3187 goto err_exit;
3188 }
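/* Note: the copy above can fail only when the records cannot be
recompressed into merge_page, i.e. on ROW_FORMAT=COMPRESSED pages
(compare the ut_a(merge_page_zip) in the right-merge branch below);
in that case the merge is simply abandoned. */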
3189
3190 btr_search_drop_page_hash_index(block);
3191
3192 /* Remove the page from the level list */
3193 btr_level_list_remove(space, page_size, page, index, mtr);
3194
3195 if (dict_index_is_spatial(index)) {
3196 rec_t *my_rec = father_cursor.page_cur.rec;
3197
3198 page_no_t page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);
3199
3200 if (page_no != block->page.id.page_no()) {
3201 ib::fatal(ER_IB_MSG_33) << "father positioned on " << page_no
3202 << " instead of " << block->page.id.page_no();
3203
3204 ut_ad(0);
3205 }
3206
3207 if (mbr_changed) {
3208 #ifdef UNIV_DEBUG
3209 bool success = rtr_update_mbr_field(&cursor2, offsets2, &father_cursor,
3210 merge_page, &new_mbr, nullptr, mtr);
3211
3212 ut_ad(success);
3213 #else
3214 rtr_update_mbr_field(&cursor2, offsets2, &father_cursor, merge_page,
3215 &new_mbr, NULL, mtr);
3216 #endif
3217 } else {
3218 rtr_node_ptr_delete(index, &father_cursor, block, mtr);
3219 }
3220
3221 /* There are no GAP locks to worry about here */
3222 locksys::Shard_latch_guard guard{block->get_page_id()};
3223 lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
3224 lock_rec_free_all_from_discard_page(block);
3225 } else {
3226 btr_node_ptr_delete(index, block, mtr);
3227 if (!dict_table_is_locking_disabled(index->table)) {
3228 lock_update_merge_left(merge_block, orig_pred, block);
3229 }
3230 }
3231
3232 if (adjust) {
3233 nth_rec += page_rec_get_n_recs_before(orig_pred);
3234 }
3235 } else {
3236 rec_t *orig_succ;
3237 ibool compressed;
3238 dberr_t err;
3239 btr_cur_t cursor2;
3240 /* cursor2: father cursor pointing to the node ptr
3241 of the right sibling */
3242 #ifdef UNIV_BTR_DEBUG
3243 byte fil_page_prev[4];
3244 #endif /* UNIV_BTR_DEBUG */
3245
3246 if (dict_index_is_spatial(index)) {
3247 cursor2.rtr_info = nullptr;
3248
3249 /* For spatial index, we disallow merge of blocks
3250 with different parents, since the merge would need
3251 to update entry (for MBR and Primary key) in the
3252 parent of block being merged */
3253 if (!rtr_check_same_block(index, &cursor2,
3254 btr_cur_get_block(&father_cursor), merge_block,
3255 heap)) {
3256 goto err_exit;
3257 }
3258
3259 /* Set rtr_info for cursor2, since it is
3260 needed in a recursive page merge. */
3261 cursor2.rtr_info = cursor->rtr_info;
3262 cursor2.tree_height = cursor->tree_height;
3263 } else {
3264 btr_page_get_father(index, merge_block, mtr, &cursor2);
3265 }
3266
3267 if (merge_page_zip && left_page_no == FIL_NULL) {
3268 /* The function page_zip_compress(), which will be
3269 invoked by page_copy_rec_list_end() below,
3270 requires that FIL_PAGE_PREV be FIL_NULL.
3271 Clear the field, but prepare to restore it. */
3272 #ifdef UNIV_BTR_DEBUG
3273 memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
3274 #endif /* UNIV_BTR_DEBUG */
3275 static_assert(FIL_NULL == 0xffffffff, "FIL_NULL != 0xffffffff");
3276 memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
3277 }
3278
3279 orig_succ = page_copy_rec_list_end(
3280 merge_block, block, page_get_infimum_rec(page), cursor->index, mtr);
3281
3282 if (!orig_succ) {
3283 ut_a(merge_page_zip);
3284 #ifdef UNIV_BTR_DEBUG
3285 if (left_page_no == FIL_NULL) {
3286 /* FIL_PAGE_PREV was restored from
3287 merge_page_zip. */
3288 ut_a(!memcmp(fil_page_prev, merge_page + FIL_PAGE_PREV, 4));
3289 }
3290 #endif /* UNIV_BTR_DEBUG */
3291 goto err_exit;
3292 }
3293
3294 btr_search_drop_page_hash_index(block);
3295
3296 #ifdef UNIV_BTR_DEBUG
3297 if (merge_page_zip && left_page_no == FIL_NULL) {
3298 /* Restore FIL_PAGE_PREV in order to avoid an assertion
3299 failure in btr_level_list_remove(), which will set
3300 the field again to FIL_NULL. Even though this makes
3301 merge_page and merge_page_zip inconsistent for a
3302 split second, it is harmless, because the pages
3303 are X-latched. */
3304 memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
3305 }
3306 #endif /* UNIV_BTR_DEBUG */
3307
3308 /* Remove the page from the level list */
3309 btr_level_list_remove(space, page_size, page, index, mtr);
3310
3311 ut_ad(btr_node_ptr_get_child_page_no(btr_cur_get_rec(&father_cursor),
3312 offsets) == block->page.id.page_no());
3313
3314 /* Replace the address of the old child node (= page) with the
3315 address of the merge page to the right */
3316 btr_node_ptr_set_child_page_no(btr_cur_get_rec(&father_cursor),
3317 btr_cur_get_page_zip(&father_cursor),
3318 offsets, right_page_no, mtr);
3319
3320 #ifdef UNIV_DEBUG
3321 if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
3322 ut_ad(REC_INFO_MIN_REC_FLAG &
3323 rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(
3324 buf_block_get_frame(merge_block))),
3325 page_is_comp(page)));
3326 }
3327 #endif /* UNIV_DEBUG */
3328
3329 /* For rtree, we need to update father's mbr. */
3330 if (dict_index_is_spatial(index)) {
3331 ulint *offsets2;
3332 ulint rec_info;
3333
3334 offsets2 = rec_get_offsets(btr_cur_get_rec(&cursor2), index, nullptr,
3335 ULINT_UNDEFINED, &heap);
3336
3337 ut_ad(btr_node_ptr_get_child_page_no(btr_cur_get_rec(&cursor2),
3338 offsets2) == right_page_no);
3339
3340 rec_info = rec_get_info_bits(btr_cur_get_rec(&father_cursor),
3341 rec_offs_comp(offsets));
3342 if (rec_info & REC_INFO_MIN_REC_FLAG) {
3343 /* When the father node ptr is the minimal rec,
3344 we keep it and delete the node ptr of the
3345 merge page. */
3346 rtr_merge_and_update_mbr(&father_cursor, &cursor2, offsets, offsets2,
3347 merge_page, merge_block, block, index, mtr);
3348 } else {
3349 /* Otherwise, we keep the node ptr of the
3350 merge page and delete the father node ptr.
3351 This preserves the rec order in the upper
3352 level. */
3353 rtr_merge_and_update_mbr(&cursor2, &father_cursor, offsets2, offsets,
3354 merge_page, merge_block, block, index, mtr);
3355 }
3356 locksys::Shard_latch_guard guard{block->get_page_id()};
3357 lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
3358 lock_rec_free_all_from_discard_page(block);
3359 } else {
3360 compressed = btr_cur_pessimistic_delete(
3361 &err, TRUE, &cursor2, BTR_CREATE_FLAG, false, 0, 0, 0, mtr);
3362 ut_a(err == DB_SUCCESS);
3363
3364 if (!compressed) {
3365 btr_cur_compress_if_useful(&cursor2, FALSE, mtr);
3366 }
3367
3368 if (!dict_table_is_locking_disabled(index->table)) {
3369 lock_update_merge_right(merge_block, orig_succ, block);
3370 }
3371 }
3372 }
3373
3374 if (!index->is_clustered() && !index->table->is_temporary() &&
3375 page_is_leaf(merge_page)) {
3376 /* Update the free bits of the B-tree page in the
3377 insert buffer bitmap. This has to be done in a
3378 separate mini-transaction that is committed before the
3379 main mini-transaction. We cannot update the insert
3380 buffer bitmap in this mini-transaction, because
3381 btr_compress() can be invoked recursively without
3382 committing the mini-transaction in between. Since
3383 insert buffer bitmap pages have a lower rank than
3384 B-tree pages, we must not access other pages in the
3385 same mini-transaction after accessing an insert buffer
3386 bitmap page. */
3387
3388 /* The free bits in the insert buffer bitmap must
3389 never exceed the free space on a page. It is safe to
3390 decrement or reset the bits in the bitmap in a
3391 mini-transaction that is committed before the
3392 mini-transaction that affects the free space. */
3393
3394 /* It is unsafe to increment the bits in a separately
3395 committed mini-transaction, because in crash recovery,
3396 the free bits could momentarily be set too high. */
3397
3398 if (page_size.is_compressed()) {
3399 /* Because the free bits may be incremented
3400 and we cannot update the insert buffer bitmap
3401 in the same mini-transaction, the only safe
3402 thing we can do here is the pessimistic
3403 approach: reset the free bits. */
3404 ibuf_reset_free_bits(merge_block);
3405 } else {
3406 /* On uncompressed pages, the free bits will
3407 never increase here. Thus, it is safe to
3408 write the bits accurately in a separate
3409 mini-transaction. */
3410 ibuf_update_free_bits_if_full(merge_block, UNIV_PAGE_SIZE,
3411 ULINT_UNDEFINED);
3412 }
3413 }
3414
3415 ut_ad(page_validate(merge_page, index));
3416 #ifdef UNIV_ZIP_DEBUG
3417 ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page, index));
3418 #endif /* UNIV_ZIP_DEBUG */
3419
3420 if (dict_index_is_spatial(index)) {
3421 #ifdef UNIV_GIS_DEBUG
3422 fprintf(stderr, "GIS_DIAG: compressed away %ld\n",
3423 (long)block->page.id.page_no());
3424 fprintf(stderr, "GIS_DIAG: merged to %ld\n",
3425 (long)merge_block->page.id.page_no());
3426 #endif
3427
3428 rtr_check_discard_page(index, nullptr, block);
3429 }
3430
3431 /* Free the file page */
3432 btr_page_free(index, block, mtr);
3433
3434 /* btr_check_node_ptr() needs the parent block latched.
3435 If merge_block's parent block is not the same, we
3436 cannot use btr_check_node_ptr() */
3437 ut_ad(leftmost_child || btr_check_node_ptr(index, merge_block, mtr));
3438 func_exit:
3439 mem_heap_free(heap);
3440
3441 if (adjust) {
3442 ut_ad(nth_rec > 0);
3443 btr_cur_position(index, page_rec_get_nth(merge_block->frame, nth_rec),
3444 merge_block, cursor);
3445 }
3446
3447 MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);
3448
3449 return TRUE;
3450
3451 err_exit:
3452 /* We play it safe and reset the free bits. */
3453 if (page_size.is_compressed() && merge_page && page_is_leaf(merge_page) &&
3454 !index->is_clustered()) {
3455 ibuf_reset_free_bits(merge_block);
3456 }
3457
3458 mem_heap_free(heap);
3459 return FALSE;
3460 }
3461
3462 /** Discards a page that is the only page on its level. This will empty
3463 the whole B-tree, leaving just an empty root page. This function
3464 should never be reached, because btr_compress(), which is invoked in
3465 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
3466 static void btr_discard_only_page_on_level(
3467 dict_index_t *index, /*!< in: index tree */
3468 buf_block_t *block, /*!< in: page which is the only on its level */
3469 mtr_t *mtr) /*!< in: mtr */
3470 {
3471 ulint page_level = 0;
3472 trx_id_t max_trx_id;
3473
3474 /* Save the PAGE_MAX_TRX_ID from the leaf page. */
3475 max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));
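/* The saved value is written back to the emptied root at the end of
this function: for a secondary index the root becomes a leaf page, and
secondary-index leaf pages must carry a valid PAGE_MAX_TRX_ID. */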
3476
3477 while (block->page.id.page_no() != dict_index_get_page(index)) {
3478 btr_cur_t cursor;
3479 buf_block_t *father;
3480 const page_t *page = buf_block_get_frame(block);
3481
3482 ut_a(page_get_n_recs(page) == 1);
3483 ut_a(page_level == btr_page_get_level(page, mtr));
3484 ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
3485 ut_a(btr_page_get_next(page, mtr) == FIL_NULL);
3486
3487 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3488 btr_search_drop_page_hash_index(block);
3489
3490 if (dict_index_is_spatial(index)) {
3491 /* Check for any concurrent search referencing this page */
3492 rtr_check_discard_page(index, nullptr, block);
3493 rtr_page_get_father(index, block, mtr, nullptr, &cursor);
3494 } else {
3495 btr_page_get_father(index, block, mtr, &cursor);
3496 }
3497 father = btr_cur_get_block(&cursor);
3498
3499 if (!dict_table_is_locking_disabled(index->table)) {
3500 lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);
3501 }
3502
3503 /* Free the file page */
3504 btr_page_free(index, block, mtr);
3505
3506 block = father;
3507 page_level++;
3508 }
3509
3510 /* block is the root page, which must be empty, except
3511 for the node pointer to the (now discarded) block(s). */
3512
3513 #ifdef UNIV_BTR_DEBUG
3514 if (!dict_index_is_ibuf(index)) {
3515 const page_t *root = buf_block_get_frame(block);
3516 const space_id_t space_id = dict_index_get_space(index);
3517 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
3518 space_id));
3519 ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
3520 space_id));
3521 }
3522 #endif /* UNIV_BTR_DEBUG */
3523
3524 btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
3525 ut_ad(page_is_leaf(buf_block_get_frame(block)));
3526
3527 if (!index->is_clustered() && !index->table->is_temporary()) {
3528 /* We play it safe and reset the free bits for the root */
3529 ibuf_reset_free_bits(block);
3530
3531 ut_a(max_trx_id);
3532 page_set_max_trx_id(block, buf_block_get_page_zip(block), max_trx_id, mtr);
3533 }
3534 }
3535
3536 /** Discards a page from a B-tree. This is used to remove the last record from
3537 a B-tree page: the whole page must be removed at the same time. This cannot
3538 be used for the root page, which is allowed to be empty. */
3539 void btr_discard_page(btr_cur_t *cursor, /*!< in: cursor on the page to discard:
3540 not on the root page */
3541 mtr_t *mtr) /*!< in: mtr */
3542 {
3543 dict_index_t *index;
3544 page_no_t left_page_no;
3545 page_no_t right_page_no;
3546 buf_block_t *merge_block;
3547 page_t *merge_page;
3548 buf_block_t *block;
3549 page_t *page;
3550 rec_t *node_ptr;
3551 #ifdef UNIV_DEBUG
3552 btr_cur_t parent_cursor;
3553 bool parent_is_different = false;
3554 #endif
3555
3556 block = btr_cur_get_block(cursor);
3557 index = btr_cur_get_index(cursor);
3558
3559 ut_ad(dict_index_get_page(index) != block->page.id.page_no());
3560
3561 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
3562 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
3563 index->table->is_intrinsic());
3564
3565 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3566
3567 const space_id_t space = dict_index_get_space(index);
3568
3569 MONITOR_INC(MONITOR_INDEX_DISCARD);
3570
3571 #ifdef UNIV_DEBUG
3572 if (dict_index_is_spatial(index)) {
3573 rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
3574 } else {
3575 btr_page_get_father(index, block, mtr, &parent_cursor);
3576 }
3577 #endif
3578
3579 /* Decide the page which will inherit the locks */
3580
3581 left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
3582 right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);
3583
3584 const page_size_t page_size(dict_table_page_size(index->table));
3585
3586 if (left_page_no != FIL_NULL) {
3587 merge_block = btr_block_get(page_id_t(space, left_page_no), page_size,
3588 RW_X_LATCH, index, mtr);
3589
3590 merge_page = buf_block_get_frame(merge_block);
3591 #ifdef UNIV_BTR_DEBUG
3592 ut_a(btr_page_get_next(merge_page, mtr) == block->page.id.page_no());
3593 #endif /* UNIV_BTR_DEBUG */
3594 ut_d(parent_is_different =
3595 (page_rec_get_next(page_get_infimum_rec(btr_cur_get_page(
3596 &parent_cursor))) == btr_cur_get_rec(&parent_cursor)));
3597 } else if (right_page_no != FIL_NULL) {
3598 merge_block = btr_block_get(page_id_t(space, right_page_no), page_size,
3599 RW_X_LATCH, index, mtr);
3600
3601 merge_page = buf_block_get_frame(merge_block);
3602 #ifdef UNIV_BTR_DEBUG
3603 ut_a(btr_page_get_prev(merge_page, mtr) == block->page.id.page_no());
3604 #endif /* UNIV_BTR_DEBUG */
3605 ut_d(parent_is_different = page_rec_is_supremum(
3606 page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
3607 } else {
3608 btr_discard_only_page_on_level(index, block, mtr);
3609
3610 return;
3611 }
3612
3613 page = buf_block_get_frame(block);
3614 ut_a(page_is_comp(merge_page) == page_is_comp(page));
3615 btr_search_drop_page_hash_index(block);
3616
3617 if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
3618 /* We have to mark the leftmost node pointer on the right
3619 side page as the predefined minimum record */
3620 node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));
3621
3622 ut_ad(page_rec_is_user_rec(node_ptr));
3623
3624 /* This will make page_zip_validate() fail on merge_page
3625 until btr_level_list_remove() completes. This is harmless,
3626 because everything will take place within a single
3627 mini-transaction and because writing to the redo log
3628 is an atomic operation (performed by mtr_commit()). */
3629 btr_set_min_rec_mark(node_ptr, mtr);
3630 }
3631
3632 if (dict_index_is_spatial(index)) {
3633 btr_cur_t father_cursor;
3634
3635 /* Since rtr_node_ptr_delete() does not itself fetch the
3636 father node ptr, we need to get the father node ptr
3637 first and then delete it. */
3638 rtr_page_get_father(index, block, mtr, cursor, &father_cursor);
3639 rtr_node_ptr_delete(index, &father_cursor, block, mtr);
3640 } else {
3641 btr_node_ptr_delete(index, block, mtr);
3642 }
3643
3644 /* Remove the page from the level list */
3645 btr_level_list_remove(space, page_size, page, index, mtr);
3646 #ifdef UNIV_ZIP_DEBUG
3647 {
3648 page_zip_des_t *merge_page_zip = buf_block_get_page_zip(merge_block);
3649 ut_a(!merge_page_zip ||
3650 page_zip_validate(merge_page_zip, merge_page, index));
3651 }
3652 #endif /* UNIV_ZIP_DEBUG */
3653
3654 if (!dict_table_is_locking_disabled(index->table)) {
3655 if (left_page_no != FIL_NULL) {
3656 lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM, block);
3657 } else {
3658 lock_update_discard(merge_block, lock_get_min_heap_no(merge_block),
3659 block);
3660 }
3661 }
3662
3663 if (dict_index_is_spatial(index)) {
3664 rtr_check_discard_page(index, cursor, block);
3665 }
3666
3667 /* Free the file page */
3668 btr_page_free(index, block, mtr);
3669
3670 /* btr_check_node_ptr() needs the parent block latched.
3671 If merge_block's parent block is not the same, we
3672 cannot use btr_check_node_ptr() */
3673 ut_ad(parent_is_different || btr_check_node_ptr(index, merge_block, mtr));
3674 }
3675
3676 #ifdef UNIV_BTR_PRINT
3677 /** Prints size info of a B-tree. */
3678 void btr_print_size(dict_index_t *index) /*!< in: index tree */
3679 {
3680 page_t *root;
3681 fseg_header_t *seg;
3682 mtr_t mtr;
3683
3684 if (dict_index_is_ibuf(index)) {
3685 fputs(
3686 "Sorry, cannot print info of an ibuf tree:"
3687 " use ibuf functions\n",
3688 stderr);
3689
3690 return;
3691 }
3692
3693 mtr_start(&mtr);
3694
3695 root = btr_root_get(index, &mtr);
3696
3697 seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
3698
3699 fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
3700 fseg_print(seg, &mtr);
3701
3702 if (!dict_index_is_ibuf(index)) {
3703 seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
3704
3705 fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
3706 fseg_print(seg, &mtr);
3707 }
3708
3709 mtr_commit(&mtr);
3710 }
3711
3712 /** Prints recursively index tree pages. */
3713 static void btr_print_recursive(
3714 dict_index_t *index, /*!< in: index tree */
3715 buf_block_t *block, /*!< in: index page */
3716 ulint width, /*!< in: print this many entries from start
3717 and end */
3718 mem_heap_t **heap, /*!< in/out: heap for rec_get_offsets() */
3719 ulint **offsets, /*!< in/out: buffer for rec_get_offsets() */
3720 mtr_t *mtr) /*!< in: mtr */
3721 {
3722 const page_t *page = buf_block_get_frame(block);
3723 page_cur_t cursor;
3724 ulint n_recs;
3725 ulint i = 0;
3726 mtr_t mtr2;
3727
3728 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_SX_FIX, index->table));
3729
3730 ib::info(ER_IB_MSG_34) << "NODE ON LEVEL " << btr_page_get_level(page, mtr)
3731 << " page " << block->page.id;
3732
3733 page_print(block, index, width, width);
3734
3735 n_recs = page_get_n_recs(page);
3736
3737 page_cur_set_before_first(block, &cursor);
3738 page_cur_move_to_next(&cursor);
3739
3740 while (!page_cur_is_after_last(&cursor)) {
3741 if (page_is_leaf(page)) {
3742 /* If this is the leaf level, do nothing */
3743
3744 } else if ((i <= width) || (i >= n_recs - width)) {
3745 const rec_t *node_ptr;
3746
3747 mtr_start(&mtr2);
3748
3749 node_ptr = page_cur_get_rec(&cursor);
3750
3751 *offsets =
3752 rec_get_offsets(node_ptr, index, *offsets, ULINT_UNDEFINED, heap);
3753 btr_print_recursive(
3754 index, btr_node_ptr_get_child(node_ptr, index, *offsets, &mtr2),
3755 width, heap, offsets, &mtr2);
3756 mtr_commit(&mtr2);
3757 }
3758
3759 page_cur_move_to_next(&cursor);
3760 i++;
3761 }
3762 }
3763
3764 /** Prints directories and other info of all nodes in the tree. */
3765 void btr_print_index(dict_index_t *index, /*!< in: index */
3766 ulint width) /*!< in: print this many entries from start
3767 and end */
3768 {
3769 mtr_t mtr;
3770 buf_block_t *root;
3771 mem_heap_t *heap = NULL;
3772 ulint offsets_[REC_OFFS_NORMAL_SIZE];
3773 ulint *offsets = offsets_;
3774 rec_offs_init(offsets_);
3775
3776 fputs(
3777 "--------------------------\n"
3778 "INDEX TREE PRINT\n",
3779 stderr);
3780
3781 mtr_start(&mtr);
3782
3783 root = btr_root_block_get(index, RW_SX_LATCH, &mtr);
3784
3785 btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
3786 if (heap) {
3787 mem_heap_free(heap);
3788 }
3789
3790 mtr_commit(&mtr);
3791
3792 ut_ad(btr_validate_index(index, 0, false));
3793 }
3794 #endif /* UNIV_BTR_PRINT */
3795
3796 #ifdef UNIV_DEBUG
3797 /** Checks that the node pointer to a page is appropriate.
3798 @return true */
3799 ibool btr_check_node_ptr(dict_index_t *index, /*!< in: index tree */
3800 buf_block_t *block, /*!< in: index page */
3801 mtr_t *mtr) /*!< in: mtr */
3802 {
3803 mem_heap_t *heap;
3804 dtuple_t *tuple;
3805 ulint *offsets;
3806 btr_cur_t cursor;
3807 page_t *page = buf_block_get_frame(block);
3808
3809 ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
3810
3811 if (dict_index_get_page(index) == block->page.id.page_no()) {
3812 return (TRUE);
3813 }
3814
3815 heap = mem_heap_create(256);
3816
3817 if (dict_index_is_spatial(index)) {
3818 offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
3819 nullptr, &cursor);
3820 } else {
3821 offsets =
3822 btr_page_get_father_block(nullptr, heap, index, block, mtr, &cursor);
3823 }
3824
3825 if (page_is_leaf(page)) {
3826 goto func_exit;
3827 }
3828
3829 tuple = dict_index_build_node_ptr(
3830 index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
3831 btr_page_get_level(page, mtr));
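/* Build the node pointer that the parent level should contain for
this page, and compare it with the node pointer record on which the
father cursor is actually positioned. */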
3832
3833 /* For a spatial index, the MBR in the parent rec can differ
3834 from that of the first rec of the child; their relationship
3835 should be a "WITHIN" relationship */
3836 if (dict_index_is_spatial(index)) {
3837 ut_a(!cmp_dtuple_rec_with_gis(tuple, btr_cur_get_rec(&cursor), offsets,
3838 PAGE_CUR_WITHIN, index->rtr_srs.get()));
3839 } else {
3840 ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), index, offsets));
3841 }
3842 func_exit:
3843 mem_heap_free(heap);
3844
3845 return (TRUE);
3846 }
3847 #endif /* UNIV_DEBUG */
3848
3849 /** Display identification information for a record. */
3850 static void btr_index_rec_validate_report(
3851 const page_t *page, /*!< in: index page */
3852 const rec_t *rec, /*!< in: index record */
3853 const dict_index_t *index) /*!< in: index */
3854 {
3855 ib::info(ER_IB_MSG_35) << "Record in index " << index->name << " of table "
3856 << index->table->name << ", page "
3857 << page_id_t(page_get_space_id(page),
3858 page_get_page_no(page))
3859 << ", at offset " << page_offset(rec);
3860 }
3861
3862 /** Checks the size and number of fields in a record based on the definition of
3863 the index.
3864 @return true if ok */
3865 ibool btr_index_rec_validate(const rec_t *rec, /*!< in: index record */
3866 const dict_index_t *index, /*!< in: index */
3867 ibool dump_on_error) /*!< in: TRUE if the function
3868 should print hex dump of
3869 record and page on error */
3870 {
3871 ulint len;
3872 ulint n;
3873 ulint i;
3874 const page_t *page;
3875 mem_heap_t *heap = nullptr;
3876 ulint offsets_[REC_OFFS_NORMAL_SIZE];
3877 ulint *offsets = offsets_;
3878 rec_offs_init(offsets_);
3879
3880 page = page_align(rec);
3881
3882 if (dict_index_is_ibuf(index)) {
3883 /* The insert buffer index tree can contain records from any
3884 other index: we cannot check the number of fields or
3885 their length */
3886
3887 return (TRUE);
3888 }
3889
3890 #ifdef VIRTUAL_INDEX_DEBUG
3891 if (dict_index_has_virtual(index) || index->is_clustered()) {
3892 fprintf(stderr, "index name is %s\n", index->name());
3893 }
3894 #endif
3895 if ((ibool) !!page_is_comp(page) != dict_table_is_comp(index->table)) {
3896 btr_index_rec_validate_report(page, rec, index);
3897
3898 ib::error(ER_IB_MSG_36)
3899 << "Compact flag=" << !!page_is_comp(page) << ", should be "
3900 << dict_table_is_comp(index->table);
3901
3902 return (FALSE);
3903 }
3904
3905 n = dict_index_get_n_fields(index);
3906
3907 if (!page_is_comp(page) &&
3908 (rec_get_n_fields_old(rec, index) != n
3909 /* a record for older SYS_INDEXES table
3910 (missing merge_threshold column) is acceptable. */
3911 && !(index->id == DICT_INDEXES_ID &&
3912 rec_get_n_fields_old(rec, index) == n - 1))) {
3913 btr_index_rec_validate_report(page, rec, index);
3914
3915 ib::error(ER_IB_MSG_37) << "Has " << rec_get_n_fields_old(rec, index)
3916 << " fields, should have " << n;
3917
3918 if (dump_on_error) {
3919 fputs("InnoDB: corrupt record ", stderr);
3920 rec_print_old(stderr, rec);
3921 putc('\n', stderr);
3922 }
3923 return (FALSE);
3924 }
3925
3926 offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
3927
3928 for (i = 0; i < n; i++) {
3929 dict_field_t *field = index->get_field(i);
3930 const dict_col_t *col = field->col;
3931 ulint fixed_size = col->get_fixed_size(page_is_comp(page));
3932
3933 rec_get_nth_field_offs(offsets, i, &len);
3934
3935 /* Note that if fixed_size != 0, it equals the
3936 length of a fixed-size column in the clustered index,
3937 except for DATA_POINT, whose length is MBR_LEN
3938 when it is indexed in an R-TREE. We adjust for that here.
3939 A prefix index of the column is of fixed, but different,
3940 length. When fixed_size == 0, prefix_len is the maximum
3941 length of the prefix index column. */
3942
3943 if (col->mtype == DATA_POINT) {
3944 ut_ad(fixed_size == DATA_POINT_LEN);
3945 if (dict_index_is_spatial(index)) {
3946 /* For DATA_POINT data with an R-tree
3947 index, fixed_len is the length of the point's MBR.
3948 But if it is the primary key stored in the R-TREE
3949 as the PK pointer, the length shall be
3950 DATA_POINT_LEN as well. */
3951 ut_ad((field->fixed_len == DATA_MBR_LEN && i == 0) ||
3952 (field->fixed_len == DATA_POINT_LEN && i != 0));
3953 fixed_size = field->fixed_len;
3954 }
3955 }
3956
3957 if ((field->prefix_len == 0 && rec_field_not_null_not_add_col_def(len) &&
3958 fixed_size && len != fixed_size) ||
3959 (field->prefix_len > 0 && rec_field_not_null_not_add_col_def(len) &&
3960 len > field->prefix_len)) {
3961 btr_index_rec_validate_report(page, rec, index);
3962
3963 ib::error error(ER_IB_MSG_1224);
3964
3965 error << "Field " << i << " len is " << len << ", should be "
3966 << fixed_size;
3967
3968 if (dump_on_error) {
3969 error << "; ";
3970 rec_print(error.m_oss, rec,
3971 rec_get_info_bits(rec, rec_offs_comp(offsets)), offsets);
3972 }
3973 if (heap) {
3974 mem_heap_free(heap);
3975 }
3976 return (FALSE);
3977 }
3978 }
3979
3980 #ifdef VIRTUAL_INDEX_DEBUG
3981 if (dict_index_has_virtual(index) || index->is_clustered()) {
3982 rec_print_new(stderr, rec, offsets);
3983 }
3984 #endif
3985
3986 if (heap) {
3987 mem_heap_free(heap);
3988 }
3989 return (TRUE);
3990 }
3991
3992 /** Checks the size and number of fields in records based on the definition of
3993 the index.
3994 @return true if ok */
3995 static ibool btr_index_page_validate(buf_block_t *block, /*!< in: index page */
3996 dict_index_t *index) /*!< in: index */
3997 {
3998 page_cur_t cur;
3999 ibool ret = TRUE;
4000 #ifdef UNIV_DEBUG
4001 ulint nth = 1;
4002 #endif /* UNIV_DEBUG */
4003
4004 page_cur_set_before_first(block, &cur);
4005
4006 /* Directory slot 0 should only contain the infimum record. */
4007 DBUG_EXECUTE_IF(
4008 "check_table_rec_next",
4009 ut_a(page_rec_get_nth_const(page_cur_get_page(&cur), 0) == cur.rec);
4010 ut_a(page_dir_slot_get_n_owned(
4011 page_dir_get_nth_slot(page_cur_get_page(&cur), 0)) == 1););
4012
4013 page_cur_move_to_next(&cur);
4014
4015 for (;;) {
4016 if (page_cur_is_after_last(&cur)) {
4017 break;
4018 }
4019
4020 if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
4021 return (FALSE);
4022 }
4023
4024 /* Verify that page_rec_get_nth_const() is correctly
4025 retrieving each record. */
4026 DBUG_EXECUTE_IF("check_table_rec_next",
4027 ut_a(cur.rec == page_rec_get_nth_const(
4028 page_cur_get_page(&cur),
4029 page_rec_get_n_recs_before(cur.rec)));
4030 ut_a(nth++ == page_rec_get_n_recs_before(cur.rec)););
4031
4032 page_cur_move_to_next(&cur);
4033 }
4034
4035 return (ret);
4036 }
4037
4038 /** Report an error on one page of an index tree. */
4039 static void btr_validate_report1(
4040 dict_index_t *index, /*!< in: index */
4041 ulint level, /*!< in: B-tree level */
4042 const buf_block_t *block) /*!< in: index page */
4043 {
4044 ib::error error(ER_IB_MSG_1225);
4045 error << "In page " << block->page.id.page_no() << " of index " << index->name
4046 << " of table " << index->table->name;
4047
4048 if (level > 0) {
4049 error << ", index tree level " << level;
4050 }
4051 }
4052
4053 /** Report an error on two pages of an index tree. */
4054 static void btr_validate_report2(
4055 const dict_index_t *index, /*!< in: index */
4056 ulint level, /*!< in: B-tree level */
4057 const buf_block_t *block1, /*!< in: first index page */
4058 const buf_block_t *block2) /*!< in: second index page */
4059 {
4060 ib::error error(ER_IB_MSG_1226);
4061 error << "In pages " << block1->page.id << " and " << block2->page.id
4062 << " of index " << index->name << " of table " << index->table->name;
4063
4064 if (level > 0) {
4065 error << ", index tree level " << level;
4066 }
4067 }
4068
4069 /** Validates index tree level.
4070 @return true if ok */
4071 static bool btr_validate_level(
4072 dict_index_t *index, /*!< in: index tree */
4073 const trx_t *trx, /*!< in: transaction or NULL */
4074 ulint level, /*!< in: level number */
4075 bool lockout) /*!< in: true if X-latch index is intended */
4076 {
4077 buf_block_t *block;
4078 page_t *page;
4079 buf_block_t *right_block = nullptr; /* remove warning */
4080 page_t *right_page = nullptr; /* remove warning */
4081 page_t *father_page;
4082 btr_cur_t node_cur;
4083 btr_cur_t right_node_cur;
4084 rec_t *rec;
4085 page_no_t right_page_no;
4086 page_no_t left_page_no;
4087 page_cur_t cursor;
4088 dtuple_t *node_ptr_tuple;
4089 bool ret = true;
4090 mtr_t mtr;
4091 mem_heap_t *heap = mem_heap_create(256);
4092 fseg_header_t *seg;
4093 ulint *offsets = nullptr;
4094 ulint *offsets2 = nullptr;
4095 #ifdef UNIV_ZIP_DEBUG
4096 page_zip_des_t *page_zip;
4097 #endif /* UNIV_ZIP_DEBUG */
4098 ulint savepoint = 0;
4099 ulint savepoint2 = 0;
4100 page_no_t parent_page_no = FIL_NULL;
4101 page_no_t parent_right_page_no = FIL_NULL;
4102 bool rightmost_child = false;
4103
4104 mtr_start(&mtr);
4105
4106 if (!srv_read_only_mode) {
4107 if (lockout) {
4108 mtr_x_lock(dict_index_get_lock(index), &mtr);
4109 } else {
4110 mtr_sx_lock(dict_index_get_lock(index), &mtr);
4111 }
4112 }
4113
4114 block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
4115 page = buf_block_get_frame(block);
4116 seg = page + PAGE_HEADER + PAGE_BTR_SEG_TOP;
4117
4118 #ifdef UNIV_RTR_DEBUG
4119 if (dict_index_is_spatial(index)) {
4120 fprintf(stderr, "Root page no: %lu\n", (ulong)page_get_page_no(page));
4121 }
4122 #endif
4123
4124 const fil_space_t *space = fil_space_get(index->space);
4125 const page_size_t table_page_size(dict_table_page_size(index->table));
4126 const page_size_t space_page_size(space->flags);
4127
4128 if (!table_page_size.equals_to(space_page_size)) {
4129 ib::warn(ER_IB_MSG_38) << "Flags mismatch: table=" << index->table->flags
4130 << ", tablespace=" << space->flags;
4131
4132 mtr_commit(&mtr);
4133
4134 return (false);
4135 }
4136
4137 while (level != btr_page_get_level(page, &mtr)) {
4138 const rec_t *node_ptr;
4139
4140 if (fseg_page_is_free(seg, block->page.id.space(),
4141 block->page.id.page_no())) {
4142 btr_validate_report1(index, level, block);
4143
4144 ib::warn(ER_IB_MSG_39) << "Page is free";
4145
4146 ret = false;
4147 }
4148
4149 ut_a(index->space == block->page.id.space());
4150 ut_a(index->space == page_get_space_id(page));
4151 #ifdef UNIV_ZIP_DEBUG
4152 page_zip = buf_block_get_page_zip(block);
4153 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4154 #endif /* UNIV_ZIP_DEBUG */
4155 ut_a(!page_is_leaf(page));
4156
4157 page_cur_set_before_first(block, &cursor);
4158 page_cur_move_to_next(&cursor);
4159
4160 node_ptr = page_cur_get_rec(&cursor);
4161 offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
4162
4163 savepoint2 = mtr_set_savepoint(&mtr);
4164 block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
4165 page = buf_block_get_frame(block);
4166
4167 /* For an R-Tree, since the record order might not match that of
4168 the linked index pages in the lower level, we need to traverse
4169 backwards to get the first page rec in this level.
4170 This is only used for index validation. A spatial index
4171 does not use such a scan for any of its DML or query
4172 operations */
4173 if (dict_index_is_spatial(index)) {
4174 left_page_no = btr_page_get_prev(page, &mtr);
4175
4176 while (left_page_no != FIL_NULL) {
4177 page_id_t left_page_id(index->space, left_page_no);
4178 /* To obey the latch order of tree blocks,
4179 we must release the current block once in
4180 order to obtain the latch on its left sibling. */
4181 mtr_release_block_at_savepoint(&mtr, savepoint2, block);
4182
4183 savepoint2 = mtr_set_savepoint(&mtr);
4184 block = btr_block_get(left_page_id, table_page_size, RW_SX_LATCH, index,
4185 &mtr);
4186 page = buf_block_get_frame(block);
4187 left_page_no = btr_page_get_prev(page, &mtr);
4188 }
4189 }
4190 }
4191
4192 /* Now we are on the desired level. Loop through the pages on that
4193 level. */
4194
4195 if (level == 0) {
4196 /* Leaf pages are managed in their own file segment. */
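/* 'seg' currently points at PAGE_BTR_SEG_TOP, the header of the
non-leaf file segment, inside the root page header; step it back to
PAGE_BTR_SEG_LEAF, the header of the segment owning the leaf pages. */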
4197 seg -= PAGE_BTR_SEG_TOP - PAGE_BTR_SEG_LEAF;
4198 }
4199
4200 loop:
4201 mem_heap_empty(heap);
4202 offsets = offsets2 = nullptr;
4203 if (!srv_read_only_mode) {
4204 if (lockout) {
4205 mtr_x_lock(dict_index_get_lock(index), &mtr);
4206 } else {
4207 mtr_sx_lock(dict_index_get_lock(index), &mtr);
4208 }
4209 }
4210
4211 #ifdef UNIV_ZIP_DEBUG
4212 page_zip = buf_block_get_page_zip(block);
4213 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
4214 #endif /* UNIV_ZIP_DEBUG */
4215
4216 ut_a(block->page.id.space() == index->space);
4217
4218 if (fseg_page_is_free(seg, block->page.id.space(),
4219 block->page.id.page_no())) {
4220 btr_validate_report1(index, level, block);
4221
4222 ib::warn(ER_IB_MSG_40) << "Page is marked as free";
4223 ret = false;
4224
4225 } else if (btr_page_get_index_id(page) != index->id) {
4226 ib::error(ER_IB_MSG_41) << "Page index id " << btr_page_get_index_id(page)
4227 << " != data dictionary index id " << index->id;
4228
4229 ret = false;
4230
4231 } else if (!page_validate(page, index)) {
4232 btr_validate_report1(index, level, block);
4233 ret = false;
4234
4235 } else if (level == 0 && !btr_index_page_validate(block, index)) {
4236 /* We are on level 0. Check that the records have the right
4237 number of fields, and field lengths are right. */
4238
4239 ret = false;
4240 }
4241
4242 ut_a(btr_page_get_level(page, &mtr) == level);
4243
4244 right_page_no = btr_page_get_next(page, &mtr);
4245 left_page_no = btr_page_get_prev(page, &mtr);
4246
4247 ut_a(!page_is_empty(page) ||
4248 (level == 0 && page_get_page_no(page) == dict_index_get_page(index)));
4249
4250 if (right_page_no != FIL_NULL) {
4251 const rec_t *right_rec;
4252 savepoint = mtr_set_savepoint(&mtr);
4253
4254 right_block = btr_block_get(page_id_t(index->space, right_page_no),
4255 table_page_size, RW_SX_LATCH, index, &mtr);
4256
4257 right_page = buf_block_get_frame(right_block);
4258
4259 if (btr_page_get_prev(right_page, &mtr) != page_get_page_no(page)) {
4260 btr_validate_report2(index, level, block, right_block);
4261 fputs(
4262 "InnoDB: broken FIL_PAGE_NEXT"
4263 " or FIL_PAGE_PREV links\n",
4264 stderr);
4265
4266 ret = false;
4267 }
4268
4269 if (page_is_comp(right_page) != page_is_comp(page)) {
4270 btr_validate_report2(index, level, block, right_block);
4271 fputs("InnoDB: 'compact' flag mismatch\n", stderr);
4272
4273 ret = false;
4274
4275 goto node_ptr_fails;
4276 }
4277
4278 rec = page_rec_get_prev(page_get_supremum_rec(page));
4279 right_rec = page_rec_get_next(page_get_infimum_rec(right_page));
4280 offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
4281 offsets2 =
4282 rec_get_offsets(right_rec, index, offsets2, ULINT_UNDEFINED, &heap);
4283
4284 /* For a spatial index, we cannot guarantee the key ordering
4285 across pages, so skip the record compare verification for
4286 now. This will be enhanced in a special R-Tree index validation scheme */
4287 if (!dict_index_is_spatial(index) &&
4288 cmp_rec_rec(rec, right_rec, offsets, offsets2, index, false) >= 0) {
4289 btr_validate_report2(index, level, block, right_block);
4290
4291 fputs(
4292 "InnoDB: records in wrong order"
4293 " on adjacent pages\n",
4294 stderr);
4295
4296 fputs("InnoDB: record ", stderr);
4297 rec = page_rec_get_prev(page_get_supremum_rec(page));
4298 rec_print(stderr, rec, index);
4299 putc('\n', stderr);
4300 fputs("InnoDB: record ", stderr);
4301 rec = page_rec_get_next(page_get_infimum_rec(right_page));
4302 rec_print(stderr, rec, index);
4303 putc('\n', stderr);
4304
4305 ret = false;
4306 }
4307 }
4308
4309 if (level > 0 && left_page_no == FIL_NULL) {
4310 ut_a(REC_INFO_MIN_REC_FLAG &
4311 rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(page)),
4312 page_is_comp(page)));
4313 }
4314
4315 /* Similarly, skip the father node check for a spatial index for now,
4316 for a couple of reasons:
4317 1) As mentioned, there is no ordering relationship between records
4318 in the parent level and linked pages in the child level.
4319 2) Searching for the parent from the root is very costly for an R-tree.
4320 We will add a special validation mechanism for R-trees later (WL #7520) */
4321 if (!dict_index_is_spatial(index) &&
4322 block->page.id.page_no() != dict_index_get_page(index)) {
4323 /* Check father node pointers */
4324 rec_t *node_ptr;
4325
4326 btr_cur_position(index, page_rec_get_next(page_get_infimum_rec(page)),
4327 block, &node_cur);
4328 offsets = btr_page_get_father_node_ptr_for_validate(offsets, heap,
4329 &node_cur, &mtr);
4330
4331 father_page = btr_cur_get_page(&node_cur);
4332 node_ptr = btr_cur_get_rec(&node_cur);
4333
4334 parent_page_no = page_get_page_no(father_page);
4335 parent_right_page_no = btr_page_get_next(father_page, &mtr);
4336 rightmost_child = page_rec_is_supremum(page_rec_get_next(node_ptr));
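/* rightmost_child is true when this page's node pointer is the last
user record on the father page; in that case the right sibling's
father is the father's right sibling, which must be latched as well
below. */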
4337
4338 btr_cur_position(index, page_rec_get_prev(page_get_supremum_rec(page)),
4339 block, &node_cur);
4340
4341 offsets = btr_page_get_father_node_ptr_for_validate(offsets, heap,
4342 &node_cur, &mtr);
4343
4344 if (node_ptr != btr_cur_get_rec(&node_cur) ||
4345 btr_node_ptr_get_child_page_no(node_ptr, offsets) !=
4346 block->page.id.page_no()) {
4347 btr_validate_report1(index, level, block);
4348
4349 fputs("InnoDB: node pointer to the page is wrong\n", stderr);
4350
4351 fputs("InnoDB: node ptr ", stderr);
4352 rec_print(stderr, node_ptr, index);
4353
4354 rec = btr_cur_get_rec(&node_cur);
4355 fprintf(stderr,
4356 "\n"
4357 "InnoDB: node ptr child page n:o %lu\n",
4358 (ulong)btr_node_ptr_get_child_page_no(rec, offsets));
4359
4360 fputs("InnoDB: record on page ", stderr);
4361 rec_print_new(stderr, rec, offsets);
4362 putc('\n', stderr);
4363 ret = false;
4364
4365 goto node_ptr_fails;
4366 }
4367
4368 if (!page_is_leaf(page)) {
4369 node_ptr_tuple = dict_index_build_node_ptr(
4370 index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
4371 btr_page_get_level(page, &mtr));
4372
4373 if (cmp_dtuple_rec(node_ptr_tuple, node_ptr, index, offsets)) {
4374 const rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));
4375
4376 btr_validate_report1(index, level, block);
4377
4378 ib::error(ER_IB_MSG_42) << "Node ptrs differ on levels > 0";
4379
4380 fputs("InnoDB: node ptr ", stderr);
4381 rec_print_new(stderr, node_ptr, offsets);
4382 fputs("InnoDB: first rec ", stderr);
4383 rec_print(stderr, first_rec, index);
4384 putc('\n', stderr);
4385 ret = false;
4386
4387 goto node_ptr_fails;
4388 }
4389 }
4390
4391 if (left_page_no == FIL_NULL) {
4392 ut_a(node_ptr == page_rec_get_next(page_get_infimum_rec(father_page)));
4393 ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
4394 }
4395
4396 if (right_page_no == FIL_NULL) {
4397 ut_a(node_ptr == page_rec_get_prev(page_get_supremum_rec(father_page)));
4398 ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
4399 } else {
4400 const rec_t *right_node_ptr;
4401
4402 right_node_ptr = page_rec_get_next(node_ptr);
4403
4404 if (!lockout && rightmost_child) {
4405 /* To obey the latch order of tree blocks,
4406 we must release right_block once in order to
4407 obtain the latch on the uncle block. */
4408 mtr_release_block_at_savepoint(&mtr, savepoint, right_block);
4409
4410 btr_block_get(page_id_t(index->space, parent_right_page_no),
4411 table_page_size, RW_SX_LATCH, index, &mtr);
4412
4413 right_block = btr_block_get(page_id_t(index->space, right_page_no),
4414 table_page_size, RW_SX_LATCH, index, &mtr);
4415 }
4416
4417 btr_cur_position(index,
4418 page_rec_get_next(page_get_infimum_rec(
4419 buf_block_get_frame(right_block))),
4420 right_block, &right_node_cur);
4421
4422 offsets = btr_page_get_father_node_ptr_for_validate(
4423 offsets, heap, &right_node_cur, &mtr);
4424
4425 if (right_node_ptr != page_get_supremum_rec(father_page)) {
4426 if (btr_cur_get_rec(&right_node_cur) != right_node_ptr) {
4427 ret = false;
4428 fputs(
4429 "InnoDB: node pointer to"
4430 " the right page is wrong\n",
4431 stderr);
4432
4433 btr_validate_report1(index, level, block);
4434 }
4435 } else {
4436 page_t *right_father_page = btr_cur_get_page(&right_node_cur);
4437
4438 if (btr_cur_get_rec(&right_node_cur) !=
4439 page_rec_get_next(page_get_infimum_rec(right_father_page))) {
4440 ret = false;
4441 fputs(
4442 "InnoDB: node pointer 2 to"
4443 " the right page is wrong\n",
4444 stderr);
4445
4446 btr_validate_report1(index, level, block);
4447 }
4448
4449 if (page_get_page_no(right_father_page) !=
4450 btr_page_get_next(father_page, &mtr)) {
4451 ret = false;
4452 fputs(
4453 "InnoDB: node pointer 3 to"
4454 " the right page is wrong\n",
4455 stderr);
4456
4457 btr_validate_report1(index, level, block);
4458 }
4459 }
4460 }
4461 }
4462
4463 node_ptr_fails:
4464 /* Commit the mini-transaction to release the latch on 'page'.
4465 Re-acquire the latch on right_page, which will become 'page'
4466 on the next loop. The page has already been checked. */
4467 mtr_commit(&mtr);
4468
4469 if (trx_is_interrupted(trx)) {
4470 /* On interrupt, return the current status. */
4471 } else if (right_page_no != FIL_NULL) {
4472 mtr_start(&mtr);
4473
4474 if (!lockout) {
4475 if (rightmost_child) {
4476 if (parent_right_page_no != FIL_NULL) {
4477 btr_block_get(page_id_t(index->space, parent_right_page_no),
4478 table_page_size, RW_SX_LATCH, index, &mtr);
4479 }
4480 } else if (parent_page_no != FIL_NULL) {
4481 btr_block_get(page_id_t(index->space, parent_page_no), table_page_size,
4482 RW_SX_LATCH, index, &mtr);
4483 }
4484 }
4485
4486 block = btr_block_get(page_id_t(index->space, right_page_no),
4487 table_page_size, RW_SX_LATCH, index, &mtr);
4488
4489 page = buf_block_get_frame(block);
4490
4491 goto loop;
4492 }
4493
4494 mem_heap_free(heap);
4495
4496 return (ret);
4497 }
4498
4499 /** Do an index-level validation of a spatial index tree.
4500 @return true if no error found */
4501 static bool btr_validate_spatial_index(
4502 dict_index_t *index, /*!< in: index */
4503 const trx_t *trx) /*!< in: transaction or NULL */
4504 {
4505 mtr_t mtr;
4506 bool ok = true;
4507
4508 mtr_start(&mtr);
4509
4510 mtr_x_lock(dict_index_get_lock(index), &mtr);
4511
4512 page_t *root = btr_root_get(index, &mtr);
4513 ulint n = btr_page_get_level(root, &mtr);
4514
4515 #ifdef UNIV_RTR_DEBUG
4516 fprintf(stderr, "R-tree level is %lu\n", n);
4517 #endif /* UNIV_RTR_DEBUG */
4518
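/* Validate every level, from the root (level n) down to the leaf
level (0). */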
4519 for (ulint i = 0; i <= n; ++i) {
4520 #ifdef UNIV_RTR_DEBUG
4521 fprintf(stderr, "Level %lu:\n", n - i);
4522 #endif /* UNIV_RTR_DEBUG */
4523
4524 if (!btr_validate_level(index, trx, n - i, true)) {
4525 ok = false;
4526 break;
4527 }
4528 }
4529
4530 mtr_commit(&mtr);
4531
4532 return (ok);
4533 }
4534
4535 /** Checks the consistency of an index tree.
4536 @return true if ok */
4537 bool btr_validate_index(
4538 dict_index_t *index, /*!< in: index */
4539 const trx_t *trx, /*!< in: transaction or NULL */
4540 bool lockout) /*!< in: true if X-latch index is intended */
4541 {
4542 /* Full-text indexes are implemented by auxiliary tables,
4543 not by the B-tree */
4544 if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
4545 return (true);
4546 }
4547
4548 if (dict_index_is_spatial(index)) {
4549 return (btr_validate_spatial_index(index, trx));
4550 }
4551
4552 mtr_t mtr;
4553
4554 mtr_start(&mtr);
4555
4556 if (!srv_read_only_mode) {
4557 if (lockout) {
4558 mtr_x_lock(dict_index_get_lock(index), &mtr);
4559 } else {
4560 mtr_sx_lock(dict_index_get_lock(index), &mtr);
4561 }
4562 }
4563
4564 bool ok = true;
4565 page_t *root = btr_root_get(index, &mtr);
4566 ulint n = btr_page_get_level(root, &mtr);
4567
4568 for (ulint i = 0; i <= n; ++i) {
4569 if (!btr_validate_level(index, trx, n - i, lockout)) {
4570 ok = false;
4571 break;
4572 }
4573 }
4574
4575 mtr_commit(&mtr);
4576
4577 return (ok);
4578 }
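/* Illustrative usage sketch (hypothetical caller, not part of this
file): a consistency check would typically be invoked as

  if (!btr_validate_index(index, trx, false)) {
    // treat the index as corrupted
  }

where trx may be NULL and lockout=true requests X- instead of
SX-latching of the index. */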
4579
4580 /** Checks if the page in the cursor can be merged with the given page.
4581 If necessary, re-organizes the merge_page.
4582 @return true if possible to merge. */
4583 static bool btr_can_merge_with_page(
4584 btr_cur_t *cursor, /*!< in: cursor on the page to merge */
4585 page_no_t page_no, /*!< in: a sibling page */
4586 buf_block_t **merge_block, /*!< out: the merge block */
4587 mtr_t *mtr) /*!< in: mini-transaction */
4588 {
4589 dict_index_t *index;
4590 page_t *page;
4591 ulint n_recs;
4592 ulint data_size;
4593 ulint max_ins_size_reorg;
4594 ulint max_ins_size;
4595 buf_block_t *mblock;
4596 page_t *mpage;
4597 DBUG_TRACE;
4598
4599 if (page_no == FIL_NULL) {
4600 *merge_block = nullptr;
4601 return false;
4602 }
4603
4604 index = btr_cur_get_index(cursor);
4605 page = btr_cur_get_page(cursor);
4606
4607 const page_id_t page_id(dict_index_get_space(index), page_no);
4608 const page_size_t page_size(dict_table_page_size(index->table));
4609
4610 mblock = btr_block_get(page_id, page_size, RW_X_LATCH, index, mtr);
4611 mpage = buf_block_get_frame(mblock);
4612
4613 n_recs = page_get_n_recs(page);
4614 data_size = page_get_data_size(page);
4615
4616 max_ins_size_reorg = page_get_max_insert_size_after_reorganize(mpage, n_recs);
4617
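/* All n_recs records of 'page' carry data_size bytes of payload in
total; merging is feasible only if they would fit into mpage even
after mpage has been reorganized to maximize its contiguous free
space. */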
4618 if (data_size > max_ins_size_reorg) {
4619 goto error;
4620 }
4621
4622 /* If compression padding tells us that merging would result in
4623 too densely packed a page, i.e. one likely to cause a
4624 compression failure, then do not merge the pages. */
4625 if (page_size.is_compressed() && page_is_leaf(mpage) &&
4626 (page_get_data_size(mpage) + data_size >=
4627 dict_index_zip_pad_optimal_page_size(index))) {
4628 goto error;
4629 }
4630
4631 max_ins_size = page_get_max_insert_size(mpage, n_recs);
4632
4633 if (data_size > max_ins_size) {
4634 /* We have to reorganize mpage */
4635
4636 if (!btr_page_reorganize_block(false, page_zip_level, mblock, index, mtr)) {
4637 goto error;
4638 }
4639
4640 max_ins_size = page_get_max_insert_size(mpage, n_recs);
4641
4642 ut_ad(page_validate(mpage, index));
4643 ut_ad(max_ins_size == max_ins_size_reorg);
4644
4645 if (data_size > max_ins_size) {
4646 /* Add fault tolerance, though this should
4647 never happen */
4648
4649 goto error;
4650 }
4651 }
4652
4653 *merge_block = mblock;
4654 return true;
4655
4656 error:
4657 *merge_block = nullptr;
4658 return false;
4659 }
4660
4661 /** Create an SDI Index
4662 @param[in] space_id tablespace id
4663 @param[in] page_size size of page
4664 @param[in,out] mtr mini-transaction
4665 @param[in,out] table SDI table
4666 @return root page number of the SDI index created or FIL_NULL on failure */
4667 static page_no_t btr_sdi_create(space_id_t space_id,
4668 const page_size_t &page_size, mtr_t *mtr,
4669 dict_table_t *table) {
4670 dict_index_t *index = table->first_index();
4671 ut_ad(index != nullptr);
4672 ut_ad(UT_LIST_GET_LEN(table->indexes) == 1);
4673
4674 index->page = btr_create(DICT_CLUSTERED | DICT_UNIQUE | DICT_SDI, space_id,
4675 page_size, index->id, index, mtr);
4676
4677 return (index->page);
4678 }
4679
4680 /** Creates the SDI index and stores the root page number in page 0
4681 @param[in] space_id tablespace id
4682 @param[in] dict_locked true if dict_sys mutex is acquired
4683 @return DB_SUCCESS on success, else DB_ERROR on failure */
4684 dberr_t btr_sdi_create_index(space_id_t space_id, bool dict_locked) {
4685 fil_space_t *space = fil_space_acquire(space_id);
4686 if (space == nullptr) {
4687 ut_ad(0);
4688 return (DB_ERROR);
4689 }
4690
4691 dict_table_t *sdi_table;
4692 page_no_t sdi_root_page_num;
4693
4694 sdi_table = dict_sdi_get_table(space_id, dict_locked, true);
4695 ut_ad(sdi_table != nullptr);
4696
4697 mtr_t mtr;
4698 mtr.start();
4699
4700 const page_size_t page_size = page_size_t(space->flags);
4701
4702 /* Create B-Tree root page for SDI Indexes */
4703
4704 sdi_root_page_num = btr_sdi_create(space_id, page_size, &mtr, sdi_table);
4705
4706 if (sdi_root_page_num == FIL_NULL) {
4707 ib::error(ER_IB_MSG_43) << "Unable to create root index page"
4708 " for SDI table in tablespace "
4709 << space_id;
4710 mtr.commit();
4711 dict_sdi_remove_from_cache(space_id, sdi_table, dict_locked);
4712 fil_space_release(space);
4713 return (DB_ERROR);
4714 } else {
4715 dict_index_t *index = sdi_table->first_index();
4716 index->page = sdi_root_page_num;
4717 }
4718
4719 buf_block_t *block =
4720 buf_page_get(page_id_t(space_id, 0), page_size, RW_SX_LATCH, &mtr);
4721
4722 buf_block_dbg_add_level(block, SYNC_FSP_PAGE);
4723
4724 page_t *page = buf_block_get_frame(block);
4725
4726 /* Write SDI Index root page numbers to Page 0 */
4727 fsp_sdi_write_root_to_page(page, page_size, sdi_root_page_num, &mtr);
4728
4729 /* Space flags from memory */
4730 uint32_t fsp_flags = space->flags;
4731
4732 ut_ad(mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS) ==
4733 fsp_flags);
4734
4735 fsp_flags_set_sdi(fsp_flags);
4736 mlog_write_ulint(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page, fsp_flags,
4737 MLOG_4BYTES, &mtr);
4738
4739 mtr.commit();
4740
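/* The redo-logged change to the FSP header has been committed above;
now bring the cached in-memory tablespace flags in line with what is
on disk. */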
4741 fil_space_set_flags(space, fsp_flags);
4742
4743 dict_table_close(sdi_table, dict_locked, false);
4744
4745 fil_space_release(space);
4746 return (DB_SUCCESS);
4747 }
4748 #endif /* !UNIV_HOTBACKUP */
4749