/*****************************************************************************

Copyright (c) 1994, 2020, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

/** @file btr/btr0btr.cc
 The B-tree

 Created 6/2/1994 Heikki Tuuri
 *******************************************************/

#include "btr0btr.h"

#include <sys/types.h>

#ifndef UNIV_HOTBACKUP
#include "btr0cur.h"
#include "btr0pcur.h"
#include "btr0sea.h"
#include "buf0stats.h"
#include "dict0boot.h"
#include "fsp0sysspace.h"
#include "gis0geo.h"
#include "gis0rtree.h"
#include "ibuf0ibuf.h"
#include "lock0lock.h"
#include "my_dbug.h"
#include "page0page.h"
#include "page0zip.h"
#include "rem0cmp.h"
#include "srv0mon.h"
#include "trx0trx.h"
#include "ut0new.h"
#endif /* !UNIV_HOTBACKUP */

#ifndef UNIV_HOTBACKUP
/** Checks if the page in the cursor can be merged with given page.
 If necessary, re-organize the merge_page.
 @return	true if possible to merge. */
static bool btr_can_merge_with_page(
    btr_cur_t *cursor,         /*!< in: cursor on the page to merge */
    page_no_t page_no,         /*!< in: a sibling page */
    buf_block_t **merge_block, /*!< out: the merge block */
    mtr_t *mtr);               /*!< in: mini-transaction */
#endif                         /* !UNIV_HOTBACKUP */

/** Report that an index page is corrupted. */
void btr_corruption_report(const buf_block_t *block, /*!< in: corrupted block */
                           const dict_index_t *index) /*!< in: index tree */
{
  ib::error(ER_IB_MSG_27) << "Flag mismatch in page " << block->page.id
                          << " index " << index->name << " of table "
                          << index->table->name;
}

#ifndef UNIV_HOTBACKUP
/*
Latching strategy of the InnoDB B-tree
--------------------------------------
A tree latch protects all non-leaf nodes of the tree. Each node of a tree
also has a latch of its own.

A B-tree operation normally first acquires an S-latch on the tree. It
searches down the tree and releases the tree latch when it has the
leaf node latch. To save CPU time we do not acquire any latch on
non-leaf nodes of the tree during a search; those pages are only
buffer-fixed.

If an operation needs to restructure the tree, it acquires an X-latch on
the tree before searching to a leaf node. If it needs, for example, to
split a leaf,
(1) InnoDB decides the split point in the leaf,
(2) allocates a new page,
(3) inserts the appropriate node pointer to the first non-leaf level,
(4) releases the tree X-latch,
(5) and then moves records from the leaf to the newly allocated page.

Node pointers
-------------
Leaf pages of a B-tree contain the index records stored in the
tree. On levels n > 0 we store 'node pointers' to pages on level
n - 1. For each page there is exactly one node pointer stored:
thus our tree is an ordinary B-tree, not a B-link tree.

A node pointer contains a prefix P of an index record. The prefix
is long enough that it determines an index record uniquely.
The file page number of the child page is added as the last
field. In the child page we can store node pointers or index records
which are >= P in the alphabetical order, but < P1 if there is
a next node pointer on the level, and P1 is its prefix.

If a node pointer with a prefix P points to a non-leaf child,
then the leftmost record in the child must have the same
prefix P. If it points to a leaf node, the child is not required
to contain any record with a prefix equal to P. The leaf case
is decided this way to allow arbitrary deletions in a leaf node
without touching upper levels of the tree.

We have predefined a special minimum record which we
define as the smallest record in any alphabetical order.
A minimum record is denoted by setting a bit in the record
header. A minimum record acts as the prefix of a node pointer
which points to a leftmost node on any level of the tree.

File page allocation
--------------------
In the root node of a B-tree there are two file segment headers.
The leaf pages of a tree are allocated from one file segment, to
make them consecutive on disk if possible. From the other file segment
we allocate pages for the non-leaf levels of the tree.
*/
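
/* To make the node-pointer layout above concrete, here is a minimal
sketch (not part of the build) of how the child page number can be read
back from a node pointer record. It mirrors what
btr_node_ptr_get_child_page_no() does; node_ptr and offsets (from
rec_get_offsets()) are assumed to be at hand:

  ulint len;
  // The child page number is stored as the last field of the record.
  const byte *field = rec_get_nth_field(
      node_ptr, offsets, rec_offs_n_fields(offsets) - 1, &len);
  ut_ad(len == REC_NODE_PTR_SIZE);  // always a 4-byte field
  page_no_t child_page_no = mach_read_from_4(field);
*/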

#ifdef UNIV_BTR_DEBUG
/** Checks a file segment header within a B-tree root page.
 @return true if valid */
static ibool btr_root_fseg_validate(
    const fseg_header_t *seg_header, /*!< in: segment header */
    space_id_t space)                /*!< in: tablespace identifier */
{
  ulint offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

  ut_a(mach_read_from_4(seg_header + FSEG_HDR_SPACE) == space);
  ut_a(offset >= FIL_PAGE_DATA);
  ut_a(offset <= UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
  return (TRUE);
}
#endif /* UNIV_BTR_DEBUG */

/** Gets the root node of a tree and x- or s-latches it.
 @return root page, x- or s-latched */
buf_block_t *btr_root_block_get(
    const dict_index_t *index, /*!< in: index tree */
    ulint mode,                /*!< in: either RW_S_LATCH
                               or RW_X_LATCH */
    mtr_t *mtr)                /*!< in: mtr */
{
  const space_id_t space_id = dict_index_get_space(index);
  const page_id_t page_id(space_id, dict_index_get_page(index));
  const page_size_t page_size(dict_table_page_size(index->table));

  buf_block_t *block = btr_block_get(page_id, page_size, mode, index, mtr);

  btr_assert_not_corrupted(block, index);
#ifdef UNIV_BTR_DEBUG
  if (!dict_index_is_ibuf(index)) {
    const page_t *root = buf_block_get_frame(block);

    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                                space_id));
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                                space_id));
  }
#endif /* UNIV_BTR_DEBUG */

  return (block);
}

/** Gets the root node of a tree and sx-latches it for segment access.
 @return root page, sx-latched */
page_t *btr_root_get(const dict_index_t *index, /*!< in: index tree */
                     mtr_t *mtr)                /*!< in: mtr */
{
  /* Intended to be used for segment list access.
  An SX latch does not block other threads from reading the user data,
  but it does block them from accessing the segment list. */
  return (buf_block_get_frame(btr_root_block_get(index, RW_SX_LATCH, mtr)));
}

/** Gets the height of the B-tree (the level of the root, when the leaf
 level is assumed to be 0). The caller must hold an S or X latch on
 the index.
 @return tree height (level of the root) */
ulint btr_height_get(dict_index_t *index, /*!< in: index tree */
                     mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  ulint height;
  buf_block_t *root_block;

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains_flagged(
            mtr, dict_index_get_lock(index),
            MTR_MEMO_S_LOCK | MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());

  /* S latches the page */
  root_block = btr_root_block_get(index, RW_S_LATCH, mtr);

  height = btr_page_get_level(buf_block_get_frame(root_block), mtr);

  /* Release the S latch on the root page. */
  mtr->memo_release(root_block, MTR_MEMO_PAGE_S_FIX);

  ut_d(sync_check_unlock(&root_block->lock));

  return (height);
}
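
/* A usage sketch for btr_height_get() (hypothetical caller; per the
precondition above, the index lock must already be held in S or X mode,
e.g. via mtr_s_lock()):

  mtr_t mtr;
  mtr_start(&mtr);
  mtr_s_lock(dict_index_get_lock(index), &mtr);  // satisfy the S-latch rule
  ulint height = btr_height_get(index, &mtr);
  mtr_commit(&mtr);
*/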

/** Checks a file segment header within a B-tree root page and updates
 the segment header space id.
 @return true if valid */
static bool btr_root_fseg_adjust_on_import(
    fseg_header_t *seg_header, /*!< in/out: segment header */
    page_zip_des_t *page_zip,  /*!< in/out: compressed page,
                               or NULL */
    space_id_t space,          /*!< in: tablespace identifier */
    mtr_t *mtr)                /*!< in/out: mini-transaction */
{
  page_no_t offset = mach_read_from_2(seg_header + FSEG_HDR_OFFSET);

  if (offset < FIL_PAGE_DATA || offset > UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) {
    return (FALSE);

  } else if (page_zip) {
    mach_write_to_4(seg_header + FSEG_HDR_SPACE, space);
    page_zip_write_header(page_zip, seg_header + FSEG_HDR_SPACE, 4, mtr);
  } else {
    mlog_write_ulint(seg_header + FSEG_HDR_SPACE, space, MLOG_4BYTES, mtr);
  }

  return (TRUE);
}

/** Checks and adjusts the root node of a tree during IMPORT TABLESPACE.
 @return error code, or DB_SUCCESS */
dberr_t btr_root_adjust_on_import(
    const dict_index_t *index) /*!< in: index tree */
{
  dberr_t err;
  mtr_t mtr;
  page_t *page;
  buf_block_t *block;
  page_zip_des_t *page_zip;
  dict_table_t *table = index->table;
  const space_id_t space_id = dict_index_get_space(index);
  const page_id_t page_id(space_id, dict_index_get_page(index));
  const page_size_t page_size(dict_table_page_size(table));

  DBUG_EXECUTE_IF("ib_import_trigger_corruption_3", return (DB_CORRUPTION););

  mtr_start(&mtr);

  mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);

  block = btr_block_get(page_id, page_size, RW_X_LATCH, index, &mtr);

  page = buf_block_get_frame(block);
  page_zip = buf_block_get_page_zip(block);

  if (!page_is_root(page)) {
    err = DB_CORRUPTION;

  } else if (index->is_clustered()) {
    bool page_is_compact_format;

    page_is_compact_format = page_is_comp(page) > 0;

    /* Check if the page format and table format agree. */
    if (page_is_compact_format != !!dict_table_is_comp(table)) {
      err = DB_CORRUPTION;
    } else {
      /* Check that the table flags and the tablespace
      flags match. */
      uint32_t flags = dict_tf_to_fsp_flags(table->flags);
      uint32_t fsp_flags = fil_space_get_flags(table->space);

      /* We remove the SDI flag from the space flags temporarily for
      the comparison, because the space flags derived from the table
      flags will not have the SDI flag. */
      fsp_flags_unset_sdi(fsp_flags);

      /* As encryption is not a table property, we don't keep
      any encryption-related flag in the table. Thus exclude
      the encryption flag as well. */
      fsp_flags_unset_encryption(fsp_flags);

      err = fsp_flags_are_equal(flags, fsp_flags) ? DB_SUCCESS : DB_CORRUPTION;
    }
  } else {
    err = DB_SUCCESS;
  }

  /* Check and adjust the file segment headers, if all OK so far. */
  if (err == DB_SUCCESS &&
      (!btr_root_fseg_adjust_on_import(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + page,
                                       page_zip, space_id, &mtr) ||
       !btr_root_fseg_adjust_on_import(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + page,
                                       page_zip, space_id, &mtr))) {
    err = DB_CORRUPTION;
  }

  mtr_commit(&mtr);

  return (err);
}

/** Creates a new index page (not the root, and also not
 used in page reorganization).  @see btr_page_empty(). */
void btr_page_create(
    buf_block_t *block,       /*!< in/out: page to be created */
    page_zip_des_t *page_zip, /*!< in/out: compressed page, or NULL */
    dict_index_t *index,      /*!< in: index */
    ulint level,              /*!< in: the B-tree level of the page */
    mtr_t *mtr)               /*!< in: mtr */
{
  page_t *page = buf_block_get_frame(block);

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  uint16_t page_create_type;
  if (dict_index_is_spatial(index)) {
    page_create_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_create_type = FIL_PAGE_SDI;
  } else {
    page_create_type = FIL_PAGE_INDEX;
  }

  if (page_zip) {
    page_create_zip(block, index, level, 0, mtr, page_create_type);
  } else {
    page_create(block, mtr, dict_table_is_comp(index->table), page_create_type);
    /* Set the level of the new index page */
    btr_page_set_level(page, nullptr, level, mtr);
  }

  /* For Spatial Index, initialize the Split Sequence Number */
  if (dict_index_is_spatial(index)) {
    page_set_ssn_id(block, page_zip, 0, mtr);
  }

  btr_page_set_index_id(page, page_zip, index->id, mtr);

  if (level == 0) {
    buf_stat_per_index->inc(index_id_t(index->space, index->id));
  }
}

/** Allocates a new file page to be used in an ibuf tree. Takes the page from
 the free list of the tree, which must contain pages!
 @return new allocated block, x-latched */
static buf_block_t *btr_page_alloc_for_ibuf(
    dict_index_t *index, /*!< in: index tree */
    mtr_t *mtr)          /*!< in: mtr */
{
  fil_addr_t node_addr;
  page_t *root;
  page_t *new_page;
  buf_block_t *new_block;

  root = btr_root_get(index, mtr);

  node_addr = flst_get_first(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
  ut_a(node_addr.page != FIL_NULL);

  new_block =
      buf_page_get(page_id_t(dict_index_get_space(index), node_addr.page),
                   dict_table_page_size(index->table), RW_X_LATCH, mtr);

  new_page = buf_block_get_frame(new_block);
  buf_block_dbg_add_level(new_block, SYNC_IBUF_TREE_NODE_NEW);

  flst_remove(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
              new_page + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
  ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));

  return (new_block);
}

/** Allocates a new file page to be used in an index tree. NOTE: we assume
 that the caller has made the reservation for free extents!
 @retval NULL if no page could be allocated
 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
 (init_mtr == mtr, or the page was not previously freed in mtr)
 @retval block (not allocated or initialized) otherwise */
static MY_ATTRIBUTE((warn_unused_result)) buf_block_t *btr_page_alloc_low(
    dict_index_t *index,    /*!< in: index */
    page_no_t hint_page_no, /*!< in: hint of a good page */
    byte file_direction,    /*!< in: direction where a possible
                            page split is made */
    ulint level,            /*!< in: level where the page is placed
                            in the tree */
    mtr_t *mtr,             /*!< in/out: mini-transaction
                            for the allocation */
    mtr_t *init_mtr)        /*!< in/out: mtr or another
                            mini-transaction in which the
                            page should be initialized.
                            If init_mtr!=mtr, but the page
                            is already X-latched in mtr, do
                            not initialize the page. */
{
  fseg_header_t *seg_header;
  page_t *root;

  root = btr_root_get(index, mtr);

  if (level == 0) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
  } else {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
  }

  /* Parameter TRUE below states that the caller has made the
  reservation for free extents, and thus we know that a page can
  be allocated: */

  return (fseg_alloc_free_page_general(seg_header, hint_page_no, file_direction,
                                       TRUE, mtr, init_mtr));
}

/** Allocates a new file page to be used in an index tree. NOTE: we assume
 that the caller has made the reservation for free extents!
 @retval NULL if no page could be allocated
 @retval block, rw_lock_x_lock_count(&block->lock) == 1 if allocation succeeded
 (init_mtr == mtr, or the page was not previously freed in mtr)
 @retval block (not allocated or initialized) otherwise */
buf_block_t *btr_page_alloc(
    dict_index_t *index,    /*!< in: index */
    page_no_t hint_page_no, /*!< in: hint of a good page */
    byte file_direction,    /*!< in: direction where a possible
                            page split is made */
    ulint level,            /*!< in: level where the page is placed
                            in the tree */
    mtr_t *mtr,             /*!< in/out: mini-transaction
                            for the allocation */
    mtr_t *init_mtr)        /*!< in/out: mini-transaction
                            for x-latching and initializing
                            the page */
{
  buf_block_t *new_block;

  if (dict_index_is_ibuf(index)) {
    return (btr_page_alloc_for_ibuf(index, mtr));
  }

  new_block = btr_page_alloc_low(index, hint_page_no, file_direction, level,
                                 mtr, init_mtr);

  if (new_block) {
    buf_block_dbg_add_level(new_block, SYNC_TREE_NODE_NEW);
  }

  return (new_block);
}
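
/* A sketch of the extent-reservation contract noted above (hypothetical
caller; it assumes fsp_reserve_free_extents() /
fil_space_release_free_extents() as the way the reservation is made and
released):

  ulint n_reserved = 0;
  if (fsp_reserve_free_extents(&n_reserved, index->space, 1, FSP_NORMAL,
                               &mtr)) {
    buf_block_t *new_block =
        btr_page_alloc(index, hint_page_no, FSP_UP, level, &mtr, &mtr);
    // ... use new_block, or handle new_block == NULL ...
    fil_space_release_free_extents(index->space, n_reserved);
  }
*/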

/** Gets the number of pages in a B-tree.
 @return number of pages, or ULINT_UNDEFINED if the index is unavailable */
ulint btr_get_size(dict_index_t *index, /*!< in: index */
                   ulint flag, /*!< in: BTR_N_LEAF_PAGES or BTR_TOTAL_SIZE */
                   mtr_t *mtr) /*!< in/out: mini-transaction where index
                               is s-latched */
{
  fseg_header_t *seg_header;
  page_t *root;
  ulint n;
  ulint dummy;

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK) ||
        index->table->is_intrinsic());
  ut_ad(index->page >= FSP_FIRST_INODE_PAGE_NO);

  if (index->page == FIL_NULL || dict_index_is_online_ddl(index) ||
      !index->is_committed()) {
    return (ULINT_UNDEFINED);
  }

  root = btr_root_get(index, mtr);

  if (flag == BTR_N_LEAF_PAGES) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

    fseg_n_reserved_pages(seg_header, &n, mtr);

  } else if (flag == BTR_TOTAL_SIZE) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

    n = fseg_n_reserved_pages(seg_header, &dummy, mtr);

    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

    n += fseg_n_reserved_pages(seg_header, &dummy, mtr);
  } else {
    ut_error;
  }

  return (n);
}
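
/* Sketch: estimating the index size in pages and bytes (assumes the
mini-transaction already holds the index S-latch, as asserted above;
page_size is a hypothetical page_size_t for the tablespace):

  ulint n_leaf = btr_get_size(index, BTR_N_LEAF_PAGES, &mtr);
  ulint n_total = btr_get_size(index, BTR_TOTAL_SIZE, &mtr);
  if (n_total != ULINT_UNDEFINED) {
    ulint size_bytes = n_total * page_size.physical();
  }
*/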

/** Frees a page used in an ibuf tree. Puts the page to the free list of the
 ibuf tree. */
static void btr_page_free_for_ibuf(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: block to be freed, x-latched */
    mtr_t *mtr)          /*!< in: mtr */
{
  page_t *root;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  root = btr_root_get(index, mtr);

  flst_add_first(
      root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
      buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
      mtr);

  ut_ad(flst_validate(root + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
}

/** Frees a file page used in an index tree. Can also be used to free
 (BLOB) external storage pages. */
void btr_page_free_low(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: block to be freed, x-latched */
    ulint level,         /*!< in: page level (ULINT_UNDEFINED=BLOB) */
    mtr_t *mtr)          /*!< in: mtr */
{
  fseg_header_t *seg_header;
  page_t *root;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  /* The page gets invalid for optimistic searches: increment the frame
  modify clock */

  buf_block_modify_clock_inc(block);

  if (dict_index_is_ibuf(index)) {
    btr_page_free_for_ibuf(index, block, mtr);

    return;
  }

  root = btr_root_get(index, mtr);

  if (level == 0 || level == ULINT_UNDEFINED) {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;
  } else {
    seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
  }

#ifdef UNIV_GIS_DEBUG
  if (dict_index_is_spatial(index)) {
    fprintf(stderr, "GIS_DIAG: Freed  %ld\n", (long)block->page.id.page_no());
  }
#endif

  fseg_free_page(seg_header, block->page.id.space(), block->page.id.page_no(),
                 level != ULINT_UNDEFINED, mtr);

  /* The page was marked free in the allocation bitmap, but it
  should remain buffer-fixed until mtr_commit(mtr) or until it
  is explicitly freed from the mini-transaction. */
  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  /* TODO: Discard any operations on the page from the redo log
  and remove the block from the flush list and the buffer pool.
  This would free up buffer pool earlier and reduce writes to
  both the tablespace and the redo log. */
}

/** Frees a file page used in an index tree. NOTE: cannot free field external
 storage pages because the page must contain info on its level. */
void btr_page_free(dict_index_t *index, /*!< in: index tree */
                   buf_block_t *block,  /*!< in: block to be freed, x-latched */
                   mtr_t *mtr)          /*!< in: mtr */
{
  const page_t *page = buf_block_get_frame(block);
  ulint level = btr_page_get_level(page, mtr);

  ut_ad(fil_page_index_page_check(block->frame));
  ut_ad(level != ULINT_UNDEFINED);
  btr_page_free_low(index, block, level, mtr);
}

/** Sets the child node file address in a node pointer. */
UNIV_INLINE
void btr_node_ptr_set_child_page_no(
    rec_t *rec,               /*!< in: node pointer record */
    page_zip_des_t *page_zip, /*!< in/out: compressed page whose uncompressed
                             part will be updated, or NULL */
    const ulint *offsets,     /*!< in: array returned by rec_get_offsets() */
    page_no_t page_no,        /*!< in: child node address */
    mtr_t *mtr)               /*!< in: mtr */
{
  byte *field;
  ulint len;

  ut_ad(rec_offs_validate(rec, nullptr, offsets));
  ut_ad(!page_is_leaf(page_align(rec)));
  ut_ad(!rec_offs_comp(offsets) || rec_get_node_ptr_flag(rec));

  /* The child address is in the last field */
  field = const_cast<byte *>(
      rec_get_nth_field(rec, offsets, rec_offs_n_fields(offsets) - 1, &len));

  ut_ad(len == REC_NODE_PTR_SIZE);

  if (page_zip) {
    page_zip_write_node_ptr(page_zip, rec, rec_offs_data_size(offsets), page_no,
                            mtr);
  } else {
    mlog_write_ulint(field, page_no, MLOG_4BYTES, mtr);
  }
}

buf_block_t *btr_node_ptr_get_child(const rec_t *node_ptr, dict_index_t *index,
                                    const ulint *offsets, mtr_t *mtr,
                                    rw_lock_type_t type) {
  ut_ad(rec_offs_validate(node_ptr, index, offsets));

  const page_id_t page_id(page_get_space_id(page_align(node_ptr)),
                          btr_node_ptr_get_child_page_no(node_ptr, offsets));

  return (btr_block_get(page_id, dict_table_page_size(index->table), type,
                        index, mtr));
}

/** Returns the upper level node pointer to a page. It is assumed that mtr holds
 an sx-latch on the tree.
 @return rec_get_offsets() of the node pointer record */
static ulint *btr_page_get_father_node_ptr_func(
    ulint *offsets,    /*!< in: work area for the return value */
    mem_heap_t *heap,  /*!< in: memory heap to use */
    btr_cur_t *cursor, /*!< in: cursor pointing to user record,
                       out: cursor on node pointer record,
                       its page x-latched */
    ulint latch_mode,  /*!< in: BTR_CONT_MODIFY_TREE
                    or BTR_CONT_SEARCH_TREE */
    const char *file,  /*!< in: file name */
    ulint line,        /*!< in: line where called */
    mtr_t *mtr)        /*!< in: mtr */
{
  dtuple_t *tuple;
  rec_t *user_rec;
  rec_t *node_ptr;
  ulint level;
  page_no_t page_no;
  dict_index_t *index;

  ut_ad(latch_mode == BTR_CONT_MODIFY_TREE ||
        latch_mode == BTR_CONT_SEARCH_TREE);

  page_no = btr_cur_get_block(cursor)->page.id.page_no();
  index = btr_cur_get_index(cursor);
  ut_ad(!dict_index_is_spatial(index));

  ut_ad(srv_read_only_mode ||
        mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());

  ut_ad(dict_index_get_page(index) != page_no);

  level = btr_page_get_level(btr_cur_get_page(cursor), mtr);

  user_rec = btr_cur_get_rec(cursor);
  ut_a(page_rec_is_user_rec(user_rec));

  tuple = dict_index_build_node_ptr(index, user_rec, 0, heap, level);
  if (index->table->is_intrinsic()) {
    btr_cur_search_to_nth_level_with_no_latch(
        index, level + 1, tuple, PAGE_CUR_LE, cursor, file, line, mtr);
  } else {
    btr_cur_search_to_nth_level(index, level + 1, tuple, PAGE_CUR_LE,
                                latch_mode, cursor, 0, file, line, mtr);
  }

  node_ptr = btr_cur_get_rec(cursor);
  ut_ad(!page_rec_is_comp(node_ptr) ||
        rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
  offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);

  if (btr_node_ptr_get_child_page_no(node_ptr, offsets) != page_no) {
    rec_t *print_rec;

    ib::error(ER_IB_MSG_28)
        << "Corruption of an index tree: table " << index->table->name
        << " index " << index->name << ", father ptr page no "
        << btr_node_ptr_get_child_page_no(node_ptr, offsets)
        << ", child page no " << page_no;

    print_rec = page_rec_get_next(page_get_infimum_rec(page_align(user_rec)));
    offsets =
        rec_get_offsets(print_rec, index, offsets, ULINT_UNDEFINED, &heap);
    page_rec_print(print_rec, offsets);
    offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
    page_rec_print(node_ptr, offsets);

    ib::fatal(ER_IB_MSG_29) << "You should dump + drop + reimport the table to"
                            << " fix the corruption. If the crash happens at"
                            << " database startup, " << FORCE_RECOVERY_MSG
                            << " Then dump + drop + reimport.";
  }

  return (offsets);
}

#define btr_page_get_father_node_ptr(of, heap, cur, mtr)                 \
  btr_page_get_father_node_ptr_func(of, heap, cur, BTR_CONT_MODIFY_TREE, \
                                    __FILE__, __LINE__, mtr)

#define btr_page_get_father_node_ptr_for_validate(of, heap, cur, mtr)    \
  btr_page_get_father_node_ptr_func(of, heap, cur, BTR_CONT_SEARCH_TREE, \
                                    __FILE__, __LINE__, mtr)

/** Returns the upper level node pointer to a page. It is assumed that mtr holds
 an x-latch on the tree.
 @return rec_get_offsets() of the node pointer record */
static ulint *btr_page_get_father_block(
    ulint *offsets,      /*!< in: work area for the return value */
    mem_heap_t *heap,    /*!< in: memory heap to use */
    dict_index_t *index, /*!< in: b-tree index */
    buf_block_t *block,  /*!< in: child page in the index */
    mtr_t *mtr,          /*!< in: mtr */
    btr_cur_t *cursor)   /*!< out: cursor on node pointer record,
                         its page x-latched */
{
  rec_t *rec =
      page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
  btr_cur_position(index, rec, block, cursor);
  return (btr_page_get_father_node_ptr(offsets, heap, cursor, mtr));
}

/** Seeks to the upper level node pointer to a page.
 It is assumed that mtr holds an x-latch on the tree. */
static void btr_page_get_father(
    dict_index_t *index, /*!< in: b-tree index */
    buf_block_t *block,  /*!< in: child page in the index */
    mtr_t *mtr,          /*!< in: mtr */
    btr_cur_t *cursor)   /*!< out: cursor on node pointer record,
                         its page x-latched */
{
  mem_heap_t *heap;
  rec_t *rec =
      page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
  btr_cur_position(index, rec, block, cursor);

  heap = mem_heap_create(100);
  btr_page_get_father_node_ptr(nullptr, heap, cursor, mtr);
  mem_heap_free(heap);
}

/** Free a B-tree root page. btr_free_but_not_root() must already
have been called.
In a persistent tablespace, the caller must invoke fsp_init_file_page()
before mtr.commit().
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static void btr_free_root(buf_block_t *block, mtr_t *mtr) {
  fseg_header_t *header;

  ut_ad(mtr_memo_contains_flagged(mtr, block, MTR_MEMO_PAGE_X_FIX));

  btr_search_drop_page_hash_index(block);

  header = buf_block_get_frame(block) + PAGE_HEADER + PAGE_BTR_SEG_TOP;
#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(header, block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  while (!fseg_free_step(header, true, mtr)) {
    /* Free the entire segment in small steps. */
  }
}

/** PAGE_INDEX_ID value for freed index B-trees */
static const space_index_t BTR_FREED_INDEX_ID = 0;

/** Invalidate an index root page so that btr_free_root_check()
will not find it.
@param[in,out]	block	index root page
@param[in,out]	mtr	mini-transaction */
static void btr_free_root_invalidate(buf_block_t *block, mtr_t *mtr) {
  ut_ad(page_is_root(block->frame));

  btr_page_set_index_id(buf_block_get_frame(block),
                        buf_block_get_page_zip(block), BTR_FREED_INDEX_ID, mtr);
}

/** Prepare to free a B-tree.
@param[in]	page_id		page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction
@return root block, to invoke btr_free_but_not_root() and btr_free_root()
@retval NULL if the page is no longer a matching B-tree page */
static MY_ATTRIBUTE((warn_unused_result)) buf_block_t *btr_free_root_check(
    const page_id_t &page_id, const page_size_t &page_size,
    space_index_t index_id, mtr_t *mtr) {
  ut_ad(!fsp_is_system_temporary(page_id.space()));
  ut_ad(index_id != BTR_FREED_INDEX_ID);

  buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, mtr);
  buf_block_dbg_add_level(block, SYNC_TREE_NODE);

  if (fil_page_index_page_check(block->frame) &&
      index_id == btr_page_get_index_id(block->frame)) {
    /* This should be a root page.
    It should not be possible to reassign the same
    index_id for some other index in the tablespace. */
    ut_ad(page_is_root(block->frame));
  } else {
    block = nullptr;
  }

  return (block);
}

/** Create the root node for a new index tree.
@param[in]	type			type of the index
@param[in]	space			space where created
@param[in]	page_size		page size
@param[in]	index_id		index id
@param[in]	index			index tree
@param[in,out]	mtr			mini-transaction
@return page number of the created root
@retval FIL_NULL if did not succeed */
ulint btr_create(ulint type, space_id_t space, const page_size_t &page_size,
                 space_index_t index_id, dict_index_t *index, mtr_t *mtr) {
  page_no_t page_no;
  buf_block_t *block;
  buf_frame_t *frame;
  page_t *page;
  page_zip_des_t *page_zip;

  ut_ad(index_id != BTR_FREED_INDEX_ID);

  /* Create the two new segments (one, in the case of an ibuf tree) for
  the index tree; the segment headers are put on the allocated root page
  (for an ibuf tree, not in the root, but on a separate ibuf header
  page) */

  if (type & DICT_IBUF) {
    /* Allocate first the ibuf header page */
    buf_block_t *ibuf_hdr_block =
        fseg_create(space, 0, IBUF_HEADER + IBUF_TREE_SEG_HEADER, mtr);

    if (ibuf_hdr_block == nullptr) {
      return (FIL_NULL);
    }

    buf_block_dbg_add_level(ibuf_hdr_block, SYNC_IBUF_TREE_NODE_NEW);

    ut_ad(ibuf_hdr_block->page.id.page_no() == IBUF_HEADER_PAGE_NO);
    /* Allocate then the next page to the segment: it will be the
    tree root page */

    block = fseg_alloc_free_page(buf_block_get_frame(ibuf_hdr_block) +
                                     IBUF_HEADER + IBUF_TREE_SEG_HEADER,
                                 IBUF_TREE_ROOT_PAGE_NO, FSP_UP, mtr);
    ut_ad(block->page.id.page_no() == IBUF_TREE_ROOT_PAGE_NO);
  } else {
    block = fseg_create(space, 0, PAGE_HEADER + PAGE_BTR_SEG_TOP, mtr);
  }

  if (block == nullptr) {
    return (FIL_NULL);
  }

  page_no = block->page.id.page_no();
  frame = buf_block_get_frame(block);

  if (type & DICT_IBUF) {
    /* It is an insert buffer tree: initialize the free list */
    buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE_NEW);

    ut_ad(page_no == IBUF_TREE_ROOT_PAGE_NO);

    flst_init(frame + PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr);
  } else {
    /* It is a non-ibuf tree: create a file segment for leaf
    pages */
    buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);

    if (!fseg_create(space, page_no, PAGE_HEADER + PAGE_BTR_SEG_LEAF, mtr)) {
      /* Not enough space for new segment, free root
      segment before return. */
      btr_free_root(block, mtr);
      if (!index->table->is_temporary()) {
        btr_free_root_invalidate(block, mtr);
      }

      return (FIL_NULL);
    }

    /* The fseg create acquires a second latch on the page,
    therefore we must declare it: */
    buf_block_dbg_add_level(block, SYNC_TREE_NODE_NEW);
  }

  uint16_t page_create_type;
  if (dict_index_is_spatial(index)) {
    page_create_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_create_type = FIL_PAGE_SDI;
  } else {
    page_create_type = FIL_PAGE_INDEX;
  }

  /* Create a new index page on the allocated segment page */
  page_zip = buf_block_get_page_zip(block);

  if (page_zip) {
    page = page_create_zip(block, index, 0, 0, mtr, page_create_type);
  } else {
    page = page_create(block, mtr, dict_table_is_comp(index->table),
                       page_create_type);
    /* Set the level of the new index page */
    btr_page_set_level(page, nullptr, 0, mtr);
  }

  /* Set the index id of the page */
  btr_page_set_index_id(page, page_zip, index_id, mtr);

  /* Set the next node and previous node fields */
  btr_page_set_next(page, page_zip, FIL_NULL, mtr);
  btr_page_set_prev(page, page_zip, FIL_NULL, mtr);

  /* We reset the free bits for the page to allow creation of several
  trees in the same mtr, otherwise the latch on a bitmap page would
  prevent it because of the latching order.

  Note: Insert Buffering is disabled for temporary tables given that
  most temporary tables are smaller in size and short-lived. */
  if (!(type & DICT_CLUSTERED) && !index->table->is_temporary()) {
    ibuf_reset_free_bits(block);
  }

  /* In the following assertion we test that two records of maximum
  allowed size fit on the root page: this fact is needed to ensure
  correctness of split algorithms */

  ut_ad(page_get_max_insert_size(page, 2) > 2 * BTR_PAGE_MAX_REC_SIZE);

  buf_stat_per_index->inc(index_id_t(space, index_id));

  return (page_no);
}
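
/* A usage sketch (hypothetical caller): the return value must be
checked against FIL_NULL, which signals that the segments could not be
created:

  ulint root_page_no =
      btr_create(type, space, page_size, index_id, index, &mtr);
  if (root_page_no == FIL_NULL) {
    // out of file space: roll back / report the error
  }
*/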

/** Free a B-tree except the root page. The root page MUST be freed after
this by calling btr_free_root.
@param[in,out]	block		root page
@param[in]	log_mode	mtr logging mode */
static void btr_free_but_not_root(buf_block_t *block, mtr_log_t log_mode) {
  ibool finished;
  mtr_t mtr;

  ut_ad(page_is_root(block->frame));
leaf_loop:
  mtr_start(&mtr);
  mtr_set_log_mode(&mtr, log_mode);

  page_t *root = block->frame;

#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                              block->page.id.space()));
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                              block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  /* NOTE: page hash indexes are dropped when a page is freed inside
  fsp0fsp. */

  finished = fseg_free_step(root + PAGE_HEADER + PAGE_BTR_SEG_LEAF, true, &mtr);
  mtr_commit(&mtr);

  if (!finished) {
    goto leaf_loop;
  }
top_loop:
  mtr_start(&mtr);
  mtr_set_log_mode(&mtr, log_mode);

  root = block->frame;

#ifdef UNIV_BTR_DEBUG
  ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                              block->page.id.space()));
#endif /* UNIV_BTR_DEBUG */

  finished = fseg_free_step_not_header(root + PAGE_HEADER + PAGE_BTR_SEG_TOP,
                                       true, &mtr);
  mtr_commit(&mtr);

  if (!finished) {
    goto top_loop;
  }
}

/** Free a persistent index tree if it exists.
@param[in]	page_id		root page id
@param[in]	page_size	page size
@param[in]	index_id	PAGE_INDEX_ID contents
@param[in,out]	mtr		mini-transaction */
void btr_free_if_exists(const page_id_t &page_id, const page_size_t &page_size,
                        space_index_t index_id, mtr_t *mtr) {
  buf_block_t *root = btr_free_root_check(page_id, page_size, index_id, mtr);

  if (root == nullptr) {
    return;
  }

  btr_free_but_not_root(root, mtr->get_log_mode());
  btr_free_root(root, mtr);
  btr_free_root_invalidate(root, mtr);
}

/** Free an index tree in a temporary tablespace.
@param[in]	page_id		root page id
@param[in]	page_size	page size */
void btr_free(const page_id_t &page_id, const page_size_t &page_size) {
  mtr_t mtr;
  mtr.start();
  mtr.set_log_mode(MTR_LOG_NO_REDO);

  buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  ut_ad(page_is_root(block->frame));

  btr_free_but_not_root(block, MTR_LOG_NO_REDO);
  btr_free_root(block, &mtr);
  mtr.commit();
}

/** Truncate an index tree. We just free all pages except the root.
Currently, this function is specific to clustered indexes and the only
caller is DDTableBuffer, which manages a table with only a clustered index.
It is up to the caller to ensure atomicity and to ensure correct recovery by
calling btr_truncate_recover().
@param[in]	index		clustered index */
void btr_truncate(const dict_index_t *index) {
  ut_ad(index->is_clustered());
  ut_ad(index->next() == nullptr);

  page_no_t root_page_no = index->page;
  space_id_t space_id = index->space;
  fil_space_t *space = fil_space_acquire(space_id);

  if (space == nullptr) {
    return;
  }

  page_size_t page_size(space->flags);
  const page_id_t page_id(space_id, root_page_no);
  mtr_t mtr;
  buf_block_t *block;

  mtr.start();

  mtr_x_lock(&space->latch, &mtr);

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  page_t *page = buf_block_get_frame(block);
  ut_ad(page_is_root(page));

  /* Mark that we are going to truncate the index tree.
  We use PAGE_MAX_TRX_ID for the mark, as it should always be 0
  for a clustered index. */
  mlog_write_ull(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), IB_ID_MAX, &mtr);

  mtr.commit();

  mtr.start();

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  /* Free all except the root, we don't want to change it. */
  btr_free_but_not_root(block, MTR_LOG_ALL);

  /* Reset the mark saying that we have finished the truncate.
  The PAGE_MAX_TRX_ID is reset here by page_create(). */
  page_create(block, &mtr, dict_table_is_comp(index->table), false);
  ut_ad(mach_read_from_8(buf_block_get_frame(block) +
                         (PAGE_HEADER + PAGE_MAX_TRX_ID)) == 0);

  mtr.commit();

  rw_lock_x_unlock(&space->latch);

  fil_space_release(space);
}

/** Recovery function for btr_truncate. Checks whether there was a
crash during btr_truncate; if so, redoes the truncate; if not, does nothing.
@param[in]	index		clustered index */
void btr_truncate_recover(const dict_index_t *index) {
  ut_ad(index->is_clustered());
  ut_ad(index->next() == nullptr);

  page_no_t root_page_no = index->page;
  space_id_t space_id = index->space;
  fil_space_t *space = fil_space_acquire(space_id);

  if (space == nullptr) {
    return;
  }

  page_size_t page_size(space->flags);
  const page_id_t page_id(space_id, root_page_no);
  mtr_t mtr;
  buf_block_t *block;

  mtr.start();

  block = buf_page_get(page_id, page_size, RW_X_LATCH, &mtr);

  page_t *page = buf_block_get_frame(block);

  trx_id_t trx_id = page_get_max_trx_id(page);
  ut_ad(trx_id == 0 || trx_id == IB_ID_MAX);

  mtr.commit();

  fil_space_release(space);

  if (trx_id == IB_ID_MAX) {
    /* IB_ID_MAX (i.e. -1) means there is a half-done btr_truncate,
    so we simply redo it. */
    btr_truncate(index);
  }
}
#endif /* !UNIV_HOTBACKUP */

/** Reorganizes an index page.

 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
 if this is a compressed leaf page in a secondary index. This has to
 be done either within the same mini-transaction, or by invoking
 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
 IBUF_BITMAP_FREE is unaffected by reorganization.

 @retval true if the operation was successful
 @retval false if it is a compressed page, and recompression failed */
bool btr_page_reorganize_low(
    bool recovery,       /*!< in: true if called in recovery:
                        locks should not be updated, i.e.,
                        there cannot exist locks on the
                        page, and a hash index should not be
                        dropped: it cannot exist */
    ulint z_level,       /*!< in: compression level to be used
                         if dealing with compressed page */
    page_cur_t *cursor,  /*!< in/out: page cursor */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  buf_block_t *block = page_cur_get_block(cursor);
#ifndef UNIV_HOTBACKUP
  buf_pool_t *buf_pool = buf_pool_from_bpage(&block->page);
#endif /* !UNIV_HOTBACKUP */
  page_t *page = buf_block_get_frame(block);
  page_zip_des_t *page_zip = buf_block_get_page_zip(block);
  buf_block_t *temp_block;
  page_t *temp_page;
  ulint data_size1;
  ulint data_size2;
  ulint max_ins_size1;
  ulint max_ins_size2;
  bool success = false;
  ulint pos;
  bool log_compressed;

#ifndef UNIV_HOTBACKUP
  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
#endif /* !UNIV_HOTBACKUP */
  btr_assert_not_corrupted(block, index);
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
  data_size1 = page_get_data_size(page);
  max_ins_size1 = page_get_max_insert_size_after_reorganize(page, 1);

  /* Turn logging off */
  mtr_log_t log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);

#ifndef UNIV_HOTBACKUP
  temp_block = buf_block_alloc(buf_pool);
#else  /* !UNIV_HOTBACKUP */
  temp_block = back_block2;
#endif /* !UNIV_HOTBACKUP */
  temp_page = temp_block->frame;

#ifndef UNIV_HOTBACKUP
  MONITOR_INC(MONITOR_INDEX_REORG_ATTEMPTS);
#endif /* !UNIV_HOTBACKUP */

  /* Copy the old page to temporary space */
  buf_frame_copy(temp_page, page);

#ifndef UNIV_HOTBACKUP
  if (!recovery) {
    btr_search_drop_page_hash_index(block);
  }
#endif /* !UNIV_HOTBACKUP */

  /* Save the cursor position. */
  pos = page_rec_get_n_recs_before(page_cur_get_rec(cursor));

  /* Recreate the page: note that global data on page (possible
  segment headers, next page-field, etc.) is preserved intact */

  page_create(block, mtr, dict_table_is_comp(index->table),
              fil_page_get_type(page));

  /* Copy the records from the temporary space to the recreated page;
  do not copy the lock bits yet */

  page_copy_rec_list_end_no_locks(block, temp_block,
                                  page_get_infimum_rec(temp_page), index, mtr);

  /* Multiple transactions cannot simultaneously operate on the
  same temp-table.
  max_trx_id is ignored for temp tables because it is not required
  for MVCC. */
  if (dict_index_is_sec_or_ibuf(index) && page_is_leaf(page) &&
      !index->table->is_temporary()) {
    /* Copy max trx id to recreated page */
    trx_id_t max_trx_id = page_get_max_trx_id(temp_page);
    page_set_max_trx_id(block, nullptr, max_trx_id, mtr);
    /* In crash recovery, dict_index_is_sec_or_ibuf() always
    holds, even for clustered indexes.  max_trx_id is
    unused in clustered index pages. */
    ut_ad(max_trx_id != 0 || recovery);
  }

  /* If innodb_log_compressed_pages is ON, page reorganize should log the
  compressed page image. */
  log_compressed = page_zip && page_zip_log_pages;

  if (log_compressed) {
    mtr_set_log_mode(mtr, log_mode);
  }

  if (page_zip && !page_zip_compress(page_zip, page, index, z_level, mtr)) {
    /* Restore the old page and exit. */
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
    /* Check that the bytes that we skip are identical. */
    ut_a(!memcmp(page, temp_page, PAGE_HEADER));
    ut_a(!memcmp(PAGE_HEADER + PAGE_N_RECS + page,
                 PAGE_HEADER + PAGE_N_RECS + temp_page,
                 PAGE_DATA - (PAGE_HEADER + PAGE_N_RECS)));
    ut_a(!memcmp(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + page,
                 UNIV_PAGE_SIZE - FIL_PAGE_DATA_END + temp_page,
                 FIL_PAGE_DATA_END));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

    memcpy(PAGE_HEADER + page, PAGE_HEADER + temp_page,
           PAGE_N_RECS - PAGE_N_DIR_SLOTS);
    memcpy(PAGE_DATA + page, PAGE_DATA + temp_page,
           UNIV_PAGE_SIZE - PAGE_DATA - FIL_PAGE_DATA_END);

#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
    ut_a(!memcmp(page, temp_page, UNIV_PAGE_SIZE));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */

    goto func_exit;
  }

#ifndef UNIV_HOTBACKUP
  /* No locks are acquired for intrinsic tables. */
  if (!recovery && !dict_table_is_locking_disabled(index->table)) {
    /* Update the record lock bitmaps */
    lock_move_reorganize_page(block, temp_block);
  }
#endif /* !UNIV_HOTBACKUP */

  data_size2 = page_get_data_size(page);
  max_ins_size2 = page_get_max_insert_size_after_reorganize(page, 1);

  if (data_size1 != data_size2 || max_ins_size1 != max_ins_size2) {
    ib::error(ER_IB_MSG_30)
        << "Page old data size " << data_size1 << " new data size "
        << data_size2 << ", page old max ins size " << max_ins_size1
        << " new max ins size " << max_ins_size2;

    ib::error(ER_IB_MSG_31) << BUG_REPORT_MSG;
    ut_ad(0);
  } else {
    success = true;
  }

  /* Restore the cursor position. */
  if (pos > 0) {
    cursor->rec = page_rec_get_nth(page, pos);
  } else {
    ut_ad(cursor->rec == page_get_infimum_rec(page));
  }

func_exit:
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
#ifndef UNIV_HOTBACKUP
  buf_block_free(temp_block);
#endif /* !UNIV_HOTBACKUP */

  /* Restore logging mode */
  mtr_set_log_mode(mtr, log_mode);

#ifndef UNIV_HOTBACKUP
  if (success) {
    mlog_id_t type;
    byte *log_ptr = nullptr;

    /* Write the log record */
    if (page_zip) {
      ut_ad(page_is_comp(page));
      type = MLOG_ZIP_PAGE_REORGANIZE;
    } else if (page_is_comp(page)) {
      type = MLOG_COMP_PAGE_REORGANIZE;
    } else {
      type = MLOG_PAGE_REORGANIZE;
    }

    bool opened = false;

    if (!log_compressed) {
      opened = mlog_open_and_write_index(mtr, page, index, type,
                                         page_zip ? 1 : 0, log_ptr);
    }

    /* For compressed pages write the compression level. */
    if (opened && page_zip) {
      mach_write_to_1(log_ptr, z_level);
      mlog_close(mtr, log_ptr + 1);
    }

    MONITOR_INC(MONITOR_INDEX_REORG_SUCCESSFUL);
  }
#endif /* !UNIV_HOTBACKUP */

  return (success);
}

/** Reorganizes an index page.

 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
 if this is a compressed leaf page in a secondary index. This has to
 be done either within the same mini-transaction, or by invoking
 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
 IBUF_BITMAP_FREE is unaffected by reorganization.

 @retval true if the operation was successful
 @retval false if it is a compressed page, and recompression failed */
static bool btr_page_reorganize_block(
    bool recovery,       /*!< in: true if called in recovery:
                        locks should not be updated, i.e.,
                        there cannot exist locks on the
                        page, and a hash index should not be
                        dropped: it cannot exist */
    ulint z_level,       /*!< in: compression level to be used
                         if dealing with compressed page */
    buf_block_t *block,  /*!< in/out: B-tree page */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  page_cur_t cur;
  page_cur_set_before_first(block, &cur);

  return (btr_page_reorganize_low(recovery, z_level, &cur, index, mtr));
}

/** Reorganizes an index page.

 IMPORTANT: On success, the caller will have to update IBUF_BITMAP_FREE
 if this is a compressed leaf page in a secondary index. This has to
 be done either within the same mini-transaction, or by invoking
 ibuf_reset_free_bits() before mtr_commit(). On uncompressed pages,
 IBUF_BITMAP_FREE is unaffected by reorganization.

 @retval true if the operation was successful
 @retval false if it is a compressed page, and recompression failed */
bool btr_page_reorganize(
    page_cur_t *cursor,  /*!< in/out: page cursor */
    dict_index_t *index, /*!< in: the index tree of the page */
    mtr_t *mtr)          /*!< in/out: mini-transaction */
{
  return (btr_page_reorganize_low(false, page_zip_level, cursor, index, mtr));
}
1400 
/** Parses a redo log record of reorganizing a page.
 @return end of log record or NULL */
byte *btr_parse_page_reorganize(
    byte *ptr,           /*!< in: buffer */
    byte *end_ptr,       /*!< in: buffer end */
    dict_index_t *index, /*!< in: record descriptor */
    bool compressed,     /*!< in: true if compressed page */
    buf_block_t *block,  /*!< in: page to be reorganized, or NULL */
    mtr_t *mtr)          /*!< in: mtr or NULL */
{
  ulint level;

  ut_ad(ptr != nullptr);
  ut_ad(end_ptr != nullptr);
  ut_ad(index != nullptr);

  /* If dealing with a compressed page, the record carries in one
  byte the compression level that was used when the page was
  originally compressed. Otherwise the record body is empty. */
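  /* Schematically, based on the checks below, the record body is:

       MLOG_ZIP_PAGE_REORGANIZE:     [ 1 byte: compression level ]
       MLOG_(COMP_)PAGE_REORGANIZE:  (empty)

  mirroring what the writer side logs on a successful
  reorganization. */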
  if (compressed) {
    if (ptr == end_ptr) {
      return (nullptr);
    }

    level = mach_read_from_1(ptr);

    ut_a(level <= 9);
    ++ptr;
  } else {
    level = page_zip_level;
  }

  if (block != nullptr) {
    btr_page_reorganize_block(true, level, block, index, mtr);
  }

  return (ptr);
}

#ifndef UNIV_HOTBACKUP
/** Empties an index page.  @see btr_page_create(). */
static void btr_page_empty(
    buf_block_t *block,       /*!< in: page to be emptied */
    page_zip_des_t *page_zip, /*!< out: compressed page, or NULL */
    dict_index_t *index,      /*!< in: index of the page */
    ulint level,              /*!< in: the B-tree level of the page */
    mtr_t *mtr)               /*!< in: mtr */
{
  page_t *page = buf_block_get_frame(block);

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  ut_ad(page_zip == buf_block_get_page_zip(block));
#ifdef UNIV_ZIP_DEBUG
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

  btr_search_drop_page_hash_index(block);

  uint16_t page_type;
  if (dict_index_is_spatial(index)) {
    page_type = FIL_PAGE_RTREE;
  } else if (dict_index_is_sdi(index)) {
    page_type = FIL_PAGE_SDI;
  } else {
    page_type = FIL_PAGE_INDEX;
  }

  /* Recreate the page: note that global data on the page (possible
  segment headers, the next page field, etc.) is preserved intact. */

  if (page_zip) {
    page_create_zip(block, index, level, 0, mtr, page_type);
  } else {
    page_create(block, mtr, dict_table_is_comp(index->table), page_type);
    btr_page_set_level(page, nullptr, level, mtr);
  }
}

/** Makes the tree one level higher by splitting the root, and inserts the
 tuple. It is assumed that mtr contains an x-latch on the tree.
 NOTE that the operation of this function must always succeed; we
 cannot reverse it. Therefore enough free disk space must be
 guaranteed to be available before this function is called.
 @return inserted record */
rec_t *btr_root_raise_and_insert(
    uint32_t flags,        /*!< in: undo logging and locking flags */
    btr_cur_t *cursor,     /*!< in: cursor at which to insert: must be
                           on the root page; when the function returns,
                           the cursor is positioned on the predecessor
                           of the inserted record */
    ulint **offsets,       /*!< out: offsets on inserted record */
    mem_heap_t **heap,     /*!< in/out: pointer to memory heap, or NULL */
    const dtuple_t *tuple, /*!< in: tuple to insert */
    mtr_t *mtr)            /*!< in: mtr */
{
  dict_index_t *index;
  page_t *root;
  page_t *new_page;
  page_no_t new_page_no;
  rec_t *rec;
  dtuple_t *node_ptr;
  ulint level;
  rec_t *node_ptr_rec;
  page_cur_t *page_cursor;
  page_zip_des_t *root_page_zip;
  page_zip_des_t *new_page_zip;
  buf_block_t *root_block;
  buf_block_t *new_block;

  root = btr_cur_get_page(cursor);
  root_block = btr_cur_get_block(cursor);
  root_page_zip = buf_block_get_page_zip(root_block);
  ut_ad(!page_is_empty(root));
  index = btr_cur_get_index(cursor);
#ifdef UNIV_ZIP_DEBUG
  ut_a(!root_page_zip || page_zip_validate(root_page_zip, root, index));
#endif /* UNIV_ZIP_DEBUG */
#ifdef UNIV_BTR_DEBUG
  if (!dict_index_is_ibuf(index)) {
    space_id_t space_id = dict_index_get_space(index);

    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                                space_id));
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                                space_id));
  }

  ut_a(dict_index_get_page(index) == page_get_page_no(root));
#endif /* UNIV_BTR_DEBUG */
  ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());
  ut_ad(mtr_is_block_fix(mtr, root_block, MTR_MEMO_PAGE_X_FIX, index->table));

  /* Allocate a new page to the tree. Root splitting is done by first
  moving the root records to the new page, emptying the root, putting
  a node pointer to the new page, and then splitting the new page. */
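  /* Schematically ("R" is the root page, "N" the newly allocated
  page; rec1 ... recn are the current root records):

       before:   [ R: rec1 ... recn ]
       after:    [ R: ptr(N) ]
                       |
                 [ N: rec1 ... recn ]   <- split next

  Note that the root page number never changes this way. */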

  level = btr_page_get_level(root, mtr);

  new_block = btr_page_alloc(index, 0, FSP_NO_DIR, level, mtr, mtr);

  new_page = buf_block_get_frame(new_block);
  new_page_zip = buf_block_get_page_zip(new_block);
  ut_a(!new_page_zip == !root_page_zip);
  ut_a(!new_page_zip ||
       page_zip_get_size(new_page_zip) == page_zip_get_size(root_page_zip));

  btr_page_create(new_block, new_page_zip, index, level, mtr);

  /* Set the next node and previous node fields of the new page */
  btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
  btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);

  /* Copy the records from root to the new page one by one. */

  if (false
#ifdef UNIV_ZIP_COPY
      || new_page_zip
#endif /* UNIV_ZIP_COPY */
      || !page_copy_rec_list_end(new_block, root_block,
                                 page_get_infimum_rec(root), index, mtr)) {
    ut_a(new_page_zip);

    /* Copy the page byte for byte. */
    page_zip_copy_recs(new_page_zip, new_page, root_page_zip, root, index, mtr);

    /* Update the lock table and possible hash index. */

    if (!dict_table_is_locking_disabled(index->table)) {
      lock_move_rec_list_end(new_block, root_block, page_get_infimum_rec(root));
    }

    /* Move any existing predicate locks */
    if (dict_index_is_spatial(index)) {
      lock_prdt_rec_move(new_block, root_block);
    }

    btr_search_move_or_delete_hash_entries(new_block, root_block, index);
  }

  /* If this is a pessimistic insert which is actually done to
  perform a pessimistic update, then we have stored the lock
  information of the record to be inserted on the infimum of the
  root page: we cannot discard the lock structs on the root page. */

  if (!dict_table_is_locking_disabled(index->table)) {
    lock_update_root_raise(new_block, root_block);
  }

  /* Create a memory heap where the node pointer is stored */
  if (!*heap) {
    *heap = mem_heap_create(1000);
  }

  rec = page_rec_get_next(page_get_infimum_rec(new_page));
  new_page_no = new_block->page.id.page_no();

  /* Build the node pointer (= node key and page address) for the
  child */
  if (dict_index_is_spatial(index)) {
    rtr_mbr_t new_mbr;

    rtr_page_cal_mbr(index, new_block, &new_mbr, *heap);
    node_ptr = rtr_index_build_node_ptr(index, &new_mbr, rec, new_page_no,
                                        *heap, level);
  } else {
    node_ptr = dict_index_build_node_ptr(index, rec, new_page_no, *heap, level);
  }
  /* The node pointer must be marked as the predefined minimum record,
  as there is no lower alphabetical limit to records in the leftmost
  node of a level: */
  dtuple_set_info_bits(node_ptr,
                       dtuple_get_info_bits(node_ptr) | REC_INFO_MIN_REC_FLAG);

  /* Rebuild the root page to get free space */
  btr_page_empty(root_block, root_page_zip, index, level + 1, mtr);

  /* Set the next node and previous node fields, although
  they should already have been set.  The previous node field
  must be FIL_NULL if root_page_zip != NULL, because the
  REC_INFO_MIN_REC_FLAG (of the first user record) will be
  set if and only if btr_page_get_prev() == FIL_NULL. */
  btr_page_set_next(root, root_page_zip, FIL_NULL, mtr);
  btr_page_set_prev(root, root_page_zip, FIL_NULL, mtr);

  page_cursor = btr_cur_get_page_cur(cursor);

  /* Insert node pointer to the root */

  page_cur_set_before_first(root_block, page_cursor);

  node_ptr_rec =
      page_cur_tuple_insert(page_cursor, node_ptr, index, offsets, heap, mtr);

  /* The root page should only contain the node pointer
  to new_page at this point.  Thus, the data should fit. */
  ut_a(node_ptr_rec);

  /* We play safe and reset the free bits for the new page */

  if (!index->is_clustered() && !index->table->is_temporary()) {
    ibuf_reset_free_bits(new_block);
  }

  /* Reposition the cursor to the child node */
  page_cur_search(new_block, index, tuple, page_cursor);

  /* Split the child and insert tuple */
  if (dict_index_is_spatial(index)) {
    /* Split rtree page and insert tuple */
    return (
        rtr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
  } else {
    return (
        btr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
  }
}

/** Decides if the page should be split at the convergence point of inserts
 converging to the left.
 @return true if split recommended */
ibool btr_page_get_split_rec_to_left(
    btr_cur_t *cursor, /*!< in: cursor at which to insert */
    rec_t **split_rec) /*!< out: if split recommended,
                    the first record on upper half page,
                    or NULL if tuple to be inserted should
                    be first */
{
  page_t *page;
  rec_t *insert_point;
  rec_t *infimum;

  page = btr_cur_get_page(cursor);
  insert_point = btr_cur_get_rec(cursor);

  if (page_header_get_ptr(page, PAGE_LAST_INSERT) ==
      page_rec_get_next(insert_point)) {
    infimum = page_get_infimum_rec(page);

    /* If the convergence is in the middle of a page, also include
    the record immediately before the new insert in the upper
    page. Otherwise, we could repeatedly move lots of records
    smaller than the convergence point from page to page. */
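    /* Example with hypothetical keys: the page holds 10 20 30 90 91
    92 and keys arrive in descending order ..., 91, 90; on inserting
    89 the insert point is 30 and the convergence is detected.
    Making 30 the first record of the upper half keeps 10 and 20 out
    of the descending insert stream, instead of their being moved
    again on every subsequent split. */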

    if (infimum != insert_point && page_rec_get_next(infimum) != insert_point) {
      *split_rec = insert_point;
    } else {
      *split_rec = page_rec_get_next(insert_point);
    }

    return (TRUE);
  }

  return (FALSE);
}

/** Decides if the page should be split at the convergence point of inserts
 converging to the right.
 @return true if split recommended */
ibool btr_page_get_split_rec_to_right(
    btr_cur_t *cursor, /*!< in: cursor at which to insert */
    rec_t **split_rec) /*!< out: if split recommended,
                    the first record on upper half page,
                    or NULL if tuple to be inserted should
                    be first */
{
  page_t *page;
  rec_t *insert_point;

  page = btr_cur_get_page(cursor);
  insert_point = btr_cur_get_rec(cursor);

  /* We use eager heuristics: if the new insert would be right after
  the previous insert on the same page, we assume that there is a
  pattern of sequential inserts here. */
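  /* Example with hypothetical keys: after inserting 10, 11, 12 in a
  row, PAGE_LAST_INSERT points to 12; when 13 arrives, its insert
  point is again 12, so the workload is treated as sequential and
  the split point is chosen near the insert point instead of at the
  middle of the page. */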

  if (page_header_get_ptr(page, PAGE_LAST_INSERT) == insert_point) {
    rec_t *next_rec;

    next_rec = page_rec_get_next(insert_point);

    if (page_rec_is_supremum(next_rec)) {
    split_at_new:
      /* Split at the new record to insert */
      *split_rec = nullptr;
    } else {
      rec_t *next_next_rec = page_rec_get_next(next_rec);
      if (page_rec_is_supremum(next_next_rec)) {
        goto split_at_new;
      }

      /* If there are >= 2 user records up from the insert
      point, split all but 1 off. We want to keep one because
      then sequential inserts can use the adaptive hash
      index, as they can do the necessary checks of the right
      search position just by looking at the records on this
      page. */

      *split_rec = next_next_rec;
    }

    return (TRUE);
  }

  return (FALSE);
}

/** Calculates a split record such that the tuple will certainly fit on
 its half-page when the split is performed. We assume in this function
 only that the cursor page has at least one user record.
 @return split record, or NULL if tuple will be the first record on
 the lower or upper half-page (determined by btr_page_tuple_smaller()) */
static rec_t *btr_page_get_split_rec(
    btr_cur_t *cursor,     /*!< in: cursor at which insert should be made */
    const dtuple_t *tuple) /*!< in: tuple to insert */
{
  page_t *page;
  page_zip_des_t *page_zip;
  ulint insert_size;
  ulint free_space;
  ulint total_data;
  ulint total_n_recs;
  ulint total_space;
  ulint incl_data;
  rec_t *ins_rec;
  rec_t *rec;
  rec_t *next_rec;
  ulint n;
  mem_heap_t *heap;
  ulint *offsets;

  page = btr_cur_get_page(cursor);

  insert_size = rec_get_converted_size(cursor->index, tuple);
  free_space = page_get_free_space_of_empty(page_is_comp(page));

  page_zip = btr_cur_get_page_zip(cursor);
  if (page_zip) {
    /* Estimate the free space of an empty compressed page. */
    ulint free_space_zip = page_zip_empty_size(cursor->index->n_fields,
                                               page_zip_get_size(page_zip));

    if (free_space > (ulint)free_space_zip) {
      free_space = (ulint)free_space_zip;
    }
  }

  /* free_space is now the free space of a created new page */

  total_data = page_get_data_size(page) + insert_size;
  total_n_recs = page_get_n_recs(page) + 1;
  ut_ad(total_n_recs >= 2);
  total_space = total_data + page_dir_calc_reserved_space(total_n_recs);

  n = 0;
  incl_data = 0;
  ins_rec = btr_cur_get_rec(cursor);
  rec = page_get_infimum_rec(page);

  heap = nullptr;
  offsets = nullptr;

  /* We start by including records in the left half until the space
  they reserve reaches half of total_space. If the included records
  then fit on the left page, they will be put there, provided
  something is also left over for the right page; otherwise the last
  included record will become the first record on the right
  half-page. */
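  /* A worked example with hypothetical sizes: if total_space is
  16000 bytes and every record (the tuple included) reserves about
  1000 bytes together with its directory slot, the loop below stops
  once the included space first reaches 8000, i.e. after eight
  records. Whether the eighth record stays on the left page or
  becomes the first record of the right half then depends on the
  free_space check that follows the loop. */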

  do {
    /* Decide the next record to include */
    if (rec == ins_rec) {
      rec = nullptr; /* NULL denotes that tuple is
                  now included */
    } else if (rec == nullptr) {
      rec = page_rec_get_next(ins_rec);
    } else {
      rec = page_rec_get_next(rec);
    }

    if (rec == nullptr) {
      /* Include tuple */
      incl_data += insert_size;
    } else {
      offsets =
          rec_get_offsets(rec, cursor->index, offsets, ULINT_UNDEFINED, &heap);
      incl_data += rec_offs_size(offsets);
    }

    n++;
  } while (incl_data + page_dir_calc_reserved_space(n) < total_space / 2);

  if (incl_data + page_dir_calc_reserved_space(n) <= free_space) {
    /* The next record will be the first on
    the right half page if it is not the
    supremum record of the page */

    if (rec == ins_rec) {
      rec = nullptr;

      goto func_exit;
    } else if (rec == nullptr) {
      next_rec = page_rec_get_next(ins_rec);
    } else {
      next_rec = page_rec_get_next(rec);
    }
    ut_ad(next_rec);
    if (!page_rec_is_supremum(next_rec)) {
      rec = next_rec;
    }
  }

func_exit:
  if (heap) {
    mem_heap_free(heap);
  }
  return (rec);
}

/** Returns true if the insert fits on the appropriate half-page with the
 chosen split_rec.
 @return true if fits */
static MY_ATTRIBUTE((warn_unused_result)) bool btr_page_insert_fits(
    btr_cur_t *cursor,      /*!< in: cursor at which insert
                            should be made */
    const rec_t *split_rec, /*!< in: suggestion for first record
                          on upper half-page, or NULL if
                          tuple to be inserted should be first */
    ulint **offsets,        /*!< in: rec_get_offsets(
                            split_rec, cursor->index); out: garbage */
    const dtuple_t *tuple,  /*!< in: tuple to insert */
    mem_heap_t **heap)      /*!< in: temporary memory heap */
{
  page_t *page;
  ulint insert_size;
  ulint free_space;
  ulint total_data;
  ulint total_n_recs;
  const rec_t *rec;
  const rec_t *end_rec;

  page = btr_cur_get_page(cursor);

  ut_ad(!split_rec || !page_is_comp(page) == !rec_offs_comp(*offsets));
  ut_ad(!split_rec || rec_offs_validate(split_rec, cursor->index, *offsets));

  insert_size = rec_get_converted_size(cursor->index, tuple);
  free_space = page_get_free_space_of_empty(page_is_comp(page));

  /* free_space is now the free space of a created new page */

  total_data = page_get_data_size(page) + insert_size;
  total_n_recs = page_get_n_recs(page) + 1;

  /* We determine which records (from rec to end_rec, not including
  end_rec) will end up on the other half page from tuple when it is
  inserted. */
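  /* The three cases below, restated: if split_rec is NULL, the
  tuple opens the upper half, so everything up to and including the
  cursor record moves away from it; if tuple >= split_rec, the tuple
  lands on the upper half and the records before split_rec move
  away; otherwise the tuple lands on the lower half and the records
  from split_rec onwards move away. */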

  if (split_rec == nullptr) {
    rec = page_rec_get_next(page_get_infimum_rec(page));
    end_rec = page_rec_get_next(btr_cur_get_rec(cursor));

  } else if (cmp_dtuple_rec(tuple, split_rec, cursor->index, *offsets) >= 0) {
    rec = page_rec_get_next(page_get_infimum_rec(page));
    end_rec = split_rec;
  } else {
    rec = split_rec;
    end_rec = page_get_supremum_rec(page);
  }

  if (total_data + page_dir_calc_reserved_space(total_n_recs) <= free_space) {
    /* Ok, there will be enough available space on the
    half page where the tuple is inserted */

    return (true);
  }

  while (rec != end_rec) {
    /* In this loop we calculate the amount of reserved
    space after rec is removed from page. */

    *offsets =
        rec_get_offsets(rec, cursor->index, *offsets, ULINT_UNDEFINED, heap);

    total_data -= rec_offs_size(*offsets);
    total_n_recs--;

    if (total_data + page_dir_calc_reserved_space(total_n_recs) <= free_space) {
      /* Ok, there will be enough available space on the
      half page where the tuple is inserted */

      return (true);
    }

    rec = page_rec_get_next_const(rec);
  }

  return (false);
}

/** Inserts a data tuple to a tree on a non-leaf level. It is assumed
 that mtr holds an x-latch on the tree. */
void btr_insert_on_non_leaf_level_func(
    uint32_t flags,      /*!< in: undo logging and locking flags */
    dict_index_t *index, /*!< in: index */
    ulint level,         /*!< in: level, must be > 0 */
    dtuple_t *tuple,     /*!< in: the record to be inserted */
    const char *file,    /*!< in: file name */
    ulint line,          /*!< in: line where called */
    mtr_t *mtr)          /*!< in: mtr */
{
  big_rec_t *dummy_big_rec;
  btr_cur_t cursor;
  dberr_t err;
  rec_t *rec;
  mem_heap_t *heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);
  rtr_info_t rtr_info;

  ut_ad(level > 0);

  if (!dict_index_is_spatial(index)) {
    if (index->table->is_intrinsic()) {
      btr_cur_search_to_nth_level_with_no_latch(
          index, level, tuple, PAGE_CUR_LE, &cursor, __FILE__, __LINE__, mtr);
    } else {
      btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_LE,
                                  BTR_CONT_MODIFY_TREE, &cursor, 0, file, line,
                                  mtr);
    }
  } else {
    /* For spatial index, initialize structures to track
    its parents etc. */
    rtr_init_rtr_info(&rtr_info, false, &cursor, index, false);

    rtr_info_update_btr(&cursor, &rtr_info);

    btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_INSERT,
                                BTR_CONT_MODIFY_TREE, &cursor, 0, file, line,
                                mtr);
  }

  ut_ad(cursor.flag == BTR_CUR_BINARY);

  err = btr_cur_optimistic_insert(
      flags | BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG,
      &cursor, &offsets, &heap, tuple, &rec, &dummy_big_rec, nullptr, mtr);

  if (err == DB_FAIL) {
    err = btr_cur_pessimistic_insert(
        flags | BTR_NO_LOCKING_FLAG | BTR_KEEP_SYS_FLAG | BTR_NO_UNDO_LOG_FLAG,
        &cursor, &offsets, &heap, tuple, &rec, &dummy_big_rec, nullptr, mtr);
    ut_a(err == DB_SUCCESS);
  }

  if (heap != nullptr) {
    mem_heap_free(heap);
  }

  if (dict_index_is_spatial(index)) {
    ut_ad(cursor.rtr_info);

    rtr_clean_rtr_info(&rtr_info, true);
  }
}

/** Attaches the halves of an index page on the appropriate level in an
 index tree. */
static void btr_attach_half_pages(
    uint32_t flags,         /*!< in: undo logging and
                         locking flags */
    dict_index_t *index,    /*!< in: the index tree */
    buf_block_t *block,     /*!< in/out: page to be split */
    const rec_t *split_rec, /*!< in: first record on upper
                            half page */
    buf_block_t *new_block, /*!< in/out: the new half page */
    ulint direction,        /*!< in: FSP_UP or FSP_DOWN */
    mtr_t *mtr)             /*!< in: mtr */
{
  page_no_t prev_page_no;
  page_no_t next_page_no;
  ulint level;
  page_t *page = buf_block_get_frame(block);
  page_t *lower_page;
  page_t *upper_page;
  page_no_t lower_page_no;
  page_no_t upper_page_no;
  page_zip_des_t *lower_page_zip;
  page_zip_des_t *upper_page_zip;
  dtuple_t *node_ptr_upper;
  mem_heap_t *heap;
  buf_block_t *prev_block = nullptr;
  buf_block_t *next_block = nullptr;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  ut_ad(mtr_is_block_fix(mtr, new_block, MTR_MEMO_PAGE_X_FIX, index->table));

  /* Create a memory heap where the data tuple is stored */
  heap = mem_heap_create(1024);

  /* Based on split direction, decide upper and lower pages */
  if (direction == FSP_DOWN) {
    btr_cur_t cursor;
    ulint *offsets;

    lower_page = buf_block_get_frame(new_block);
    lower_page_no = new_block->page.id.page_no();
    lower_page_zip = buf_block_get_page_zip(new_block);
    upper_page = buf_block_get_frame(block);
    upper_page_no = block->page.id.page_no();
    upper_page_zip = buf_block_get_page_zip(block);

    /* Look up the index for the node pointer to the page */
    offsets =
        btr_page_get_father_block(nullptr, heap, index, block, mtr, &cursor);

    /* Replace the address of the old child node (= page) with the
    address of the new lower half */

    btr_node_ptr_set_child_page_no(btr_cur_get_rec(&cursor),
                                   btr_cur_get_page_zip(&cursor), offsets,
                                   lower_page_no, mtr);
    mem_heap_empty(heap);
  } else {
    lower_page = buf_block_get_frame(block);
    lower_page_no = block->page.id.page_no();
    lower_page_zip = buf_block_get_page_zip(block);
    upper_page = buf_block_get_frame(new_block);
    upper_page_no = new_block->page.id.page_no();
    upper_page_zip = buf_block_get_page_zip(new_block);
  }

  /* Get the previous and next pages of page */
  prev_page_no = btr_page_get_prev(page, mtr);
  next_page_no = btr_page_get_next(page, mtr);

  const space_id_t space = block->page.id.space();

  /* For consistency, both blocks should be locked before the change. */
  if (prev_page_no != FIL_NULL && direction == FSP_DOWN) {
    prev_block = btr_block_get(page_id_t(space, prev_page_no), block->page.size,
                               RW_X_LATCH, index, mtr);
  }
  if (next_page_no != FIL_NULL && direction != FSP_DOWN) {
    next_block = btr_block_get(page_id_t(space, next_page_no), block->page.size,
                               RW_X_LATCH, index, mtr);
  }

  /* Get the level of the split pages */
  level = btr_page_get_level(buf_block_get_frame(block), mtr);
  ut_ad(level == btr_page_get_level(buf_block_get_frame(new_block), mtr));

  /* Build the node pointer (= node key and page address) for the upper
  half */

  node_ptr_upper =
      dict_index_build_node_ptr(index, split_rec, upper_page_no, heap, level);

  /* Insert it next to the pointer to the lower half. Note that this
  may generate recursion leading to a split on the higher level. */

  btr_insert_on_non_leaf_level(flags, index, level + 1, node_ptr_upper, mtr);

  /* Free the memory heap */
  mem_heap_free(heap);

  /* Update page links of the level */

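  /* Schematically, for an FSP_UP split of page P with new page N:

       before:   prev <-> P <-> next
       after:    prev <-> P <-> N <-> next

  For FSP_DOWN, N is instead linked in as the left sibling of P. */
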
  if (prev_block) {
#ifdef UNIV_BTR_DEBUG
    ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
    ut_a(btr_page_get_next(prev_block->frame, mtr) == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

    btr_page_set_next(buf_block_get_frame(prev_block),
                      buf_block_get_page_zip(prev_block), lower_page_no, mtr);
  }

  if (next_block) {
#ifdef UNIV_BTR_DEBUG
    ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
    ut_a(btr_page_get_prev(next_block->frame, mtr) == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

    btr_page_set_prev(buf_block_get_frame(next_block),
                      buf_block_get_page_zip(next_block), upper_page_no, mtr);
  }

  if (direction == FSP_DOWN) {
    /* lower_page is new */
    btr_page_set_prev(lower_page, lower_page_zip, prev_page_no, mtr);
  } else {
    ut_ad(btr_page_get_prev(lower_page, mtr) == prev_page_no);
  }

  btr_page_set_next(lower_page, lower_page_zip, upper_page_no, mtr);
  btr_page_set_prev(upper_page, upper_page_zip, lower_page_no, mtr);

  if (direction != FSP_DOWN) {
    /* upper_page is new */
    btr_page_set_next(upper_page, upper_page_zip, next_page_no, mtr);
  } else {
    ut_ad(btr_page_get_next(upper_page, mtr) == next_page_no);
  }
}

/** Determine if a tuple is smaller than any record on the page.
 @return true if smaller */
static MY_ATTRIBUTE((warn_unused_result)) bool btr_page_tuple_smaller(
    btr_cur_t *cursor,     /*!< in: b-tree cursor */
    const dtuple_t *tuple, /*!< in: tuple to consider */
    ulint **offsets,       /*!< in/out: temporary storage */
    ulint n_uniq,          /*!< in: number of unique fields
                           in the index page records */
    mem_heap_t **heap)     /*!< in/out: heap for offsets */
{
  buf_block_t *block;
  const rec_t *first_rec;
  page_cur_t pcur;

  /* Read the first user record in the page. */
  block = btr_cur_get_block(cursor);
  page_cur_set_before_first(block, &pcur);
  page_cur_move_to_next(&pcur);
  first_rec = page_cur_get_rec(&pcur);

  *offsets = rec_get_offsets(first_rec, cursor->index, *offsets, n_uniq, heap);

  return (cmp_dtuple_rec(tuple, first_rec, cursor->index, *offsets) < 0);
}

/** Insert the tuple into the right sibling page, if the cursor is at the end
of a page.
@param[in]	flags	undo logging and locking flags
@param[in,out]	cursor	cursor at which to insert; when the function succeeds,
                        the cursor is positioned before the insert point.
@param[out]	offsets	offsets on inserted record
@param[in,out]	heap	memory heap for allocating offsets
@param[in]	tuple	tuple to insert
@param[in,out]	mtr	mini-transaction
@return	inserted record (first record on the right sibling page);
        the cursor will be positioned on the page infimum
@retval	NULL if the operation was not performed */
static rec_t *btr_insert_into_right_sibling(uint32_t flags, btr_cur_t *cursor,
                                            ulint **offsets, mem_heap_t *heap,
                                            const dtuple_t *tuple, mtr_t *mtr) {
  buf_block_t *block = btr_cur_get_block(cursor);
  page_t *page = buf_block_get_frame(block);
  page_no_t next_page_no = btr_page_get_next(page, mtr);

  ut_ad(cursor->index->table->is_intrinsic() ||
        mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
  ut_ad(
      mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
  ut_ad(heap);

  if (next_page_no == FIL_NULL ||
      !page_rec_is_supremum(page_rec_get_next(btr_cur_get_rec(cursor)))) {
    return (nullptr);
  }

  page_cur_t next_page_cursor;
  buf_block_t *next_block;
  page_t *next_page;
  btr_cur_t next_father_cursor;
  rec_t *rec = nullptr;
  ulint max_size;

  const space_id_t space = block->page.id.space();

  next_block = btr_block_get(page_id_t(space, next_page_no), block->page.size,
                             RW_X_LATCH, cursor->index, mtr);
  next_page = buf_block_get_frame(next_block);

  bool is_leaf = page_is_leaf(next_page);

  btr_page_get_father(cursor->index, next_block, mtr, &next_father_cursor);

  page_cur_search(next_block, cursor->index, tuple, PAGE_CUR_LE,
                  &next_page_cursor);

  max_size = page_get_max_insert_size_after_reorganize(next_page, 1);

  /* Extend the gap locks to the next page */
  if (!dict_table_is_locking_disabled(cursor->index->table)) {
    lock_update_split_left(next_block, block);
  }

  rec = page_cur_tuple_insert(&next_page_cursor, tuple, cursor->index, offsets,
                              &heap, mtr);

  if (rec == nullptr) {
    if (is_leaf && next_block->page.size.is_compressed() &&
        !cursor->index->is_clustered() &&
        !cursor->index->table->is_temporary()) {
      /* Reset the IBUF_BITMAP_FREE bits, because
      page_cur_tuple_insert() will have attempted page
      reorganize before failing. */
      ibuf_reset_free_bits(next_block);
    }
    return (nullptr);
  }

  ibool compressed;
  dberr_t err;
  ulint level = btr_page_get_level(next_page, mtr);

  /* Adjust the cursor position. */
  *btr_cur_get_page_cur(cursor) = next_page_cursor;

  ut_ad(btr_cur_get_rec(cursor) == page_get_infimum_rec(next_page));
  ut_ad(page_rec_get_next(page_get_infimum_rec(next_page)) == rec);

  /* We have to change the parent node pointer */

  compressed = btr_cur_pessimistic_delete(&err, TRUE, &next_father_cursor,
                                          BTR_CREATE_FLAG, false, 0, 0, 0, mtr);

  ut_a(err == DB_SUCCESS);

  if (!compressed) {
    btr_cur_compress_if_useful(&next_father_cursor, FALSE, mtr);
  }

  dtuple_t *node_ptr = dict_index_build_node_ptr(
      cursor->index, rec, next_block->page.id.page_no(), heap, level);

  btr_insert_on_non_leaf_level(flags, cursor->index, level + 1, node_ptr, mtr);

  ut_ad(rec_offs_validate(rec, cursor->index, *offsets));

  if (is_leaf && !cursor->index->is_clustered() &&
      !cursor->index->table->is_temporary()) {
    /* Update the free bits of the B-tree page in the
    insert buffer bitmap. */

    if (next_block->page.size.is_compressed()) {
      ibuf_update_free_bits_zip(next_block, mtr);
    } else {
      ibuf_update_free_bits_if_full(
          next_block, max_size, rec_offs_size(*offsets) + PAGE_DIR_SLOT_SIZE);
    }
  }

  return (rec);
}

/** Splits an index page in half and inserts the tuple. It is assumed
 that mtr holds an x-latch on the index tree. NOTE: the tree x-latch is
 released within this function! NOTE that the operation of this
 function must always succeed; we cannot reverse it. Therefore enough
 free disk space (2 pages) must be guaranteed to be available before
 this function is called.
 @return inserted record */
rec_t *btr_page_split_and_insert(
    uint32_t flags,        /*!< in: undo logging and locking flags */
    btr_cur_t *cursor,     /*!< in: cursor at which to insert; when the
                           function returns, the cursor is positioned
                           on the predecessor of the inserted record */
    ulint **offsets,       /*!< out: offsets on inserted record */
    mem_heap_t **heap,     /*!< in/out: pointer to memory heap, or NULL */
    const dtuple_t *tuple, /*!< in: tuple to insert */
    mtr_t *mtr)            /*!< in: mtr */
{
  buf_block_t *block;
  page_t *page;
  page_zip_des_t *page_zip;
  page_no_t page_no;
  byte direction;
  page_no_t hint_page_no;
  buf_block_t *new_block;
  page_t *new_page;
  page_zip_des_t *new_page_zip;
  rec_t *split_rec;
  buf_block_t *left_block;
  buf_block_t *right_block;
  buf_block_t *insert_block;
  page_cur_t *page_cursor;
  rec_t *first_rec;
  byte *buf = nullptr; /* remove warning */
  rec_t *move_limit;
  ibool insert_will_fit;
  ibool insert_left;
  ulint n_iterations = 0;
  rec_t *rec;
  ulint n_uniq;
  dict_index_t *index;

  index = btr_cur_get_index(cursor);

  if (dict_index_is_spatial(index)) {
    /* Split rtree page and update parent */
    return (
        rtr_page_split_and_insert(flags, cursor, offsets, heap, tuple, mtr));
  }

  if (!*heap) {
    *heap = mem_heap_create(1024);
  }
  n_uniq = dict_index_get_n_unique_in_tree(cursor->index);
func_start:
  ut_ad(tuple->m_heap != *heap);
  mem_heap_empty(*heap);
  *offsets = nullptr;

  ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        cursor->index->table->is_intrinsic());
  ut_ad(!dict_index_is_online_ddl(cursor->index) || (flags & BTR_CREATE_FLAG) ||
        cursor->index->is_clustered());
  ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
                            RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX) ||
        cursor->index->table->is_intrinsic());

  block = btr_cur_get_block(cursor);
  page = buf_block_get_frame(block);
  page_zip = buf_block_get_page_zip(block);

  ut_ad(
      mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, cursor->index->table));
  ut_ad(!page_is_empty(page));

  /* Try to insert into the right sibling page, if possible, before
  splitting. */
  rec =
      btr_insert_into_right_sibling(flags, cursor, offsets, *heap, tuple, mtr);

  if (rec != nullptr) {
    return (rec);
  }

  page_no = block->page.id.page_no();

  /* 1. Decide the split record; split_rec == NULL means that the
  tuple to be inserted should be the first record on the upper
  half-page */
  insert_left = FALSE;

  if (n_iterations > 0) {
    direction = FSP_UP;
    hint_page_no = page_no + 1;
    split_rec = btr_page_get_split_rec(cursor, tuple);

    if (split_rec == nullptr) {
      insert_left =
          btr_page_tuple_smaller(cursor, tuple, offsets, n_uniq, heap);
    }
  } else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
    direction = FSP_UP;
    hint_page_no = page_no + 1;

  } else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
    direction = FSP_DOWN;
    hint_page_no = page_no - 1;
    ut_ad(split_rec);
  } else {
    direction = FSP_UP;
    hint_page_no = page_no + 1;

    /* If there is only one record in the index page, we
    can't split the node in the middle by default. We need
    to determine whether the new record will be inserted
    to the left or right. */

    if (page_get_n_recs(page) > 1) {
      split_rec = page_get_middle_rec(page);
    } else if (btr_page_tuple_smaller(cursor, tuple, offsets, n_uniq, heap)) {
      split_rec = page_rec_get_next(page_get_infimum_rec(page));
    } else {
      split_rec = nullptr;
    }
  }
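
  /* In short: repeated splits (n_iterations > 0) split near the
  insert point with an upward hint; a detected rightward sequential
  pattern gives FSP_UP with hint page_no + 1; a leftward pattern
  gives FSP_DOWN with hint page_no - 1; otherwise we fall back to
  splitting at the middle record, upwards. */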

  /* 2. Allocate a new page to the index */
  new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
                             btr_page_get_level(page, mtr), mtr, mtr);

  new_page = buf_block_get_frame(new_block);
  new_page_zip = buf_block_get_page_zip(new_block);
  btr_page_create(new_block, new_page_zip, cursor->index,
                  btr_page_get_level(page, mtr), mtr);

  /* 3. Calculate the first record on the upper half-page, and the
  first record (move_limit) on the original page which ends up on the
  upper half */

  if (split_rec) {
    first_rec = move_limit = split_rec;

    *offsets =
        rec_get_offsets(split_rec, cursor->index, *offsets, n_uniq, heap);

    insert_left = cmp_dtuple_rec(tuple, split_rec, cursor->index, *offsets) < 0;

    if (!insert_left && new_page_zip && n_iterations > 0) {
      /* If a compressed page has already been split,
      avoid further splits by inserting the record
      into an empty page. */
      split_rec = nullptr;
      goto insert_empty;
    }
  } else if (insert_left) {
    ut_a(n_iterations > 0);
    first_rec = page_rec_get_next(page_get_infimum_rec(page));
    move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
  } else {
  insert_empty:
    ut_ad(!split_rec);
    ut_ad(!insert_left);
    buf =
        UT_NEW_ARRAY_NOKEY(byte, rec_get_converted_size(cursor->index, tuple));

    first_rec = rec_convert_dtuple_to_rec(buf, cursor->index, tuple);
    move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
  }

  /* 4. Do first the modifications in the tree structure */

  btr_attach_half_pages(flags, cursor->index, block, first_rec, new_block,
                        direction, mtr);

  /* If the split is made on the leaf level and the insert will fit
  on the appropriate half-page, we may release the tree x-latch.
  We can then move the records after releasing the tree latch,
  thus reducing the tree latch contention. */

  if (split_rec) {
    insert_will_fit =
        !new_page_zip &&
        btr_page_insert_fits(cursor, split_rec, offsets, tuple, heap);
  } else {
    if (!insert_left) {
      UT_DELETE_ARRAY(buf);
      buf = nullptr;
    }

    insert_will_fit =
        !new_page_zip &&
        btr_page_insert_fits(cursor, nullptr, offsets, tuple, heap);
  }

  if (!srv_read_only_mode && !cursor->index->table->is_intrinsic() &&
      insert_will_fit && page_is_leaf(page) &&
      !dict_index_is_online_ddl(cursor->index)) {
    mtr->memo_release(dict_index_get_lock(cursor->index),
                      MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK);

    /* NOTE: We cannot release the root block latch here, because the
    root page contains the segment headers and has in most cases
    already been modified. */
  }

  /* 5. Then move the records to the new page */
  if (direction == FSP_DOWN) {
    /*		fputs("Split left\n", stderr); */

    if (false
#ifdef UNIV_ZIP_COPY
        || page_zip
#endif /* UNIV_ZIP_COPY */
        || !page_move_rec_list_start(new_block, block, move_limit,
                                     cursor->index, mtr)) {
      /* For some reason, compressing new_page failed,
      even though it should contain fewer records than
      the original page.  Copy the page byte for byte
      and then delete the records from both pages
      as appropriate.  Deleting will always succeed. */
      ut_a(new_page_zip);

      page_zip_copy_recs(new_page_zip, new_page, page_zip, page, cursor->index,
                         mtr);
      page_delete_rec_list_end(move_limit - page + new_page, new_block,
                               cursor->index, ULINT_UNDEFINED, ULINT_UNDEFINED,
                               mtr);

      /* Update the lock table and possible hash index. */

      if (!dict_table_is_locking_disabled(cursor->index->table)) {
        lock_move_rec_list_start(new_block, block, move_limit,
                                 new_page + PAGE_NEW_INFIMUM);
      }

      btr_search_move_or_delete_hash_entries(new_block, block, cursor->index);

      /* Delete the records from the source page. */

      page_delete_rec_list_start(move_limit, block, cursor->index, mtr);
    }

    left_block = new_block;
    right_block = block;

    if (!dict_table_is_locking_disabled(cursor->index->table)) {
      lock_update_split_left(right_block, left_block);
    }
  } else {
    /*		fputs("Split right\n", stderr); */

    if (false
#ifdef UNIV_ZIP_COPY
        || page_zip
#endif /* UNIV_ZIP_COPY */
        || !page_move_rec_list_end(new_block, block, move_limit, cursor->index,
                                   mtr)) {
      /* For some reason, compressing new_page failed,
      even though it should contain fewer records than
      the original page.  Copy the page byte for byte
      and then delete the records from both pages
      as appropriate.  Deleting will always succeed. */
      ut_a(new_page_zip);

      page_zip_copy_recs(new_page_zip, new_page, page_zip, page, cursor->index,
                         mtr);
      page_delete_rec_list_start(move_limit - page + new_page, new_block,
                                 cursor->index, mtr);

      /* Update the lock table and possible hash index. */
      if (!dict_table_is_locking_disabled(cursor->index->table)) {
        lock_move_rec_list_end(new_block, block, move_limit);
      }

      ut_ad(!dict_index_is_spatial(index));

      btr_search_move_or_delete_hash_entries(new_block, block, cursor->index);

      /* Delete the records from the source page. */

      page_delete_rec_list_end(move_limit, block, cursor->index,
                               ULINT_UNDEFINED, ULINT_UNDEFINED, mtr);
    }

    left_block = block;
    right_block = new_block;

    if (!dict_table_is_locking_disabled(cursor->index->table)) {
      lock_update_split_right(right_block, left_block);
    }
  }

#ifdef UNIV_ZIP_DEBUG
  if (page_zip) {
    ut_a(page_zip_validate(page_zip, page, cursor->index));
    ut_a(page_zip_validate(new_page_zip, new_page, cursor->index));
  }
#endif /* UNIV_ZIP_DEBUG */

  /* At this point, split_rec, move_limit and first_rec may point
  to garbage on the old page. */

  /* 6. The split and the tree modification are now completed. Decide
  the page where the tuple should be inserted */

  if (insert_left) {
    insert_block = left_block;
  } else {
    insert_block = right_block;
  }

  /* 7. Reposition the cursor for insert and try insertion */
  page_cursor = btr_cur_get_page_cur(cursor);

  page_cur_search(insert_block, cursor->index, tuple, page_cursor);

  rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, offsets, heap,
                              mtr);

#ifdef UNIV_ZIP_DEBUG
  {
    page_t *insert_page = buf_block_get_frame(insert_block);

    page_zip_des_t *insert_page_zip = buf_block_get_page_zip(insert_block);

    ut_a(!insert_page_zip ||
         page_zip_validate(insert_page_zip, insert_page, cursor->index));
  }
#endif /* UNIV_ZIP_DEBUG */

  if (rec != nullptr) {
    goto func_exit;
  }

  /* 8. If the insert did not fit, try page reorganization.
  For compressed pages, page_cur_tuple_insert() will have
  attempted this already. */

  if (page_cur_get_page_zip(page_cursor) ||
      !btr_page_reorganize(page_cursor, cursor->index, mtr)) {
    goto insert_failed;
  }

  rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index, offsets, heap,
                              mtr);

  if (rec == nullptr) {
    /* The insert did not fit on the page: loop back to the
    start of the function for a new split */
  insert_failed:
    /* We play safe and reset the free bits for new_page */
    if (!cursor->index->is_clustered() &&
        !cursor->index->table->is_temporary()) {
      ibuf_reset_free_bits(new_block);
      ibuf_reset_free_bits(block);
    }

    n_iterations++;
    ut_ad(n_iterations < 2 || buf_block_get_page_zip(insert_block));
    ut_ad(!insert_will_fit);

    goto func_start;
  }

func_exit:
  /* The insert fit on the page: update the free bits for the
  left and right pages in the same mtr */

  if (!cursor->index->is_clustered() && !cursor->index->table->is_temporary() &&
      page_is_leaf(page)) {
    ibuf_update_free_bits_for_two_pages_low(left_block, right_block, mtr);
  }

  MONITOR_INC(MONITOR_INDEX_SPLIT);

  ut_ad(page_validate(buf_block_get_frame(left_block), cursor->index));
  ut_ad(page_validate(buf_block_get_frame(right_block), cursor->index));

  ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
  return (rec);
}

/** Removes a page from the level list of pages.
@param[in]	space		space where removed
@param[in]	page_size	page size
@param[in,out]	page		page to remove
@param[in]	index		index tree
@param[in,out]	mtr		mini-transaction */
#define btr_level_list_remove(space, page_size, page, index, mtr) \
  btr_level_list_remove_func(space, page_size, page, index, mtr)

/** Removes a page from the level list of pages.
@param[in]	space		space where removed
@param[in]	page_size	page size
@param[in,out]	page		page to remove
@param[in]	index		index tree
@param[in,out]	mtr		mini-transaction */
static void btr_level_list_remove_func(space_id_t space,
                                       const page_size_t &page_size,
                                       page_t *page, const dict_index_t *index,
                                       mtr_t *mtr) {
  ut_ad(page != nullptr);
  ut_ad(mtr != nullptr);
  ut_ad(mtr_is_page_fix(mtr, page, MTR_MEMO_PAGE_X_FIX, index->table));
  ut_ad(space == page_get_space_id(page));
  /* Get the previous and next page numbers of the page */

  const page_no_t prev_page_no = btr_page_get_prev(page, mtr);
  const page_no_t next_page_no = btr_page_get_next(page, mtr);

  /* Update page links of the level */

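  /* This is the usual doubly-linked-list unlink, at the page level:

       before:   prev <-> page <-> next
       after:    prev <-> next

  where either neighbour may be absent (FIL_NULL). */
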
  if (prev_page_no != FIL_NULL) {
    buf_block_t *prev_block = btr_block_get(page_id_t(space, prev_page_no),
                                            page_size, RW_X_LATCH, index, mtr);

    page_t *prev_page = buf_block_get_frame(prev_block);
#ifdef UNIV_BTR_DEBUG
    ut_a(page_is_comp(prev_page) == page_is_comp(page));
    ut_a(btr_page_get_next(prev_page, mtr) == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

    btr_page_set_next(prev_page, buf_block_get_page_zip(prev_block),
                      next_page_no, mtr);
  }

  if (next_page_no != FIL_NULL) {
    buf_block_t *next_block = btr_block_get(page_id_t(space, next_page_no),
                                            page_size, RW_X_LATCH, index, mtr);

    page_t *next_page = buf_block_get_frame(next_block);
#ifdef UNIV_BTR_DEBUG
    ut_a(page_is_comp(next_page) == page_is_comp(page));
    ut_a(btr_page_get_prev(next_page, mtr) == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

    btr_page_set_prev(next_page, buf_block_get_page_zip(next_block),
                      prev_page_no, mtr);
  }
}

/** Writes the redo log record for setting an index record as the predefined
 minimum record. */
UNIV_INLINE
void btr_set_min_rec_mark_log(rec_t *rec,     /*!< in: record */
                              mlog_id_t type, /*!< in: MLOG_COMP_REC_MIN_MARK or
                                              MLOG_REC_MIN_MARK */
                              mtr_t *mtr)     /*!< in: mtr */
{
  mlog_write_initial_log_record(rec, type, mtr);

  /* Write rec offset as a 2-byte ulint */
  mlog_catenate_ulint(mtr, page_offset(rec), MLOG_2BYTES);
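  /* The record body is thus just the 2-byte page offset of rec
  appended to the initial log record header written above;
  btr_parse_set_min_rec_mark() reads the same two bytes back. */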
}
#else /* !UNIV_HOTBACKUP */
#define btr_set_min_rec_mark_log(rec, comp, mtr) ((void)0)
#endif /* !UNIV_HOTBACKUP */

/** Parses the redo log record for setting an index record as the predefined
 minimum record.
 @return end of log record or NULL */
byte *btr_parse_set_min_rec_mark(
    byte *ptr,     /*!< in: buffer */
    byte *end_ptr, /*!< in: buffer end */
    ulint comp,    /*!< in: nonzero=compact page format */
    page_t *page,  /*!< in: page or NULL */
    mtr_t *mtr)    /*!< in: mtr or NULL */
{
  rec_t *rec;

  if (end_ptr < ptr + 2) {
    return (nullptr);
  }

  if (page) {
    ut_a(!page_is_comp(page) == !comp);

    rec = page + mach_read_from_2(ptr);

    btr_set_min_rec_mark(rec, mtr);
  }

  return (ptr + 2);
}

/** Sets a record as the predefined minimum record. */
void btr_set_min_rec_mark(rec_t *rec, /*!< in: record */
                          mtr_t *mtr) /*!< in: mtr */
{
  ulint info_bits;

  if (page_rec_is_comp(rec)) {
    info_bits = rec_get_info_bits(rec, TRUE);

    rec_set_info_bits_new(rec, info_bits | REC_INFO_MIN_REC_FLAG);

    btr_set_min_rec_mark_log(rec, MLOG_COMP_REC_MIN_MARK, mtr);
  } else {
    info_bits = rec_get_info_bits(rec, FALSE);

    rec_set_info_bits_old(rec, info_bits | REC_INFO_MIN_REC_FLAG);

    btr_set_min_rec_mark_log(rec, MLOG_REC_MIN_MARK, mtr);
  }
}

2796 #ifndef UNIV_HOTBACKUP
2797 /** Deletes on the upper level the node pointer to a page. */
btr_node_ptr_delete(dict_index_t * index,buf_block_t * block,mtr_t * mtr)2798 void btr_node_ptr_delete(
2799     dict_index_t *index, /*!< in: index tree */
2800     buf_block_t *block,  /*!< in: page whose node pointer is deleted */
2801     mtr_t *mtr)          /*!< in: mtr */
2802 {
2803   btr_cur_t cursor;
2804   ibool compressed;
2805   dberr_t err;
2806 
2807   ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
2808 
2809   /* Delete node pointer on father page */
2810   btr_page_get_father(index, block, mtr, &cursor);
2811 
2812   compressed = btr_cur_pessimistic_delete(&err, TRUE, &cursor, BTR_CREATE_FLAG,
2813                                           false, 0, 0, 0, mtr);
2814   ut_a(err == DB_SUCCESS);
2815 
2816   if (!compressed) {
2817     btr_cur_compress_if_useful(&cursor, FALSE, mtr);
2818   }
2819 }
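
/* Call-pattern sketch (illustration only): this is how an emptied or merged
page is typically unlinked before the page itself is freed; the caller must
hold the X-latch that the assertion above requires:

  btr_node_ptr_delete(index, block, mtr); // remove the node ptr on the father
  ...                                     // unlink from the level list, etc.
  btr_page_free(index, block, mtr);       // then free the file page

btr_compress() and btr_discard_page() below follow this pattern. */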

/** If the page is the only one on its level, this function moves its records
 to the father page, thus reducing the tree height.
 @return father block */
static buf_block_t *btr_lift_page_up(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: page which is the only one on its level;
                         must not be empty: use
                         btr_discard_only_page_on_level if the last
                         record from the page should be removed */
    mtr_t *mtr)          /*!< in: mtr */
{
  buf_block_t *father_block;
  page_t *father_page;
  ulint page_level;
  page_zip_des_t *father_page_zip;
  page_t *page = buf_block_get_frame(block);
  page_no_t root_page_no;
  buf_block_t *blocks[BTR_MAX_LEVELS];
  ulint n_blocks; /*!< last used index in blocks[] */
  ulint i;
  bool lift_father_up;
  buf_block_t *block_orig = block;

  ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
  ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  page_level = btr_page_get_level(page, mtr);
  root_page_no = dict_index_get_page(index);

  {
    btr_cur_t cursor;
    ulint *offsets = nullptr;
    mem_heap_t *heap = mem_heap_create(
        sizeof(*offsets) * (REC_OFFS_HEADER_SIZE + 1 + 1 + index->n_fields));
    buf_block_t *b;

    if (dict_index_is_spatial(index)) {
      offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
                                          nullptr, &cursor);
    } else {
      offsets =
          btr_page_get_father_block(offsets, heap, index, block, mtr, &cursor);
    }
    father_block = btr_cur_get_block(&cursor);
    father_page_zip = buf_block_get_page_zip(father_block);
    father_page = buf_block_get_frame(father_block);

    n_blocks = 0;

    /* Store all ancestor pages so that we can reset their
    levels later on.  We have to do all the searches on
    the tree now, because later on, after we have replaced
    the first level, the tree is in an inconsistent state
    and cannot be searched. */
    for (b = father_block; b->page.id.page_no() != root_page_no;) {
      ut_a(n_blocks < BTR_MAX_LEVELS);

      if (dict_index_is_spatial(index)) {
        offsets = rtr_page_get_father_block(nullptr, heap, index, b, mtr,
                                            nullptr, &cursor);
      } else {
        offsets =
            btr_page_get_father_block(offsets, heap, index, b, mtr, &cursor);
      }

      blocks[n_blocks++] = b = btr_cur_get_block(&cursor);
    }
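
    /* Bookkeeping illustration (comment only): blocks[] now holds the
    ancestors of father_block, from its own father up to and including
    the root.  E.g. for a tree of height 4 with 'block' on level 0:

      father_block -> level 1 page
      blocks[0]    -> level 2 page
      blocks[1]    -> level 3 page (the root), so n_blocks == 2 */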

    lift_father_up = (n_blocks && page_level == 0);
    if (lift_father_up) {
      /* The father page should also be the only one on its level (but
      not the root): lift the father page up first, because a leaf page
      may be lifted directly only into the root.  Freeing a page chooses
      the file segment based on page_level (== 0 or != 0); if page_level
      were changed from != 0 to == 0 here, the later freeing of the page
      would not find the allocation of the page in order to free it. */

      block = father_block;
      page = buf_block_get_frame(block);
      page_level = btr_page_get_level(page, mtr);

      ut_ad(btr_page_get_prev(page, mtr) == FIL_NULL);
      ut_ad(btr_page_get_next(page, mtr) == FIL_NULL);
      ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

      father_block = blocks[0];
      father_page_zip = buf_block_get_page_zip(father_block);
      father_page = buf_block_get_frame(father_block);
    }

    mem_heap_free(heap);
  }

  btr_search_drop_page_hash_index(block);

  /* Make the father empty */
  btr_page_empty(father_block, father_page_zip, index, page_level, mtr);
  page_level++;

  /* Copy the records to the father page one by one. */
  if (false
#ifdef UNIV_ZIP_COPY
      || father_page_zip
#endif /* UNIV_ZIP_COPY */
      || !page_copy_rec_list_end(father_block, block,
                                 page_get_infimum_rec(page), index, mtr)) {
    const page_zip_des_t *page_zip = buf_block_get_page_zip(block);
    ut_a(father_page_zip);
    ut_a(page_zip);

    /* Copy the page byte for byte. */
    page_zip_copy_recs(father_page_zip, father_page, page_zip, page, index,
                       mtr);

    /* Update the lock table and possible hash index. */

    if (!dict_table_is_locking_disabled(index->table)) {
      lock_move_rec_list_end(father_block, block, page_get_infimum_rec(page));
    }

    /* Also update the predicate locks */
    if (dict_index_is_spatial(index)) {
      lock_prdt_rec_move(father_block, block);
    }

    btr_search_move_or_delete_hash_entries(father_block, block, index);
  }

  if (!dict_table_is_locking_disabled(index->table)) {
    /* Free predicate page locks on the block */
    if (dict_index_is_spatial(index)) {
      locksys::Shard_latch_guard guard{block->get_page_id()};
      lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
    }
    lock_update_copy_and_discard(father_block, block);
  }

  /* Go upward to root page, decrementing levels by one. */
  for (i = lift_father_up ? 1 : 0; i < n_blocks; i++, page_level++) {
    page_t *page = buf_block_get_frame(blocks[i]);
    page_zip_des_t *page_zip = buf_block_get_page_zip(blocks[i]);

    ut_ad(btr_page_get_level(page, mtr) == page_level + 1);

    btr_page_set_level(page, page_zip, page_level, mtr);
#ifdef UNIV_ZIP_DEBUG
    ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
  }

  if (dict_index_is_spatial(index)) {
    rtr_check_discard_page(index, nullptr, block);
  }

  /* Free the file page */
  btr_page_free(index, block, mtr);

  /* We play it safe and reset the free bits for the father */
  if (!index->is_clustered() && !index->table->is_temporary()) {
    ibuf_reset_free_bits(father_block);
  }
  ut_ad(page_validate(father_page, index));
  ut_ad(btr_check_node_ptr(index, father_block, mtr));

  return (lift_father_up ? block_orig : father_block);
}

/** Tries to merge the page first with the immediate left brother, if such a
 brother exists and the node pointers to the current page and to the brother
 reside on the same page. If the left brother does not satisfy these
 conditions, looks at the right brother. If the page is the only one on that
 level, lifts the records of the page to the father page, thus reducing the
 tree height. It is assumed that mtr holds an x-latch on the tree and on the
 page. If the cursor is on the leaf level, mtr must also hold x-latches on
 the brothers, if they exist.
 @return true on success */
ibool btr_compress(
    btr_cur_t *cursor, /*!< in/out: cursor on the page to merge
                       or lift; the page must not be empty:
                       when deleting records, use btr_discard_page()
                       if the page would become empty */
    ibool adjust,      /*!< in: TRUE if should adjust the
                       cursor position even if compression occurs */
    mtr_t *mtr)        /*!< in/out: mini-transaction */
{
  dict_index_t *index;
  space_id_t space;
  page_no_t left_page_no;
  page_no_t right_page_no;
  buf_block_t *merge_block;
  page_t *merge_page = nullptr;
  page_zip_des_t *merge_page_zip;
  ibool is_left;
  buf_block_t *block;
  page_t *page;
  btr_cur_t father_cursor;
  mem_heap_t *heap;
  ulint *offsets;
  ulint nth_rec = 0; /* remove bogus warning */
  bool mbr_changed = false;
#ifdef UNIV_DEBUG
  bool leftmost_child;
#endif
  DBUG_TRACE;

  block = btr_cur_get_block(cursor);
  page = btr_cur_get_page(cursor);
  index = btr_cur_get_index(cursor);

  btr_assert_not_corrupted(block, index);

#ifdef UNIV_DEBUG
  if (dict_index_is_spatial(index)) {
    ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                    MTR_MEMO_X_LOCK));
  } else {
    ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                    MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
          index->table->is_intrinsic());
  }
#endif /* UNIV_DEBUG */

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
  space = dict_index_get_space(index);

  const page_size_t page_size(dict_table_page_size(index->table));

  MONITOR_INC(MONITOR_INDEX_MERGE_ATTEMPTS);

  left_page_no = btr_page_get_prev(page, mtr);
  right_page_no = btr_page_get_next(page, mtr);

#ifdef UNIV_DEBUG
  if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
    ut_a(REC_INFO_MIN_REC_FLAG &
         rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(page)),
                           page_is_comp(page)));
  }
#endif /* UNIV_DEBUG */

  heap = mem_heap_create(100);

  if (dict_index_is_spatial(index)) {
    offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
                                        cursor, &father_cursor);
    ut_ad(cursor->page_cur.block->page.id.page_no() ==
          block->page.id.page_no());
    rec_t *my_rec = father_cursor.page_cur.rec;

    page_no_t page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);

    if (page_no != block->page.id.page_no()) {
      ib::info(ER_IB_MSG_32) << "father positioned on page " << page_no
                             << " instead of " << block->page.id.page_no();
      offsets = btr_page_get_father_block(nullptr, heap, index, block, mtr,
                                          &father_cursor);
    }
  } else {
    offsets = btr_page_get_father_block(nullptr, heap, index, block, mtr,
                                        &father_cursor);
  }

  if (adjust) {
    nth_rec = page_rec_get_n_recs_before(btr_cur_get_rec(cursor));
    ut_ad(nth_rec > 0);
  }

  if (left_page_no == FIL_NULL && right_page_no == FIL_NULL) {
    /* The page is the only one on the level, lift the records
    to the father */

    merge_block = btr_lift_page_up(index, block, mtr);
    goto func_exit;
  }

  ut_d(leftmost_child =
           left_page_no != FIL_NULL &&
           (page_rec_get_next(page_get_infimum_rec(btr_cur_get_page(
                &father_cursor))) == btr_cur_get_rec(&father_cursor)));

  /* Decide the page into which we try to merge; that page will inherit
  the locks */

  is_left = btr_can_merge_with_page(cursor, left_page_no, &merge_block, mtr);
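
  /* In outline (an illustration only; btr_can_merge_with_page() holds the
  exact criteria, and as used here it also returns false when the tried
  sibling does not exist, i.e. its page number is FIL_NULL):

    if   left sibling exists and both pages fit on one page  -> merge left
    elif right sibling exists and both pages fit on one page -> merge right
    else                                                     -> err_exit  */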
  DBUG_EXECUTE_IF("ib_always_merge_right", is_left = FALSE;);
retry:
  if (!is_left &&
      !btr_can_merge_with_page(cursor, right_page_no, &merge_block, mtr)) {
    if (!merge_block) {
      merge_page = nullptr;
    }
    goto err_exit;
  }

  merge_page = buf_block_get_frame(merge_block);

#ifdef UNIV_BTR_DEBUG
  if (is_left) {
    ut_a(btr_page_get_next(merge_page, mtr) == block->page.id.page_no());
  } else {
    ut_a(btr_page_get_prev(merge_page, mtr) == block->page.id.page_no());
  }
#endif /* UNIV_BTR_DEBUG */

#ifdef UNIV_GIS_DEBUG
  if (dict_index_is_spatial(index)) {
    if (is_left) {
      fprintf(stderr, "GIS_DIAG: merge left  %ld to %ld \n",
              (long)block->page.id.page_no(), left_page_no);
    } else {
      fprintf(stderr, "GIS_DIAG: merge right %ld to %ld\n",
              (long)block->page.id.page_no(), right_page_no);
    }
  }
#endif /* UNIV_GIS_DEBUG */

  ut_ad(page_validate(merge_page, index));

  merge_page_zip = buf_block_get_page_zip(merge_block);
#ifdef UNIV_ZIP_DEBUG
  if (merge_page_zip) {
    const page_zip_des_t *page_zip = buf_block_get_page_zip(block);
    ut_a(page_zip);
    ut_a(page_zip_validate(merge_page_zip, merge_page, index));
    ut_a(page_zip_validate(page_zip, page, index));
  }
#endif /* UNIV_ZIP_DEBUG */

  /* Move records to the merge page */
  if (is_left) {
    btr_cur_t cursor2;
    rtr_mbr_t new_mbr;
    ulint *offsets2 = nullptr;

    /* For an R-tree, we need to update the father's MBR. */
    if (dict_index_is_spatial(index)) {
      /* We only support merging pages that have the same parent
      page */
      if (!rtr_check_same_block(index, &cursor2,
                                btr_cur_get_block(&father_cursor), merge_block,
                                heap)) {
        is_left = false;
        goto retry;
      }

      /* Set rtr_info for cursor2, since it is
      necessary in recursive page merge. */
      cursor2.rtr_info = cursor->rtr_info;
      cursor2.tree_height = cursor->tree_height;

      offsets2 = rec_get_offsets(btr_cur_get_rec(&cursor2), index, nullptr,
                                 ULINT_UNDEFINED, &heap);

      /* Check if parent entry needs to be updated */
      mbr_changed =
          rtr_merge_mbr_changed(&cursor2, &father_cursor, offsets2, offsets,
                                &new_mbr, merge_block, block, index);
    }

    rec_t *orig_pred = page_copy_rec_list_start(
        merge_block, block, page_get_supremum_rec(page), index, mtr);

    if (!orig_pred) {
      goto err_exit;
    }

    btr_search_drop_page_hash_index(block);

    /* Remove the page from the level list */
    btr_level_list_remove(space, page_size, page, index, mtr);

    if (dict_index_is_spatial(index)) {
      rec_t *my_rec = father_cursor.page_cur.rec;

      page_no_t page_no = btr_node_ptr_get_child_page_no(my_rec, offsets);

      if (page_no != block->page.id.page_no()) {
        ib::fatal(ER_IB_MSG_33) << "father positioned on " << page_no
                                << " instead of " << block->page.id.page_no();

        ut_ad(0);
      }

      if (mbr_changed) {
#ifdef UNIV_DEBUG
        bool success = rtr_update_mbr_field(&cursor2, offsets2, &father_cursor,
                                            merge_page, &new_mbr, nullptr, mtr);

        ut_ad(success);
#else
        rtr_update_mbr_field(&cursor2, offsets2, &father_cursor, merge_page,
                             &new_mbr, NULL, mtr);
#endif
      } else {
        rtr_node_ptr_delete(index, &father_cursor, block, mtr);
      }

      /* There is no GAP lock to worry about here */
      locksys::Shard_latch_guard guard{block->get_page_id()};
      lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
      lock_rec_free_all_from_discard_page(block);
    } else {
      btr_node_ptr_delete(index, block, mtr);
      if (!dict_table_is_locking_disabled(index->table)) {
        lock_update_merge_left(merge_block, orig_pred, block);
      }
    }

    if (adjust) {
      nth_rec += page_rec_get_n_recs_before(orig_pred);
    }
  } else {
    rec_t *orig_succ;
    ibool compressed;
    dberr_t err;
    btr_cur_t cursor2; /* father cursor pointing to the node ptr
                       of the right sibling */
#ifdef UNIV_BTR_DEBUG
    byte fil_page_prev[4];
#endif /* UNIV_BTR_DEBUG */

    if (dict_index_is_spatial(index)) {
      cursor2.rtr_info = nullptr;

      /* For a spatial index, we disallow the merge of blocks
      with different parents, since the merge would need
      to update the entry (MBR and primary key) in the
      parent of the block being merged */
      if (!rtr_check_same_block(index, &cursor2,
                                btr_cur_get_block(&father_cursor), merge_block,
                                heap)) {
        goto err_exit;
      }

      /* Set rtr_info for cursor2, since it is
      necessary in recursive page merge. */
      cursor2.rtr_info = cursor->rtr_info;
      cursor2.tree_height = cursor->tree_height;
    } else {
      btr_page_get_father(index, merge_block, mtr, &cursor2);
    }

    if (merge_page_zip && left_page_no == FIL_NULL) {
      /* The function page_zip_compress(), which will be
      invoked by page_copy_rec_list_end() below,
      requires that FIL_PAGE_PREV be FIL_NULL.
      Clear the field, but prepare to restore it. */
#ifdef UNIV_BTR_DEBUG
      memcpy(fil_page_prev, merge_page + FIL_PAGE_PREV, 4);
#endif /* UNIV_BTR_DEBUG */
      static_assert(FIL_NULL == 0xffffffff, "FIL_NULL != 0xffffffff");
      memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
    }
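
    /* (Comment only: FIL_PAGE_PREV is a 4-byte field in the FIL header,
    and FIL_NULL == 0xffffffff as the static_assert above documents, so
    the memset of four 0xff bytes stores exactly FIL_NULL.) */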

    orig_succ = page_copy_rec_list_end(
        merge_block, block, page_get_infimum_rec(page), cursor->index, mtr);

    if (!orig_succ) {
      ut_a(merge_page_zip);
#ifdef UNIV_BTR_DEBUG
      if (left_page_no == FIL_NULL) {
        /* FIL_PAGE_PREV was restored from
        merge_page_zip. */
        ut_a(!memcmp(fil_page_prev, merge_page + FIL_PAGE_PREV, 4));
      }
#endif /* UNIV_BTR_DEBUG */
      goto err_exit;
    }

    btr_search_drop_page_hash_index(block);

#ifdef UNIV_BTR_DEBUG
    if (merge_page_zip && left_page_no == FIL_NULL) {
      /* Restore FIL_PAGE_PREV in order to avoid an assertion
      failure in btr_level_list_remove(), which will set
      the field again to FIL_NULL.  Even though this makes
      merge_page and merge_page_zip inconsistent for a
      split second, it is harmless, because the pages
      are X-latched. */
      memcpy(merge_page + FIL_PAGE_PREV, fil_page_prev, 4);
    }
#endif /* UNIV_BTR_DEBUG */

    /* Remove the page from the level list */
    btr_level_list_remove(space, page_size, page, index, mtr);

    ut_ad(btr_node_ptr_get_child_page_no(btr_cur_get_rec(&father_cursor),
                                         offsets) == block->page.id.page_no());

    /* Replace the address of the old child node (= page) with the
    address of the merge page to the right */
    btr_node_ptr_set_child_page_no(btr_cur_get_rec(&father_cursor),
                                   btr_cur_get_page_zip(&father_cursor),
                                   offsets, right_page_no, mtr);

#ifdef UNIV_DEBUG
    if (!page_is_leaf(page) && left_page_no == FIL_NULL) {
      ut_ad(REC_INFO_MIN_REC_FLAG &
            rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(
                                  buf_block_get_frame(merge_block))),
                              page_is_comp(page)));
    }
#endif /* UNIV_DEBUG */

    /* For an R-tree, we need to update the father's MBR. */
    if (dict_index_is_spatial(index)) {
      ulint *offsets2;
      ulint rec_info;

      offsets2 = rec_get_offsets(btr_cur_get_rec(&cursor2), index, nullptr,
                                 ULINT_UNDEFINED, &heap);

      ut_ad(btr_node_ptr_get_child_page_no(btr_cur_get_rec(&cursor2),
                                           offsets2) == right_page_no);

      rec_info = rec_get_info_bits(btr_cur_get_rec(&father_cursor),
                                   rec_offs_comp(offsets));
      if (rec_info & REC_INFO_MIN_REC_FLAG) {
        /* When the father node ptr is the minimum record,
        we keep it and delete the node ptr of the
        merge page. */
        rtr_merge_and_update_mbr(&father_cursor, &cursor2, offsets, offsets2,
                                 merge_page, merge_block, block, index, mtr);
      } else {
        /* Otherwise, we keep the node ptr of the
        merge page and delete the father node ptr,
        in order to keep the record order on the
        upper level. */
        rtr_merge_and_update_mbr(&cursor2, &father_cursor, offsets2, offsets,
                                 merge_page, merge_block, block, index, mtr);
      }
      locksys::Shard_latch_guard guard{block->get_page_id()};
      lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
      lock_rec_free_all_from_discard_page(block);
    } else {
      compressed = btr_cur_pessimistic_delete(
          &err, TRUE, &cursor2, BTR_CREATE_FLAG, false, 0, 0, 0, mtr);
      ut_a(err == DB_SUCCESS);

      if (!compressed) {
        btr_cur_compress_if_useful(&cursor2, FALSE, mtr);
      }

      if (!dict_table_is_locking_disabled(index->table)) {
        lock_update_merge_right(merge_block, orig_succ, block);
      }
    }
  }

  if (!index->is_clustered() && !index->table->is_temporary() &&
      page_is_leaf(merge_page)) {
    /* Update the free bits of the B-tree page in the
    insert buffer bitmap.  This has to be done in a
    separate mini-transaction that is committed before the
    main mini-transaction.  We cannot update the insert
    buffer bitmap in this mini-transaction, because
    btr_compress() can be invoked recursively without
    committing the mini-transaction in between.  Since
    insert buffer bitmap pages have a lower rank than
    B-tree pages, we must not access other pages in the
    same mini-transaction after accessing an insert buffer
    bitmap page. */

    /* The free bits in the insert buffer bitmap must
    never exceed the free space on a page.  It is safe to
    decrement or reset the bits in the bitmap in a
    mini-transaction that is committed before the
    mini-transaction that affects the free space. */

    /* It is unsafe to increment the bits in a separately
    committed mini-transaction, because in crash recovery,
    the free bits could momentarily be set too high. */

    if (page_size.is_compressed()) {
      /* Because the free bits may be incremented
      and we cannot update the insert buffer bitmap
      in the same mini-transaction, the only safe
      thing we can do here is the pessimistic
      approach: reset the free bits. */
      ibuf_reset_free_bits(merge_block);
    } else {
      /* On uncompressed pages, the free bits will
      never increase here.  Thus, it is safe to
      write the bits accurately in a separate
      mini-transaction. */
      ibuf_update_free_bits_if_full(merge_block, UNIV_PAGE_SIZE,
                                    ULINT_UNDEFINED);
    }
  }
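
  /* Restating the invariant that the comments above rely on (comment only):
  the free bits recorded in the insert buffer bitmap must never exceed the
  actual free space on the page.  Resetting the bits (compressed case) can
  only lower the recorded value, which is why doing it in a separately
  committed mini-transaction stays safe even if that mini-transaction is
  replayed during crash recovery before the one that frees the space. */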

  ut_ad(page_validate(merge_page, index));
#ifdef UNIV_ZIP_DEBUG
  ut_a(!merge_page_zip || page_zip_validate(merge_page_zip, merge_page, index));
#endif /* UNIV_ZIP_DEBUG */

  if (dict_index_is_spatial(index)) {
#ifdef UNIV_GIS_DEBUG
    fprintf(stderr, "GIS_DIAG: compressed away  %ld\n",
            (long)block->page.id.page_no());
    fprintf(stderr, "GIS_DIAG: merged to %ld\n",
            (long)merge_block->page.id.page_no());
#endif

    rtr_check_discard_page(index, nullptr, block);
  }

  /* Free the file page */
  btr_page_free(index, block, mtr);

  /* btr_check_node_ptr() needs the parent block latched.
  If merge_block's parent is not the same block,
  we cannot use btr_check_node_ptr() here. */
  ut_ad(leftmost_child || btr_check_node_ptr(index, merge_block, mtr));
func_exit:
  mem_heap_free(heap);

  if (adjust) {
    ut_ad(nth_rec > 0);
    btr_cur_position(index, page_rec_get_nth(merge_block->frame, nth_rec),
                     merge_block, cursor);
  }

  MONITOR_INC(MONITOR_INDEX_MERGE_SUCCESSFUL);

  return TRUE;

err_exit:
  /* We play it safe and reset the free bits. */
  if (page_size.is_compressed() && merge_page && page_is_leaf(merge_page) &&
      !index->is_clustered()) {
    ibuf_reset_free_bits(merge_block);
  }

  mem_heap_free(heap);
  return FALSE;
}

/** Discards a page that is the only page on its level.  This will empty
 the whole B-tree, leaving just an empty root page.  This function
 should never be reached, because btr_compress(), which is invoked in
 delete operations, calls btr_lift_page_up() to flatten the B-tree. */
static void btr_discard_only_page_on_level(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: page which is the only one on its level */
    mtr_t *mtr)          /*!< in: mtr */
{
  ulint page_level = 0;
  trx_id_t max_trx_id;

  /* Save the PAGE_MAX_TRX_ID from the leaf page. */
  max_trx_id = page_get_max_trx_id(buf_block_get_frame(block));

  while (block->page.id.page_no() != dict_index_get_page(index)) {
    btr_cur_t cursor;
    buf_block_t *father;
    const page_t *page = buf_block_get_frame(block);

    ut_a(page_get_n_recs(page) == 1);
    ut_a(page_level == btr_page_get_level(page, mtr));
    ut_a(btr_page_get_prev(page, mtr) == FIL_NULL);
    ut_a(btr_page_get_next(page, mtr) == FIL_NULL);

    ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
    btr_search_drop_page_hash_index(block);

    if (dict_index_is_spatial(index)) {
      /* Check for any concurrent search having this page */
      rtr_check_discard_page(index, nullptr, block);
      rtr_page_get_father(index, block, mtr, nullptr, &cursor);
    } else {
      btr_page_get_father(index, block, mtr, &cursor);
    }
    father = btr_cur_get_block(&cursor);

    if (!dict_table_is_locking_disabled(index->table)) {
      lock_update_discard(father, PAGE_HEAP_NO_SUPREMUM, block);
    }

    /* Free the file page */
    btr_page_free(index, block, mtr);

    block = father;
    page_level++;
  }

  /* block is the root page, which must be empty, except
  for the node pointer to the (now discarded) block(s). */

#ifdef UNIV_BTR_DEBUG
  if (!dict_index_is_ibuf(index)) {
    const page_t *root = buf_block_get_frame(block);
    const space_id_t space_id = dict_index_get_space(index);
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_LEAF + root,
                                space_id));
    ut_a(btr_root_fseg_validate(FIL_PAGE_DATA + PAGE_BTR_SEG_TOP + root,
                                space_id));
  }
#endif /* UNIV_BTR_DEBUG */

  btr_page_empty(block, buf_block_get_page_zip(block), index, 0, mtr);
  ut_ad(page_is_leaf(buf_block_get_frame(block)));

  if (!index->is_clustered() && !index->table->is_temporary()) {
    /* We play it safe and reset the free bits for the root */
    ibuf_reset_free_bits(block);

    ut_a(max_trx_id);
    page_set_max_trx_id(block, buf_block_get_page_zip(block), max_trx_id, mtr);
  }
}
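
/* Illustration (comment only): for a tree of height 3 whose two lower
levels each consist of a single page,

  root (level 2)  -->  A (level 1)  -->  B (level 0, the leaf)

the loop above frees B, then A, and finally btr_page_empty() turns the
root back into an empty leaf-level page. */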

/** Discards a page from a B-tree. This is used to remove the last record from
 a B-tree page: the whole page must be removed at the same time. This cannot
 be used for the root page, which is allowed to be empty. */
void btr_discard_page(btr_cur_t *cursor, /*!< in: cursor on the page to discard:
                                         not on the root page */
                      mtr_t *mtr)        /*!< in: mtr */
{
  dict_index_t *index;
  page_no_t left_page_no;
  page_no_t right_page_no;
  buf_block_t *merge_block;
  page_t *merge_page;
  buf_block_t *block;
  page_t *page;
  rec_t *node_ptr;
#ifdef UNIV_DEBUG
  btr_cur_t parent_cursor;
  bool parent_is_different = false;
#endif

  block = btr_cur_get_block(cursor);
  index = btr_cur_get_index(cursor);

  ut_ad(dict_index_get_page(index) != block->page.id.page_no());

  ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic());

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  const space_id_t space = dict_index_get_space(index);

  MONITOR_INC(MONITOR_INDEX_DISCARD);

#ifdef UNIV_DEBUG
  if (dict_index_is_spatial(index)) {
    rtr_page_get_father(index, block, mtr, cursor, &parent_cursor);
  } else {
    btr_page_get_father(index, block, mtr, &parent_cursor);
  }
#endif

  /* Decide the page which will inherit the locks */

  left_page_no = btr_page_get_prev(buf_block_get_frame(block), mtr);
  right_page_no = btr_page_get_next(buf_block_get_frame(block), mtr);

  const page_size_t page_size(dict_table_page_size(index->table));

  if (left_page_no != FIL_NULL) {
    merge_block = btr_block_get(page_id_t(space, left_page_no), page_size,
                                RW_X_LATCH, index, mtr);

    merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
    ut_a(btr_page_get_next(merge_page, mtr) == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */
    ut_d(parent_is_different =
             (page_rec_get_next(page_get_infimum_rec(btr_cur_get_page(
                  &parent_cursor))) == btr_cur_get_rec(&parent_cursor)));
  } else if (right_page_no != FIL_NULL) {
    merge_block = btr_block_get(page_id_t(space, right_page_no), page_size,
                                RW_X_LATCH, index, mtr);

    merge_page = buf_block_get_frame(merge_block);
#ifdef UNIV_BTR_DEBUG
    ut_a(btr_page_get_prev(merge_page, mtr) == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */
    ut_d(parent_is_different = page_rec_is_supremum(
             page_rec_get_next(btr_cur_get_rec(&parent_cursor))));
  } else {
    btr_discard_only_page_on_level(index, block, mtr);

    return;
  }

  page = buf_block_get_frame(block);
  ut_a(page_is_comp(merge_page) == page_is_comp(page));
  btr_search_drop_page_hash_index(block);

  if (left_page_no == FIL_NULL && !page_is_leaf(page)) {
    /* We have to mark the leftmost node pointer on the right
    side page as the predefined minimum record */
    node_ptr = page_rec_get_next(page_get_infimum_rec(merge_page));

    ut_ad(page_rec_is_user_rec(node_ptr));

    /* This will make page_zip_validate() fail on merge_page
    until btr_level_list_remove() completes.  This is harmless,
    because everything will take place within a single
    mini-transaction and because writing to the redo log
    is an atomic operation (performed by mtr_commit()). */
    btr_set_min_rec_mark(node_ptr, mtr);
  }

  if (dict_index_is_spatial(index)) {
    btr_cur_t father_cursor;

    /* rtr_node_ptr_delete() does not itself look up the father
    node pointer, so we need to get the father node pointer
    first and then delete it. */
    rtr_page_get_father(index, block, mtr, cursor, &father_cursor);
    rtr_node_ptr_delete(index, &father_cursor, block, mtr);
  } else {
    btr_node_ptr_delete(index, block, mtr);
  }

  /* Remove the page from the level list */
  btr_level_list_remove(space, page_size, page, index, mtr);
#ifdef UNIV_ZIP_DEBUG
  {
    page_zip_des_t *merge_page_zip = buf_block_get_page_zip(merge_block);
    ut_a(!merge_page_zip ||
         page_zip_validate(merge_page_zip, merge_page, index));
  }
#endif /* UNIV_ZIP_DEBUG */

  if (!dict_table_is_locking_disabled(index->table)) {
    if (left_page_no != FIL_NULL) {
      lock_update_discard(merge_block, PAGE_HEAP_NO_SUPREMUM, block);
    } else {
      lock_update_discard(merge_block, lock_get_min_heap_no(merge_block),
                          block);
    }
  }

  if (dict_index_is_spatial(index)) {
    rtr_check_discard_page(index, cursor, block);
  }

  /* Free the file page */
  btr_page_free(index, block, mtr);

  /* btr_check_node_ptr() needs the parent block latched.
  If merge_block's parent is not the same block,
  we cannot use btr_check_node_ptr() here. */
  ut_ad(parent_is_different || btr_check_node_ptr(index, merge_block, mtr));
}

#ifdef UNIV_BTR_PRINT
/** Prints size info of a B-tree. */
void btr_print_size(dict_index_t *index) /*!< in: index tree */
{
  page_t *root;
  fseg_header_t *seg;
  mtr_t mtr;

  if (dict_index_is_ibuf(index)) {
    fputs(
        "Sorry, cannot print info of an ibuf tree:"
        " use ibuf functions\n",
        stderr);

    return;
  }

  mtr_start(&mtr);

  root = btr_root_get(index, &mtr);

  seg = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;

  fputs("INFO OF THE NON-LEAF PAGE SEGMENT\n", stderr);
  fseg_print(seg, &mtr);

  if (!dict_index_is_ibuf(index)) {
    seg = root + PAGE_HEADER + PAGE_BTR_SEG_LEAF;

    fputs("INFO OF THE LEAF PAGE SEGMENT\n", stderr);
    fseg_print(seg, &mtr);
  }

  mtr_commit(&mtr);
}

/** Prints recursively index tree pages. */
static void btr_print_recursive(
    dict_index_t *index, /*!< in: index tree */
    buf_block_t *block,  /*!< in: index page */
    ulint width,         /*!< in: print this many entries from start
                         and end */
    mem_heap_t **heap,   /*!< in/out: heap for rec_get_offsets() */
    ulint **offsets,     /*!< in/out: buffer for rec_get_offsets() */
    mtr_t *mtr)          /*!< in: mtr */
{
  const page_t *page = buf_block_get_frame(block);
  page_cur_t cursor;
  ulint n_recs;
  ulint i = 0;
  mtr_t mtr2;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_SX_FIX, index->table));

  ib::info(ER_IB_MSG_34) << "NODE ON LEVEL " << btr_page_get_level(page, mtr)
                         << " page " << block->page.id;

  page_print(block, index, width, width);

  n_recs = page_get_n_recs(page);

  page_cur_set_before_first(block, &cursor);
  page_cur_move_to_next(&cursor);

  while (!page_cur_is_after_last(&cursor)) {
    if (page_is_leaf(page)) {
      /* If this is the leaf level, do nothing */

    } else if ((i <= width) || (i >= n_recs - width)) {
      const rec_t *node_ptr;

      mtr_start(&mtr2);

      node_ptr = page_cur_get_rec(&cursor);

      *offsets =
          rec_get_offsets(node_ptr, index, *offsets, ULINT_UNDEFINED, heap);
      btr_print_recursive(
          index, btr_node_ptr_get_child(node_ptr, index, *offsets, &mtr2),
          width, heap, offsets, &mtr2);
      mtr_commit(&mtr2);
    }

    page_cur_move_to_next(&cursor);
    i++;
  }
}

/** Prints directories and other info of all nodes in the tree. */
void btr_print_index(dict_index_t *index, /*!< in: index */
                     ulint width) /*!< in: print this many entries from start
                                  and end */
{
  mtr_t mtr;
  buf_block_t *root;
  mem_heap_t *heap = NULL;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);

  fputs(
      "--------------------------\n"
      "INDEX TREE PRINT\n",
      stderr);

  mtr_start(&mtr);

  root = btr_root_block_get(index, RW_SX_LATCH, &mtr);

  btr_print_recursive(index, root, width, &heap, &offsets, &mtr);
  if (heap) {
    mem_heap_free(heap);
  }

  mtr_commit(&mtr);

  ut_ad(btr_validate_index(index, 0, false));
}
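
/* Debug-build usage sketch (UNIV_BTR_PRINT only; 'index' is assumed to be
a valid dict_index_t fetched from the data dictionary):

  btr_print_size(index);     // file segment info of the tree
  btr_print_index(index, 3); // print 3 entries from each end of every page

As the definitions above show, both functions start their own
mini-transactions, so the caller does not pass an mtr. */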
#endif /* UNIV_BTR_PRINT */

#ifdef UNIV_DEBUG
/** Checks that the node pointer to a page is appropriate.
 @return true */
ibool btr_check_node_ptr(dict_index_t *index, /*!< in: index tree */
                         buf_block_t *block,  /*!< in: index page */
                         mtr_t *mtr)          /*!< in: mtr */
{
  mem_heap_t *heap;
  dtuple_t *tuple;
  ulint *offsets;
  btr_cur_t cursor;
  page_t *page = buf_block_get_frame(block);

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  if (dict_index_get_page(index) == block->page.id.page_no()) {
    return (TRUE);
  }

  heap = mem_heap_create(256);

  if (dict_index_is_spatial(index)) {
    offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr,
                                        nullptr, &cursor);
  } else {
    offsets =
        btr_page_get_father_block(nullptr, heap, index, block, mtr, &cursor);
  }

  if (page_is_leaf(page)) {
    goto func_exit;
  }

  tuple = dict_index_build_node_ptr(
      index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
      btr_page_get_level(page, mtr));

  /* For a spatial index, the MBR in the parent record can differ
  from the MBR of the first record of the child; the expected
  relationship between them is "WITHIN". */
  if (dict_index_is_spatial(index)) {
    ut_a(!cmp_dtuple_rec_with_gis(tuple, btr_cur_get_rec(&cursor), offsets,
                                  PAGE_CUR_WITHIN, index->rtr_srs.get()));
  } else {
    ut_a(!cmp_dtuple_rec(tuple, btr_cur_get_rec(&cursor), index, offsets));
  }
func_exit:
  mem_heap_free(heap);

  return (TRUE);
}
#endif /* UNIV_DEBUG */

/** Display identification information for a record. */
static void btr_index_rec_validate_report(
    const page_t *page,        /*!< in: index page */
    const rec_t *rec,          /*!< in: index record */
    const dict_index_t *index) /*!< in: index */
{
  ib::info(ER_IB_MSG_35) << "Record in index " << index->name << " of table "
                         << index->table->name << ", page "
                         << page_id_t(page_get_space_id(page),
                                      page_get_page_no(page))
                         << ", at offset " << page_offset(rec);
}

/** Checks the size and number of fields in a record based on the definition of
 the index.
 @return true if ok */
ibool btr_index_rec_validate(const rec_t *rec,          /*!< in: index record */
                             const dict_index_t *index, /*!< in: index */
                             ibool dump_on_error) /*!< in: TRUE if the function
                                                  should print hex dump of
                                                  record and page on error */
{
  ulint len;
  ulint n;
  ulint i;
  const page_t *page;
  mem_heap_t *heap = nullptr;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  rec_offs_init(offsets_);

  page = page_align(rec);

  if (dict_index_is_ibuf(index)) {
    /* The insert buffer index tree can contain records from any
    other index: we cannot check the number of fields or
    their length */

    return (TRUE);
  }

#ifdef VIRTUAL_INDEX_DEBUG
  if (dict_index_has_virtual(index) || index->is_clustered()) {
    fprintf(stderr, "index name is %s\n", index->name());
  }
#endif
  if ((ibool) !!page_is_comp(page) != dict_table_is_comp(index->table)) {
    btr_index_rec_validate_report(page, rec, index);

    ib::error(ER_IB_MSG_36)
        << "Compact flag=" << !!page_is_comp(page) << ", should be "
        << dict_table_is_comp(index->table);

    return (FALSE);
  }

  n = dict_index_get_n_fields(index);

  if (!page_is_comp(page) &&
      (rec_get_n_fields_old(rec, index) != n
       /* a record for older SYS_INDEXES table
       (missing merge_threshold column) is acceptable. */
       && !(index->id == DICT_INDEXES_ID &&
            rec_get_n_fields_old(rec, index) == n - 1))) {
    btr_index_rec_validate_report(page, rec, index);

    ib::error(ER_IB_MSG_37) << "Has " << rec_get_n_fields_old(rec, index)
                            << " fields, should have " << n;

    if (dump_on_error) {
      fputs("InnoDB: corrupt record ", stderr);
      rec_print_old(stderr, rec);
      putc('\n', stderr);
    }
    return (FALSE);
  }

  offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

  for (i = 0; i < n; i++) {
    dict_field_t *field = index->get_field(i);
    const dict_col_t *col = field->col;
    ulint fixed_size = col->get_fixed_size(page_is_comp(page));

    rec_get_nth_field_offs(offsets, i, &len);

    /* Note that if fixed_size != 0, it equals the
    length of a fixed-size column in the clustered index,
    except for DATA_POINT, whose length is MBR_LEN
    when it is indexed in an R-tree; we adjust for that below.
    A prefix index of the column has a fixed, but different,
    length.  When fixed_size == 0, prefix_len is the maximum
    length of the prefix index column. */

    if (col->mtype == DATA_POINT) {
      ut_ad(fixed_size == DATA_POINT_LEN);
      if (dict_index_is_spatial(index)) {
        /* For DATA_POINT data, when it has an R-tree
        index, the fixed_len is the MBR of the point.
        But if it is a primary key stored on the R-tree
        as the PK pointer, the length shall be
        DATA_POINT_LEN as well. */
        ut_ad((field->fixed_len == DATA_MBR_LEN && i == 0) ||
              (field->fixed_len == DATA_POINT_LEN && i != 0));
        fixed_size = field->fixed_len;
      }
    }

    if ((field->prefix_len == 0 && rec_field_not_null_not_add_col_def(len) &&
         fixed_size && len != fixed_size) ||
        (field->prefix_len > 0 && rec_field_not_null_not_add_col_def(len) &&
         len > field->prefix_len)) {
      btr_index_rec_validate_report(page, rec, index);

      ib::error error(ER_IB_MSG_1224);

      error << "Field " << i << " len is " << len << ", should be "
            << fixed_size;

      if (dump_on_error) {
        error << "; ";
        rec_print(error.m_oss, rec,
                  rec_get_info_bits(rec, rec_offs_comp(offsets)), offsets);
      }
      if (heap) {
        mem_heap_free(heap);
      }
      return (FALSE);
    }
  }

#ifdef VIRTUAL_INDEX_DEBUG
  if (dict_index_has_virtual(index) || index->is_clustered()) {
    rec_print_new(stderr, rec, offsets);
  }
#endif

  if (heap) {
    mem_heap_free(heap);
  }
  return (TRUE);
}
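
/* A minimal caller sketch (illustration; 'rec' and 'index' are assumed to
come from a latched page and its data dictionary entry, as in the page and
tree validators below):

  if (!btr_index_rec_validate(rec, index, TRUE)) {
    // the record does not match the index definition; with
    // dump_on_error == TRUE a dump of the record was already printed
  }
*/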

/** Checks the size and number of fields in records based on the definition of
 the index.
 @return true if ok */
static ibool btr_index_page_validate(buf_block_t *block,  /*!< in: index page */
                                     dict_index_t *index) /*!< in: index */
{
  page_cur_t cur;
  ibool ret = TRUE;
#ifdef UNIV_DEBUG
  ulint nth = 1;
#endif /* UNIV_DEBUG */

  page_cur_set_before_first(block, &cur);

  /* Directory slot 0 should only contain the infimum record. */
  DBUG_EXECUTE_IF(
      "check_table_rec_next",
      ut_a(page_rec_get_nth_const(page_cur_get_page(&cur), 0) == cur.rec);
      ut_a(page_dir_slot_get_n_owned(
               page_dir_get_nth_slot(page_cur_get_page(&cur), 0)) == 1););

  page_cur_move_to_next(&cur);

  for (;;) {
    if (page_cur_is_after_last(&cur)) {
      break;
    }

    if (!btr_index_rec_validate(cur.rec, index, TRUE)) {
      return (FALSE);
    }

    /* Verify that page_rec_get_nth_const() is correctly
    retrieving each record. */
    DBUG_EXECUTE_IF("check_table_rec_next",
                    ut_a(cur.rec == page_rec_get_nth_const(
                                        page_cur_get_page(&cur),
                                        page_rec_get_n_recs_before(cur.rec)));
                    ut_a(nth++ == page_rec_get_n_recs_before(cur.rec)););

    page_cur_move_to_next(&cur);
  }

  return (ret);
}

/** Report an error on one page of an index tree. */
static void btr_validate_report1(
    dict_index_t *index,      /*!< in: index */
    ulint level,              /*!< in: B-tree level */
    const buf_block_t *block) /*!< in: index page */
{
  ib::error error(ER_IB_MSG_1225);
  error << "In page " << block->page.id.page_no() << " of index " << index->name
        << " of table " << index->table->name;

  if (level > 0) {
    error << ", index tree level " << level;
  }
}

/** Report an error on two pages of an index tree. */
static void btr_validate_report2(
    const dict_index_t *index, /*!< in: index */
    ulint level,               /*!< in: B-tree level */
    const buf_block_t *block1, /*!< in: first index page */
    const buf_block_t *block2) /*!< in: second index page */
{
  ib::error error(ER_IB_MSG_1226);
  error << "In pages " << block1->page.id << " and " << block2->page.id
        << " of index " << index->name << " of table " << index->table->name;

  if (level > 0) {
    error << ", index tree level " << level;
  }
}

/** Validates index tree level.
 @return true if ok */
static bool btr_validate_level(
    dict_index_t *index, /*!< in: index tree */
    const trx_t *trx,    /*!< in: transaction or NULL */
    ulint level,         /*!< in: level number */
    bool lockout)        /*!< in: true if X-latch index is intended */
{
  buf_block_t *block;
  page_t *page;
  buf_block_t *right_block = nullptr; /* remove warning */
  page_t *right_page = nullptr;       /* remove warning */
  page_t *father_page;
  btr_cur_t node_cur;
  btr_cur_t right_node_cur;
  rec_t *rec;
  page_no_t right_page_no;
  page_no_t left_page_no;
  page_cur_t cursor;
  dtuple_t *node_ptr_tuple;
  bool ret = true;
  mtr_t mtr;
  mem_heap_t *heap = mem_heap_create(256);
  fseg_header_t *seg;
  ulint *offsets = nullptr;
  ulint *offsets2 = nullptr;
#ifdef UNIV_ZIP_DEBUG
  page_zip_des_t *page_zip;
#endif /* UNIV_ZIP_DEBUG */
  ulint savepoint = 0;
  ulint savepoint2 = 0;
  page_no_t parent_page_no = FIL_NULL;
  page_no_t parent_right_page_no = FIL_NULL;
  bool rightmost_child = false;

  mtr_start(&mtr);

  if (!srv_read_only_mode) {
    if (lockout) {
      mtr_x_lock(dict_index_get_lock(index), &mtr);
    } else {
      mtr_sx_lock(dict_index_get_lock(index), &mtr);
    }
  }

  block = btr_root_block_get(index, RW_SX_LATCH, &mtr);
  page = buf_block_get_frame(block);
  seg = page + PAGE_HEADER + PAGE_BTR_SEG_TOP;

#ifdef UNIV_RTR_DEBUG
  if (dict_index_is_spatial(index)) {
    fprintf(stderr, "Root page no: %lu\n", (ulong)page_get_page_no(page));
  }
#endif

  const fil_space_t *space = fil_space_get(index->space);
  const page_size_t table_page_size(dict_table_page_size(index->table));
  const page_size_t space_page_size(space->flags);

  if (!table_page_size.equals_to(space_page_size)) {
    ib::warn(ER_IB_MSG_38) << "Flags mismatch: table=" << index->table->flags
                           << ", tablespace=" << space->flags;

    mtr_commit(&mtr);

    return (false);
  }

  while (level != btr_page_get_level(page, &mtr)) {
    const rec_t *node_ptr;

    if (fseg_page_is_free(seg, block->page.id.space(),
                          block->page.id.page_no())) {
      btr_validate_report1(index, level, block);

      ib::warn(ER_IB_MSG_39) << "Page is free";

      ret = false;
    }

    ut_a(index->space == block->page.id.space());
    ut_a(index->space == page_get_space_id(page));
#ifdef UNIV_ZIP_DEBUG
    page_zip = buf_block_get_page_zip(block);
    ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
    ut_a(!page_is_leaf(page));

    page_cur_set_before_first(block, &cursor);
    page_cur_move_to_next(&cursor);

    node_ptr = page_cur_get_rec(&cursor);
    offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);

    savepoint2 = mtr_set_savepoint(&mtr);
    block = btr_node_ptr_get_child(node_ptr, index, offsets, &mtr);
    page = buf_block_get_frame(block);

    /* For an R-tree, since the record order might not be the same as
    that of the linked index pages on the lower level, we need to
    traverse backwards to get the first page record on this level.
    This is only used for index validation; a spatial index
    does not use such a scan for any of its DML or query
    operations. */
    if (dict_index_is_spatial(index)) {
      left_page_no = btr_page_get_prev(page, &mtr);

      while (left_page_no != FIL_NULL) {
        page_id_t left_page_id(index->space, left_page_no);
        /* To obey the latch order of tree blocks,
        we release the current block once before
        acquiring the latch on the uncle block. */
        mtr_release_block_at_savepoint(&mtr, savepoint2, block);

        savepoint2 = mtr_set_savepoint(&mtr);
        block = btr_block_get(left_page_id, table_page_size, RW_SX_LATCH, index,
                              &mtr);
        page = buf_block_get_frame(block);
        left_page_no = btr_page_get_prev(page, &mtr);
      }
    }
  }

  /* Now we are on the desired level. Loop through the pages on that
  level. */

  if (level == 0) {
    /* Leaf pages are managed in their own file segment. */
    seg -= PAGE_BTR_SEG_TOP - PAGE_BTR_SEG_LEAF;
  }
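
  /* (Comment only: seg was set above to PAGE_HEADER + PAGE_BTR_SEG_TOP
  within the root frame; subtracting the difference of the two offsets
  retargets the same pointer to PAGE_HEADER + PAGE_BTR_SEG_LEAF, the leaf
  segment header, without re-reading the root page.) */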

loop:
  mem_heap_empty(heap);
  offsets = offsets2 = nullptr;
  if (!srv_read_only_mode) {
    if (lockout) {
      mtr_x_lock(dict_index_get_lock(index), &mtr);
    } else {
      mtr_sx_lock(dict_index_get_lock(index), &mtr);
    }
  }

#ifdef UNIV_ZIP_DEBUG
  page_zip = buf_block_get_page_zip(block);
  ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

  ut_a(block->page.id.space() == index->space);

  if (fseg_page_is_free(seg, block->page.id.space(),
                        block->page.id.page_no())) {
    btr_validate_report1(index, level, block);

    ib::warn(ER_IB_MSG_40) << "Page is marked as free";
    ret = false;

  } else if (btr_page_get_index_id(page) != index->id) {
    ib::error(ER_IB_MSG_41) << "Page index id " << btr_page_get_index_id(page)
                            << " != data dictionary index id " << index->id;

    ret = false;

  } else if (!page_validate(page, index)) {
    btr_validate_report1(index, level, block);
    ret = false;

  } else if (level == 0 && !btr_index_page_validate(block, index)) {
    /* We are on level 0. Check that the records have the right
    number of fields, and field lengths are right. */

    ret = false;
  }

  ut_a(btr_page_get_level(page, &mtr) == level);

  right_page_no = btr_page_get_next(page, &mtr);
  left_page_no = btr_page_get_prev(page, &mtr);

  ut_a(!page_is_empty(page) ||
       (level == 0 && page_get_page_no(page) == dict_index_get_page(index)));

  if (right_page_no != FIL_NULL) {
    const rec_t *right_rec;
    savepoint = mtr_set_savepoint(&mtr);

    right_block = btr_block_get(page_id_t(index->space, right_page_no),
                                table_page_size, RW_SX_LATCH, index, &mtr);

    right_page = buf_block_get_frame(right_block);

    if (btr_page_get_prev(right_page, &mtr) != page_get_page_no(page)) {
      btr_validate_report2(index, level, block, right_block);
      fputs(
          "InnoDB: broken FIL_PAGE_NEXT"
          " or FIL_PAGE_PREV links\n",
          stderr);

      ret = false;
    }

    if (page_is_comp(right_page) != page_is_comp(page)) {
      btr_validate_report2(index, level, block, right_block);
      fputs("InnoDB: 'compact' flag mismatch\n", stderr);

      ret = false;

      goto node_ptr_fails;
    }

    rec = page_rec_get_prev(page_get_supremum_rec(page));
    right_rec = page_rec_get_next(page_get_infimum_rec(right_page));
    offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
    offsets2 =
        rec_get_offsets(right_rec, index, offsets2, ULINT_UNDEFINED, &heap);

    /* For a spatial index, we cannot guarantee the key ordering
    across pages, so skip the record comparison for now. This will
    be enhanced in a special R-tree index validation scheme. */
    if (!dict_index_is_spatial(index) &&
        cmp_rec_rec(rec, right_rec, offsets, offsets2, index, false) >= 0) {
      btr_validate_report2(index, level, block, right_block);

      fputs(
          "InnoDB: records in wrong order"
          " on adjacent pages\n",
          stderr);

      fputs("InnoDB: record ", stderr);
      rec = page_rec_get_prev(page_get_supremum_rec(page));
      rec_print(stderr, rec, index);
      putc('\n', stderr);
      fputs("InnoDB: record ", stderr);
      rec = page_rec_get_next(page_get_infimum_rec(right_page));
      rec_print(stderr, rec, index);
      putc('\n', stderr);

      ret = false;
    }
  }

  if (level > 0 && left_page_no == FIL_NULL) {
    ut_a(REC_INFO_MIN_REC_FLAG &
         rec_get_info_bits(page_rec_get_next(page_get_infimum_rec(page)),
                           page_is_comp(page)));
  }
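
  /* On every non-leaf level the first user record of the leftmost page
  must carry the REC_INFO_MIN_REC_FLAG, marking it as the minimum node
  pointer of that level; see btr_set_min_rec_mark(). */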

  /* Similarly, skip the father-node check for spatial indexes for
  now, for a couple of reasons:
  1) As mentioned above, there is no ordering relationship between
  records at the parent level and the linked pages at the child level.
  2) Searching for the parent from the root is very costly for an
  R-tree.
  A special validation mechanism for R-trees will be added later
  (WL #7520). */
  if (!dict_index_is_spatial(index) &&
      block->page.id.page_no() != dict_index_get_page(index)) {
    /* Check the father node pointers */
    rec_t *node_ptr;

    btr_cur_position(index, page_rec_get_next(page_get_infimum_rec(page)),
                     block, &node_cur);
    offsets = btr_page_get_father_node_ptr_for_validate(offsets, heap,
                                                        &node_cur, &mtr);

    father_page = btr_cur_get_page(&node_cur);
    node_ptr = btr_cur_get_rec(&node_cur);

    parent_page_no = page_get_page_no(father_page);
    parent_right_page_no = btr_page_get_next(father_page, &mtr);
    rightmost_child = page_rec_is_supremum(page_rec_get_next(node_ptr));

    btr_cur_position(index, page_rec_get_prev(page_get_supremum_rec(page)),
                     block, &node_cur);

    offsets = btr_page_get_father_node_ptr_for_validate(offsets, heap,
                                                        &node_cur, &mtr);

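    /* Both lookups above must agree: positioning the cursor on the
    first and on the last user record of this page has to lead to the
    same parent record, and that node pointer must reference this
    page's number. */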
    if (node_ptr != btr_cur_get_rec(&node_cur) ||
        btr_node_ptr_get_child_page_no(node_ptr, offsets) !=
            block->page.id.page_no()) {
      btr_validate_report1(index, level, block);

      fputs("InnoDB: node pointer to the page is wrong\n", stderr);

      fputs("InnoDB: node ptr ", stderr);
      rec_print(stderr, node_ptr, index);

      rec = btr_cur_get_rec(&node_cur);
      fprintf(stderr,
              "\n"
              "InnoDB: node ptr child page n:o %lu\n",
              (ulong)btr_node_ptr_get_child_page_no(rec, offsets));

      fputs("InnoDB: record on page ", stderr);
      rec_print_new(stderr, rec, offsets);
      putc('\n', stderr);
      ret = false;

      goto node_ptr_fails;
    }

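    /* For a non-leaf child page, additionally rebuild the node pointer
    tuple from the first record on this page and compare it with the
    parent record located above; the two must match field by field. */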
    if (!page_is_leaf(page)) {
      node_ptr_tuple = dict_index_build_node_ptr(
          index, page_rec_get_next(page_get_infimum_rec(page)), 0, heap,
          btr_page_get_level(page, &mtr));

      if (cmp_dtuple_rec(node_ptr_tuple, node_ptr, index, offsets)) {
        const rec_t *first_rec = page_rec_get_next(page_get_infimum_rec(page));

        btr_validate_report1(index, level, block);

        ib::error(ER_IB_MSG_42) << "Node ptrs differ on levels > 0";

        fputs("InnoDB: node ptr ", stderr);
        rec_print_new(stderr, node_ptr, offsets);
        fputs("InnoDB: first rec ", stderr);
        rec_print(stderr, first_rec, index);
        putc('\n', stderr);
        ret = false;

        goto node_ptr_fails;
      }
    }

    if (left_page_no == FIL_NULL) {
      ut_a(node_ptr == page_rec_get_next(page_get_infimum_rec(father_page)));
      ut_a(btr_page_get_prev(father_page, &mtr) == FIL_NULL);
    }

    if (right_page_no == FIL_NULL) {
      ut_a(node_ptr == page_rec_get_prev(page_get_supremum_rec(father_page)));
      ut_a(btr_page_get_next(father_page, &mtr) == FIL_NULL);
    } else {
      const rec_t *right_node_ptr;

      right_node_ptr = page_rec_get_next(node_ptr);

      if (!lockout && rightmost_child) {
        /* To obey the latch order of tree blocks, we must
        release right_block before acquiring the latch on the
        uncle block. */
        mtr_release_block_at_savepoint(&mtr, savepoint, right_block);

        btr_block_get(page_id_t(index->space, parent_right_page_no),
                      table_page_size, RW_SX_LATCH, index, &mtr);

        right_block = btr_block_get(page_id_t(index->space, right_page_no),
                                    table_page_size, RW_SX_LATCH, index, &mtr);
      }

      btr_cur_position(index,
                       page_rec_get_next(page_get_infimum_rec(
                           buf_block_get_frame(right_block))),
                       right_block, &right_node_cur);

      offsets = btr_page_get_father_node_ptr_for_validate(
          offsets, heap, &right_node_cur, &mtr);

      if (right_node_ptr != page_get_supremum_rec(father_page)) {
        if (btr_cur_get_rec(&right_node_cur) != right_node_ptr) {
          ret = false;
          fputs(
              "InnoDB: node pointer to"
              " the right page is wrong\n",
              stderr);

          btr_validate_report1(index, level, block);
        }
      } else {
        page_t *right_father_page = btr_cur_get_page(&right_node_cur);

        if (btr_cur_get_rec(&right_node_cur) !=
            page_rec_get_next(page_get_infimum_rec(right_father_page))) {
          ret = false;
          fputs(
              "InnoDB: node pointer 2 to"
              " the right page is wrong\n",
              stderr);

          btr_validate_report1(index, level, block);
        }

        if (page_get_page_no(right_father_page) !=
            btr_page_get_next(father_page, &mtr)) {
          ret = false;
          fputs(
              "InnoDB: node pointer 3 to"
              " the right page is wrong\n",
              stderr);

          btr_validate_report1(index, level, block);
        }
      }
    }
  }

node_ptr_fails:
  /* Commit the mini-transaction to release the latch on 'page'.
  Re-acquire the latch on right_page, which will become 'page'
  on the next loop.  The page has already been checked. */
  mtr_commit(&mtr);

  if (trx_is_interrupted(trx)) {
    /* On interrupt, return the current status. */
  } else if (right_page_no != FIL_NULL) {
    mtr_start(&mtr);

    if (!lockout) {
      if (rightmost_child) {
        if (parent_right_page_no != FIL_NULL) {
          btr_block_get(page_id_t(index->space, parent_right_page_no),
                        table_page_size, RW_SX_LATCH, index, &mtr);
        }
      } else if (parent_page_no != FIL_NULL) {
        btr_block_get(page_id_t(index->space, parent_page_no), table_page_size,
                      RW_SX_LATCH, index, &mtr);
      }
    }

    block = btr_block_get(page_id_t(index->space, right_page_no),
                          table_page_size, RW_SX_LATCH, index, &mtr);

    page = buf_block_get_frame(block);

    goto loop;
  }

  mem_heap_free(heap);

  return (ret);
}

/** Do a level-by-level validation of a spatial index tree.
 @return	true if no error found */
static bool btr_validate_spatial_index(
    dict_index_t *index, /*!< in: index */
    const trx_t *trx)    /*!< in: transaction or NULL */
{
  mtr_t mtr;
  bool ok = true;

  mtr_start(&mtr);

  mtr_x_lock(dict_index_get_lock(index), &mtr);

  page_t *root = btr_root_get(index, &mtr);
  ulint n = btr_page_get_level(root, &mtr);

#ifdef UNIV_RTR_DEBUG
  fprintf(stderr, "R-tree level is %lu\n", n);
#endif /* UNIV_RTR_DEBUG */

  for (ulint i = 0; i <= n; ++i) {
#ifdef UNIV_RTR_DEBUG
    fprintf(stderr, "Level %lu:\n", n - i);
#endif /* UNIV_RTR_DEBUG */

    if (!btr_validate_level(index, trx, n - i, true)) {
      ok = false;
      break;
    }
  }

  mtr_commit(&mtr);

  return (ok);
}

/** Checks the consistency of an index tree.
 @return true if ok */
bool btr_validate_index(
    dict_index_t *index, /*!< in: index */
    const trx_t *trx,    /*!< in: transaction or NULL */
    bool lockout)        /*!< in: true if X-latching the index is intended */
{
  /* Full-text indexes are implemented by auxiliary tables,
  not by the B-tree. */
  if (dict_index_is_online_ddl(index) || (index->type & DICT_FTS)) {
    return (true);
  }

  if (dict_index_is_spatial(index)) {
    return (btr_validate_spatial_index(index, trx));
  }

  mtr_t mtr;

  mtr_start(&mtr);

  if (!srv_read_only_mode) {
    if (lockout) {
      mtr_x_lock(dict_index_get_lock(index), &mtr);
    } else {
      mtr_sx_lock(dict_index_get_lock(index), &mtr);
    }
  }

  bool ok = true;
  page_t *root = btr_root_get(index, &mtr);
  ulint n = btr_page_get_level(root, &mtr);

  for (ulint i = 0; i <= n; ++i) {
    if (!btr_validate_level(index, trx, n - i, lockout)) {
      ok = false;
      break;
    }
  }

  mtr_commit(&mtr);

  return (ok);
}
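
/* Note: btr_validate_index() is the validation entry point reached by
CHECK TABLE through ha_innobase::check(). The levels are validated
from the root downwards, and validation stops at the first level that
reports an error. */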

/** Checks if the page in the cursor can be merged with given page.
 If necessary, re-organize the merge_page.
 @return	true if possible to merge. */
static bool btr_can_merge_with_page(
    btr_cur_t *cursor,         /*!< in: cursor on the page to merge */
    page_no_t page_no,         /*!< in: a sibling page */
    buf_block_t **merge_block, /*!< out: the merge block */
    mtr_t *mtr)                /*!< in: mini-transaction */
{
  dict_index_t *index;
  page_t *page;
  ulint n_recs;
  ulint data_size;
  ulint max_ins_size_reorg;
  ulint max_ins_size;
  buf_block_t *mblock;
  page_t *mpage;
  DBUG_TRACE;

  if (page_no == FIL_NULL) {
    *merge_block = nullptr;
    return false;
  }

  index = btr_cur_get_index(cursor);
  page = btr_cur_get_page(cursor);

  const page_id_t page_id(dict_index_get_space(index), page_no);
  const page_size_t page_size(dict_table_page_size(index->table));

  mblock = btr_block_get(page_id, page_size, RW_X_LATCH, index, mtr);
  mpage = buf_block_get_frame(mblock);

  n_recs = page_get_n_recs(page);
  data_size = page_get_data_size(page);

  max_ins_size_reorg = page_get_max_insert_size_after_reorganize(mpage, n_recs);

  if (data_size > max_ins_size_reorg) {
    goto error;
  }
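
  /* Even after a reorganization the sibling could not absorb all the
  records of this page, so a merge is impossible regardless of the
  sibling's current fragmentation. */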

  /* If compression padding tells us that merging would result in a
  too tightly packed page, i.e. one that is likely to cause a
  compression failure, then do not merge the pages. */
  if (page_size.is_compressed() && page_is_leaf(mpage) &&
      (page_get_data_size(mpage) + data_size >=
       dict_index_zip_pad_optimal_page_size(index))) {
    goto error;
  }
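
  /* dict_index_zip_pad_optimal_page_size() implements the adaptive
  zip-padding heuristic: it returns a data size target maintained from
  the index's recent compression success/failure history, so staying
  under it keeps the chance of a later compression failure low. */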

  max_ins_size = page_get_max_insert_size(mpage, n_recs);

  if (data_size > max_ins_size) {
    /* We have to reorganize mpage */

    if (!btr_page_reorganize_block(false, page_zip_level, mblock, index, mtr)) {
      goto error;
    }

    max_ins_size = page_get_max_insert_size(mpage, n_recs);

    ut_ad(page_validate(mpage, index));
    ut_ad(max_ins_size == max_ins_size_reorg);

    if (data_size > max_ins_size) {
      /* Add fault tolerance, though this should
      never happen */

      goto error;
    }
  }

  *merge_block = mblock;
  return true;

error:
  *merge_block = nullptr;
  return false;
}
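
/* Usage note: btr_compress() consults this function for the left
sibling first and, if that merge is not possible, for the right
sibling; on a 'false' return *merge_block is set to nullptr and the
caller abandons the merge attempt. */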

/** Create an SDI Index
@param[in]	space_id	tablespace id
@param[in]	page_size	size of page
@param[in,out]	mtr		mini-transaction
@param[in,out]	table		SDI table
@return root page number of the SDI index created or FIL_NULL on failure */
static page_no_t btr_sdi_create(space_id_t space_id,
                                const page_size_t &page_size, mtr_t *mtr,
                                dict_table_t *table) {
  dict_index_t *index = table->first_index();
  ut_ad(index != nullptr);
  ut_ad(UT_LIST_GET_LEN(table->indexes) == 1);

  index->page = btr_create(DICT_CLUSTERED | DICT_UNIQUE | DICT_SDI, space_id,
                           page_size, index->id, index, mtr);

  return (index->page);
}
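
/* SDI stands for Serialized Dictionary Information: each tablespace
carries a hidden clustered index, created above with the DICT_SDI
flag, whose records store serialized dictionary metadata for the
objects residing in that tablespace. */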

/** Creates the SDI index and stores its root page number in page 0
@param[in]	space_id	tablespace id
@param[in]	dict_locked	true if dict_sys mutex is acquired
@return DB_SUCCESS on success, else DB_ERROR on failure */
dberr_t btr_sdi_create_index(space_id_t space_id, bool dict_locked) {
  fil_space_t *space = fil_space_acquire(space_id);
  if (space == nullptr) {
    ut_ad(0);
    return (DB_ERROR);
  }

  dict_table_t *sdi_table;
  page_no_t sdi_root_page_num;

  sdi_table = dict_sdi_get_table(space_id, dict_locked, true);
  ut_ad(sdi_table != nullptr);

  mtr_t mtr;
  mtr.start();

  const page_size_t page_size = page_size_t(space->flags);

  /* Create the B-tree root page for the SDI index */

  sdi_root_page_num = btr_sdi_create(space_id, page_size, &mtr, sdi_table);

  if (sdi_root_page_num == FIL_NULL) {
    ib::error(ER_IB_MSG_43) << "Unable to create root index page"
                               " for SDI table in tablespace "
                            << space_id;
    mtr.commit();
    dict_sdi_remove_from_cache(space_id, sdi_table, dict_locked);
    fil_space_release(space);
    return (DB_ERROR);
  } else {
    dict_index_t *index = sdi_table->first_index();
    index->page = sdi_root_page_num;
  }

  buf_block_t *block =
      buf_page_get(page_id_t(space_id, 0), page_size, RW_SX_LATCH, &mtr);

  buf_block_dbg_add_level(block, SYNC_FSP_PAGE);

  page_t *page = buf_block_get_frame(block);

  /* Write the SDI index root page number to page 0 */
  fsp_sdi_write_root_to_page(page, page_size, sdi_root_page_num, &mtr);

  /* Space flags from memory */
  uint32_t fsp_flags = space->flags;

  ut_ad(mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS) ==
        fsp_flags);

  fsp_flags_set_sdi(fsp_flags);
  mlog_write_ulint(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + page, fsp_flags,
                   MLOG_4BYTES, &mtr);
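
  /* The flag update above is made through mlog_write_ulint(), so it is
  redo-logged as part of this mini-transaction; the in-memory copy of
  the tablespace flags is refreshed only after the commit, below. */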

  mtr.commit();

  fil_space_set_flags(space, fsp_flags);

  dict_table_close(sdi_table, dict_locked, false);

  fil_space_release(space);
  return (DB_SUCCESS);
}
#endif /* !UNIV_HOTBACKUP */
