1 /*****************************************************************************
2 
3 Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 /** @file gis/gis0sea.cc
28  InnoDB R-tree search interfaces
29 
30  Created 2014/01/16 Jimmy Yang
31  ***********************************************************************/
32 
33 #include <new>
34 
35 #include "fsp0fsp.h"
36 #include "gis0rtree.h"
37 #include "my_dbug.h"
38 #include "page0cur.h"
39 #include "page0page.h"
40 #include "page0zip.h"
41 
42 #include "btr0cur.h"
43 #include "btr0pcur.h"
44 #include "btr0sea.h"
45 #include "ibuf0ibuf.h"
46 #include "lock0lock.h"
47 #include "rem0cmp.h"
48 #include "srv0mon.h"
49 #include "sync0sync.h"
50 #include "trx0trx.h"
51 #include "univ.i"
52 
/** Restore the stored position of a persistent cursor, buffer-fixing the page */
54 static bool rtr_cur_restore_position(
55     ulint latch_mode,  /*!< in: BTR_SEARCH_LEAF, ... */
56     btr_cur_t *cursor, /*!< in: detached persistent cursor */
57     ulint level,       /*!< in: index level */
58     mtr_t *mtr);       /*!< in: mtr */
59 
/** Pop out used parent path entries until we find the parent with a matching
 page number */
static void rtr_adjust_parent_path(
63     rtr_info_t *rtr_info, /* R-Tree info struct */
64     page_no_t page_no)    /* page number to look for */
65 {
66   while (!rtr_info->parent_path->empty()) {
67     if (rtr_info->parent_path->back().child_no == page_no) {
68       break;
69     } else {
70       if (rtr_info->parent_path->back().cursor) {
71         btr_pcur_close(rtr_info->parent_path->back().cursor);
72         ut_free(rtr_info->parent_path->back().cursor);
73       }
74 
75       rtr_info->parent_path->pop_back();
76     }
77   }
78 }
79 
80 /** Find the next matching record. This function is used by search
81  or record locating during index delete/update.
82  @return true if there is suitable record found, otherwise false */
static bool rtr_pcur_getnext_from_path(
84     const dtuple_t *tuple, /*!< in: data tuple */
85     page_cur_mode_t mode,  /*!< in: cursor search mode */
86     btr_cur_t *btr_cur,    /*!< in: persistent cursor; NOTE that the
87                            function may release the page latch */
88     ulint target_level,
89     /*!< in: target level */
90     ulint latch_mode,
91     /*!< in: latch_mode */
92     bool index_locked,
93     /*!< in: index tree locked */
94     mtr_t *mtr) /*!< in: mtr */
95 {
96   dict_index_t *index = btr_cur->index;
97   bool found = false;
98   space_id_t space = dict_index_get_space(index);
99   page_cur_t *page_cursor;
100   ulint level = 0;
101   node_visit_t next_rec;
102   rtr_info_t *rtr_info = btr_cur->rtr_info;
103   node_seq_t page_ssn;
104   ulint my_latch_mode;
105   ulint skip_parent = false;
106   bool new_split = false;
107   bool need_parent;
108   bool for_delete = false;
109   bool for_undo_ins = false;
110 
111   /* exhausted all the pages to be searched */
112   if (rtr_info->path->empty()) {
113     return (false);
114   }
115 
116   ut_ad(dtuple_get_n_fields_cmp(tuple));
117 
118   my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
119 
120   for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
121   for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
122 
  /* No insert should reach this function. The only BTR_MODIFY_*
  callers are delete operations */
125   ut_ad(mode != PAGE_CUR_RTREE_INSERT);
126   ut_ad(my_latch_mode == BTR_SEARCH_LEAF || my_latch_mode == BTR_MODIFY_LEAF ||
127         my_latch_mode == BTR_MODIFY_TREE ||
128         my_latch_mode == BTR_CONT_MODIFY_TREE);
129 
  /* Decide whether we need to track parent information. This is only
  needed for tree-altering operations (such as index page merge) */
132   need_parent = ((my_latch_mode == BTR_MODIFY_TREE ||
133                   my_latch_mode == BTR_CONT_MODIFY_TREE) &&
134                  mode == PAGE_CUR_RTREE_LOCATE);
135 
136   if (!index_locked) {
137     ut_ad(latch_mode & BTR_SEARCH_LEAF || latch_mode & BTR_MODIFY_LEAF);
138     mtr_s_lock(dict_index_get_lock(index), mtr);
139   } else {
140     ut_ad(
141         mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_SX_LOCK) ||
142         mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK) ||
143         mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK));
144   }
145 
146   const page_size_t &page_size = dict_table_page_size(index->table);
147 
  /* Pop each node/page to be searched from the "path" structure
  and do a search on it. Please note, any pages that are in
  the "path" structure are protected by "page" lock, so they
  cannot be shrunk away */
152   do {
153     buf_block_t *block;
154     node_seq_t path_ssn;
155     const page_t *page;
156     ulint rw_latch = RW_X_LATCH;
157     ulint tree_idx;
158 
159     mutex_enter(&rtr_info->rtr_path_mutex);
160     next_rec = rtr_info->path->back();
161     rtr_info->path->pop_back();
162     level = next_rec.level;
163     path_ssn = next_rec.seq_no;
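    /* Index into rtr_info->tree_blocks[]/tree_savepoints[]: slot 0
    corresponds to the root level, slot tree_height - 1 to the leaf level. */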
164     tree_idx = btr_cur->tree_height - level - 1;
165 
166     /* Maintain the parent path info as well, if needed */
167     if (need_parent && !skip_parent && !new_split) {
168       ulint old_level;
169       ulint new_level;
170 
171       ut_ad(!rtr_info->parent_path->empty());
172 
173       /* Cleanup unused parent info */
174       if (rtr_info->parent_path->back().cursor) {
175         btr_pcur_close(rtr_info->parent_path->back().cursor);
176         ut_free(rtr_info->parent_path->back().cursor);
177       }
178 
179       old_level = rtr_info->parent_path->back().level;
180 
181       rtr_info->parent_path->pop_back();
182 
183       ut_ad(!rtr_info->parent_path->empty());
184 
185       /* check whether there is a level change. If so,
186       the current parent path needs to pop enough
187       nodes to adjust to the new search page */
188       new_level = rtr_info->parent_path->back().level;
189 
190       if (old_level < new_level) {
191         rtr_adjust_parent_path(rtr_info, next_rec.page_no);
192       }
193 
194       ut_ad(!rtr_info->parent_path->empty());
195 
196       ut_ad(next_rec.page_no == rtr_info->parent_path->back().child_no);
197     }
198 
199     mutex_exit(&rtr_info->rtr_path_mutex);
200 
201     skip_parent = false;
202     new_split = false;
203 
    /* Once we have pages in "path", these pages are
    predicate page locked, so they can't be shrunk away.
    They also have an SSN (split sequence number) to detect
    splits, so we can directly latch a single page while
    getting them. They can be unlatched if not qualified.
    One reason for pre-latching is that we might need to store
    some parent position (which requires a latch) during the search */
211     if (level == 0) {
212       /* S latched for SEARCH_LEAF, and X latched
213       for MODIFY_LEAF */
214       if (my_latch_mode <= BTR_MODIFY_LEAF) {
215         rw_latch = my_latch_mode;
216       }
217 
218       if (my_latch_mode == BTR_CONT_MODIFY_TREE ||
219           my_latch_mode == BTR_MODIFY_TREE) {
220         rw_latch = RW_NO_LATCH;
221       }
222 
223     } else if (level == target_level) {
224       rw_latch = RW_X_LATCH;
225     }
226 
227     /* Release previous locked blocks */
228     if (my_latch_mode != BTR_SEARCH_LEAF) {
229       for (ulint idx = 0; idx < btr_cur->tree_height; idx++) {
230         if (rtr_info->tree_blocks[idx]) {
231           mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[idx],
232                                          rtr_info->tree_blocks[idx]);
233           rtr_info->tree_blocks[idx] = nullptr;
234         }
235       }
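      /* Also release any blocks stored in the extra slots past
      RTR_MAX_LEVELS, which are reserved for latching the leaf page
      and its siblings. */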
236       for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3; idx++) {
237         if (rtr_info->tree_blocks[idx]) {
238           mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[idx],
239                                          rtr_info->tree_blocks[idx]);
240           rtr_info->tree_blocks[idx] = nullptr;
241         }
242       }
243     }
244 
245     /* set up savepoint to record any locks to be taken */
246     rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
247 
248 #ifdef UNIV_RTR_DEBUG
249     ut_ad(!(rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_X) ||
250             rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_S)) ||
251           my_latch_mode == BTR_MODIFY_TREE ||
252           my_latch_mode == BTR_CONT_MODIFY_TREE ||
253           !page_is_leaf(buf_block_get_frame(btr_cur->page_cur.block)));
254 #endif /* UNIV_RTR_DEBUG */
255 
256     page_id_t page_id(space, next_rec.page_no);
257 
258     block = buf_page_get_gen(page_id, page_size, rw_latch, nullptr,
259                              Page_fetch::NORMAL, __FILE__, __LINE__, mtr);
260 
261     if (block == nullptr) {
262       continue;
263     } else if (rw_latch != RW_NO_LATCH) {
264       ut_ad(!dict_index_is_ibuf(index));
265       buf_block_dbg_add_level(block, SYNC_TREE_NODE);
266     }
267 
268     rtr_info->tree_blocks[tree_idx] = block;
269 
270     page = buf_block_get_frame(block);
271     page_ssn = page_get_ssn_id(page);
272 
    /* If there are splits, push the split page.
    Note that we have an SX lock on index->lock, so there
    should not be any split/shrink happening here */
276     if (page_ssn > path_ssn) {
277       page_no_t next_page_no = btr_page_get_next(page, mtr);
278       rtr_non_leaf_stack_push(rtr_info->path, next_page_no, path_ssn, level, 0,
279                               nullptr, 0);
280 
281       if (!srv_read_only_mode && mode != PAGE_CUR_RTREE_INSERT &&
282           mode != PAGE_CUR_RTREE_LOCATE) {
283         ut_ad(rtr_info->thr);
284         lock_place_prdt_page_lock({space, next_page_no}, index, rtr_info->thr);
285       }
286       new_split = true;
287 #ifdef UNIV_GIS_DEBUG
288       fprintf(stderr, "GIS_DIAG: Splitted page found: %d, %ld\n",
289               static_cast<int>(need_parent), next_page_no);
290 #endif
291     }
292 
293     page_cursor = btr_cur_get_page_cur(btr_cur);
294     page_cursor->rec = nullptr;
295 
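    /* PAGE_CUR_RTREE_LOCATE at the target leaf level is carried out as an
    exact PAGE_CUR_LE search; all other levels and modes locate the record
    by MBR comparison in rtr_cur_search_with_match(). */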
296     if (mode == PAGE_CUR_RTREE_LOCATE) {
297       if (level == target_level && level == 0) {
298         ulint low_match;
299 
300         found = false;
301 
302         low_match = page_cur_search(block, index, tuple, PAGE_CUR_LE,
303                                     btr_cur_get_page_cur(btr_cur));
304 
305         if (low_match == dtuple_get_n_fields_cmp(tuple)) {
306           rec_t *rec = btr_cur_get_rec(btr_cur);
307 
308           if (!rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) ||
309               (!for_delete && !for_undo_ins)) {
310             found = true;
311             btr_cur->low_match = low_match;
312           } else {
313             /* mark we found deleted row */
314             btr_cur->rtr_info->fd_del = true;
315           }
316         }
317       } else {
318         page_cur_mode_t page_mode = mode;
319 
320         if (level == target_level && target_level != 0) {
321           page_mode = PAGE_CUR_RTREE_GET_FATHER;
322         }
323         found = rtr_cur_search_with_match(block, index, tuple, page_mode,
324                                           page_cursor, btr_cur->rtr_info);
325 
326         /* Save the position of parent if needed */
327         if (found && need_parent) {
328           btr_pcur_t *r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
329 
330           rec_t *rec = page_cur_get_rec(page_cursor);
331           page_cur_position(rec, block, btr_pcur_get_page_cur(r_cursor));
332           r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
333           r_cursor->m_latch_mode = my_latch_mode;
334           btr_pcur_store_position(r_cursor, mtr);
335 #ifdef UNIV_DEBUG
336           ulint num_stored =
337               rtr_store_parent_path(block, btr_cur, rw_latch, level, mtr);
338           ut_ad(num_stored > 0);
339 #else
340           rtr_store_parent_path(block, btr_cur, rw_latch, level, mtr);
341 #endif /* UNIV_DEBUG */
342         }
343       }
344     } else {
345       found = rtr_cur_search_with_match(block, index, tuple, mode, page_cursor,
346                                         btr_cur->rtr_info);
347     }
348 
349     /* Attach predicate lock if needed, no matter whether
350     there are matched records */
351     if (mode != PAGE_CUR_RTREE_INSERT && mode != PAGE_CUR_RTREE_LOCATE &&
352         mode >= PAGE_CUR_CONTAIN && btr_cur->rtr_info->need_prdt_lock) {
353       lock_prdt_t prdt;
354 
355       trx_t *trx = thr_get_trx(btr_cur->rtr_info->thr);
356       trx_mutex_enter(trx);
357       lock_init_prdt_from_mbr(&prdt, &btr_cur->rtr_info->mbr, mode,
358                               trx->lock.lock_heap);
359       trx_mutex_exit(trx);
360 
361       if (rw_latch == RW_NO_LATCH) {
362         rw_lock_s_lock(&(block->lock));
363       }
364 
365       lock_prdt_lock(block, &prdt, index, LOCK_S, LOCK_PREDICATE,
366                      btr_cur->rtr_info->thr, mtr);
367 
368       if (rw_latch == RW_NO_LATCH) {
369         rw_lock_s_unlock(&(block->lock));
370       }
371     }
372 
373     if (found) {
374       if (level == target_level) {
        page_cur_t *r_cur;
377 
378         if (my_latch_mode == BTR_MODIFY_TREE && level == 0) {
379           ut_ad(rw_latch == RW_NO_LATCH);
380           page_id_t my_page_id(space, block->page.id.page_no());
381 
382           btr_cur_latch_leaves(block, my_page_id, page_size, BTR_MODIFY_TREE,
383                                btr_cur, mtr);
384         }
385 
386         r_cur = btr_cur_get_page_cur(btr_cur);
387 
388         page_cur_position(page_cur_get_rec(page_cursor),
389                           page_cur_get_block(page_cursor), r_cur);
390 
391         btr_cur->low_match = level != 0 ? DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
392                                         : btr_cur->low_match;
393         break;
394       }
395 
396       /* Keep the parent path node, which points to
397       last node just located */
398       skip_parent = true;
399     } else {
400       /* Release latch on the current page */
401       ut_ad(rtr_info->tree_blocks[tree_idx]);
402 
403       mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[tree_idx],
404                                      rtr_info->tree_blocks[tree_idx]);
405       rtr_info->tree_blocks[tree_idx] = nullptr;
406     }
407 
408   } while (!rtr_info->path->empty());
409 
410   const rec_t *rec = btr_cur_get_rec(btr_cur);
411 
412   if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
413     mtr_commit(mtr);
414     mtr_start(mtr);
415   } else if (!index_locked) {
416     mtr_memo_release(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK);
417   }
418 
419   return (found);
420 }
421 
/** Find the next matching record. This function will first exhaust
the copied records listed in the rtr_info->matches vector before
moving to the next page
425 @param[in]	tuple		data tuple; NOTE: n_fields_cmp in tuple
426                                 must be set so that it cannot get compared
427                                 to the node ptr page number field!
428 @param[in]	mode		cursor search mode
429 @param[in]	sel_mode	select mode: SELECT_ORDINARY,
                                SELECT_SKIP_LOCKED, or SELECT_NO_WAIT
431 @param[in]	cursor		persistent cursor; NOTE that the function
432                                 may release the page latch
433 @param[in]	cur_level	current level
434 @param[in]	mtr		mini-transaction
@return true if a next qualified record is found, otherwise (if
exhausted) false */
bool rtr_pcur_move_to_next(const dtuple_t *tuple, page_cur_mode_t mode,
438                            select_mode sel_mode, btr_pcur_t *cursor,
439                            ulint cur_level, mtr_t *mtr) {
440   rtr_info_t *rtr_info = cursor->m_btr_cur.rtr_info;
441 
442   ut_a(cursor->m_pos_state == BTR_PCUR_IS_POSITIONED);
443 
444   mutex_enter(&rtr_info->matches->rtr_match_mutex);
445   /* First retrieve the next record on the current page */
446   while (!rtr_info->matches->matched_recs->empty()) {
447     rtr_rec_t rec;
448     rec = rtr_info->matches->matched_recs->back();
449     rtr_info->matches->matched_recs->pop_back();
450 
    /* Skip unlocked records.
    Note: CHECK TABLE doesn't hold record locks. */
453     if (sel_mode != SELECT_ORDINARY && !rec.locked) {
454       continue;
455     }
456 
457     mutex_exit(&rtr_info->matches->rtr_match_mutex);
458 
459     cursor->m_btr_cur.page_cur.rec = rec.r_rec;
460     cursor->m_btr_cur.page_cur.block = &rtr_info->matches->block;
461 
462     DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
463     return (true);
464   }
465 
466   mutex_exit(&rtr_info->matches->rtr_match_mutex);
467 
468   /* Fetch the next page */
469   return (rtr_pcur_getnext_from_path(tuple, mode, &cursor->m_btr_cur, cur_level,
470                                      cursor->m_latch_mode, false, mtr));
471 }
472 
/** Check if the cursor holds a record pointing to the specified child page
 @return	true if it does (point to the child page), false otherwise */
static bool rtr_compare_cursor_rec(
476     dict_index_t *index, /*!< in: index */
477     btr_cur_t *cursor,   /*!< in: Cursor to check */
478     page_no_t page_no,   /*!< in: desired child page number */
479     mem_heap_t **heap)   /*!< in: memory heap */
480 {
481   const rec_t *rec;
482   ulint *offsets;
483 
484   rec = btr_cur_get_rec(cursor);
485 
486   offsets = rec_get_offsets(rec, index, nullptr, ULINT_UNDEFINED, heap);
487 
488   return (btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
489 }
490 
491 /** Initializes and opens a persistent cursor to an index tree. It should be
492  closed with btr_pcur_close. Mainly called by row_search_index_entry() */
void rtr_pcur_open_low(
494     dict_index_t *index,   /*!< in: index */
495     ulint level,           /*!< in: level in the rtree */
496     const dtuple_t *tuple, /*!< in: tuple on which search done */
497     page_cur_mode_t mode,  /*!< in: PAGE_CUR_RTREE_LOCATE, ... */
498     ulint latch_mode,      /*!< in: BTR_SEARCH_LEAF, ... */
499     btr_pcur_t *cursor,    /*!< in: memory buffer for persistent cursor */
500     const char *file,      /*!< in: file name */
501     ulint line,            /*!< in: line where called */
502     mtr_t *mtr)            /*!< in: mtr */
503 {
504   btr_cur_t *btr_cursor;
505   ulint n_fields;
506   ulint low_match;
507   rec_t *rec;
508   bool tree_latched = false;
509   bool for_delete = false;
510   bool for_undo_ins = false;
511 
512   ut_ad(level == 0);
513 
514   ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
515   ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
516 
517   /* Initialize the cursor */
518 
519   btr_pcur_init(cursor);
520 
521   for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
522   for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
523 
524   cursor->m_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
525   cursor->m_search_mode = mode;
526 
527   /* Search with the tree cursor */
528 
529   btr_cursor = btr_pcur_get_btr_cur(cursor);
530 
531   btr_cursor->rtr_info = rtr_create_rtr_info(false, false, btr_cursor, index);
532 
  /* Purge will SX-lock the tree instead of taking page locks */
534   if (btr_cursor->thr) {
535     btr_cursor->rtr_info->need_page_lock = true;
536     btr_cursor->rtr_info->thr = btr_cursor->thr;
537   }
538 
539   btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode, btr_cursor,
540                               0, file, line, mtr);
541   cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
542 
543   cursor->m_trx_if_known = nullptr;
544 
545   low_match = btr_pcur_get_low_match(cursor);
546 
547   rec = btr_pcur_get_rec(cursor);
548 
549   n_fields = dtuple_get_n_fields(tuple);
550 
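  /* Note whether this mini-transaction already holds the index tree latch,
  so that rtr_pcur_getnext_from_path() does not try to acquire it again. */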
551   if (latch_mode & BTR_ALREADY_S_LATCHED) {
552     ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK));
553     tree_latched = true;
554   }
555 
556   if (latch_mode & BTR_MODIFY_TREE) {
557     ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK) ||
558           mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_SX_LOCK));
559     tree_latched = true;
560   }
561 
562   if (page_rec_is_infimum(rec) || low_match != n_fields ||
563       (rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) &&
564        (for_delete || for_undo_ins))) {
565     if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) &&
566         for_delete) {
567       btr_cursor->rtr_info->fd_del = true;
568       btr_cursor->low_match = 0;
569     }
    /* Did not find a matching row in the first dive. Release
    any latched block before searching more pages */
572     if (latch_mode & BTR_MODIFY_LEAF) {
573       ulint tree_idx = btr_cursor->tree_height - 1;
574       rtr_info_t *rtr_info = btr_cursor->rtr_info;
575 
576       ut_ad(level == 0);
577 
578       if (rtr_info->tree_blocks[tree_idx]) {
579         mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[tree_idx],
580                                        rtr_info->tree_blocks[tree_idx]);
581         rtr_info->tree_blocks[tree_idx] = nullptr;
582       }
583     }
584 
585     bool ret = rtr_pcur_getnext_from_path(tuple, mode, btr_cursor, level,
586                                           latch_mode, tree_latched, mtr);
587 
588     if (ret) {
589       low_match = btr_pcur_get_low_match(cursor);
590       ut_ad(low_match == n_fields);
591     }
592   }
593 }
594 
/** Returns the upper level node pointer to an R-Tree page. It is assumed
that mtr holds an SX-latch or X-latch on the tree.
@return	rec_get_offsets() of the node pointer record */
static ulint *rtr_page_get_father_node_ptr(
599     ulint *offsets,     /*!< in: work area for the return value */
600     mem_heap_t *heap,   /*!< in: memory heap to use */
601     btr_cur_t *sea_cur, /*!< in: search cursor */
602     btr_cur_t *cursor,  /*!< in: cursor pointing to user record,
603                         out: cursor on node pointer record,
604                         its page x-latched */
605     mtr_t *mtr)         /*!< in: mtr */
606 {
607   dtuple_t *tuple;
608   rec_t *user_rec;
609   rec_t *node_ptr;
610   ulint level;
611   page_no_t page_no;
612   dict_index_t *index;
613   rtr_mbr_t mbr;
614 
615   page_no = btr_cur_get_block(cursor)->page.id.page_no();
616   index = btr_cur_get_index(cursor);
617 
618   ut_ad(srv_read_only_mode ||
619         mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
620                                   MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
621 
622   ut_ad(dict_index_get_page(index) != page_no);
623 
624   level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
625 
626   user_rec = btr_cur_get_rec(cursor);
627   ut_a(page_rec_is_user_rec(user_rec));
628 
629   offsets = rec_get_offsets(user_rec, index, offsets, ULINT_UNDEFINED, &heap);
630   rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
631 
632   tuple = rtr_index_build_node_ptr(index, &mbr, user_rec, page_no, heap, level);
633 
634   if (sea_cur && !sea_cur->rtr_info) {
635     sea_cur = nullptr;
636   }
637 
638   rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor, page_no, mtr);
639 
640   node_ptr = btr_cur_get_rec(cursor);
641   ut_ad(!page_rec_is_comp(node_ptr) ||
642         rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
643   offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
644 
645   page_no_t child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
646 
647   if (child_page != page_no) {
648     const rec_t *print_rec;
649 
650     ib::fatal error;
651 
652     error << "Corruption of index " << index->name << " of table "
653           << index->table->name << " parent page " << page_no << " child page "
654           << child_page;
655 
656     print_rec = page_rec_get_next(page_get_infimum_rec(page_align(user_rec)));
657     offsets =
658         rec_get_offsets(print_rec, index, offsets, ULINT_UNDEFINED, &heap);
659     error << "; child ";
660     rec_print(error.m_oss, print_rec,
661               rec_get_info_bits(print_rec, rec_offs_comp(offsets)), offsets);
662     offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
663     error << "; parent ";
664     rec_print(error.m_oss, print_rec,
665               rec_get_info_bits(print_rec, rec_offs_comp(offsets)), offsets);
666 
667     error << ". You should dump + drop + reimport the table to"
668              " fix the corruption. If the crash happens at"
669              " database startup, see " REFMAN
670              "forcing-innodb-recovery.html about forcing"
671              " recovery. Then dump + drop + reimport.";
672   }
673 
674   return (offsets);
675 }
676 
677 /* Get the rtree page father.
678 @param[in]	index		rtree index
679 @param[in]	block		child page in the index
680 @param[in]	mtr		mtr
681 @param[in]	sea_cur		search cursor, contains information
682                                 about parent nodes in search
683 @param[in]	cursor		cursor on node pointer record,
684                                 its page x-latched */
void rtr_page_get_father(dict_index_t *index, buf_block_t *block, mtr_t *mtr,
686                          btr_cur_t *sea_cur, btr_cur_t *cursor) {
687   mem_heap_t *heap = mem_heap_create(100);
688 #ifdef UNIV_DEBUG
689   ulint *offsets;
690 
691   offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr, sea_cur,
692                                       cursor);
693 
694   ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec, offsets);
695 
696   ut_ad(page_no == block->page.id.page_no());
697 #else
698   rtr_page_get_father_block(NULL, heap, index, block, mtr, sea_cur, cursor);
699 #endif
700 
701   mem_heap_free(heap);
702 }
703 
704 /** Returns the father block to a page. It is assumed that mtr holds
705  an X or SX latch on the tree.
706  @return rec_get_offsets() of the node pointer record */
ulint *rtr_page_get_father_block(
708     ulint *offsets,      /*!< in: work area for the return value */
709     mem_heap_t *heap,    /*!< in: memory heap to use */
710     dict_index_t *index, /*!< in: b-tree index */
711     buf_block_t *block,  /*!< in: child page in the index */
712     mtr_t *mtr,          /*!< in: mtr */
713     btr_cur_t *sea_cur,  /*!< in: search cursor, contains information
714                          about parent nodes in search */
715     btr_cur_t *cursor)   /*!< out: cursor on node pointer record,
716                          its page x-latched */
717 {
718   rec_t *rec =
719       page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
720   btr_cur_position(index, rec, block, cursor);
721 
722   return (rtr_page_get_father_node_ptr(offsets, heap, sea_cur, cursor, mtr));
723 }
724 
/** Returns the upper level node pointer to an R-Tree page. It is assumed
 that mtr holds an x-latch on the tree. */
void rtr_get_father_node(
728     dict_index_t *index,   /*!< in: index */
729     ulint level,           /*!< in: the tree level of search */
730     const dtuple_t *tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
731                            tuple must be set so that it cannot get
732                            compared to the node ptr page number field! */
733     btr_cur_t *sea_cur,    /*!< in: search cursor */
734     btr_cur_t *btr_cur,    /*!< in/out: tree cursor; the cursor page is
735                            s- or x-latched, but see also above! */
736     page_no_t page_no,     /*!< Current page no */
737     mtr_t *mtr)            /*!< in: mtr */
738 {
739   mem_heap_t *heap = nullptr;
740   bool ret = false;
741   const rec_t *rec;
742   ulint n_fields;
743   bool new_rtr = false;
744 
  /* Try to optimally locate the parent node. Level should always be
  less than sea_cur->tree_height unless the root is splitting */
747   if (sea_cur && sea_cur->tree_height > level) {
748     ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
749                                     MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
750     ret = rtr_cur_restore_position(BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
751 
    /* Since we block shrinking of tree nodes while there are
    active searches on them, this optimistic locating should always
    succeed */
755     ut_ad(ret);
756 
757     if (ret) {
758       btr_pcur_t *r_cursor = rtr_get_parent_cursor(sea_cur, level, false);
759 
760       rec = btr_pcur_get_rec(r_cursor);
761 
762       ut_ad(r_cursor->m_rel_pos == BTR_PCUR_ON);
763       page_cur_position(rec, btr_pcur_get_block(r_cursor),
764                         btr_cur_get_page_cur(btr_cur));
765       btr_cur->rtr_info = sea_cur->rtr_info;
766       btr_cur->m_own_rtr_info = false;
767       btr_cur->tree_height = sea_cur->tree_height;
768       ut_ad(rtr_compare_cursor_rec(index, btr_cur, page_no, &heap));
769       goto func_exit;
770     }
771   }
772 
  /* We arrive here in one of two scenarios:
  1) check table and btr_validate
  2) index root page being raised */
776   ut_ad(!sea_cur || sea_cur->tree_height == level);
777 
778   if (btr_cur->rtr_info) {
779     rtr_clean_rtr_info(btr_cur->rtr_info, true);
780   } else {
781     new_rtr = true;
782   }
783 
784   btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
785 
786   if (sea_cur && sea_cur->tree_height == level) {
787     /* root split, and search the new root */
788     btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_LOCATE,
789                                 BTR_CONT_MODIFY_TREE, btr_cur, 0, __FILE__,
790                                 __LINE__, mtr);
791 
792   } else {
793     /* btr_validate */
794     ut_ad(level >= 1);
795     ut_ad(!sea_cur);
796 
797     btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_LOCATE,
798                                 BTR_CONT_MODIFY_TREE, btr_cur, 0, __FILE__,
799                                 __LINE__, mtr);
800 
801     rec = btr_cur_get_rec(btr_cur);
802     n_fields = dtuple_get_n_fields_cmp(tuple);
803 
804     if (page_rec_is_infimum(rec) || (btr_cur->low_match != n_fields)) {
805       ret = rtr_pcur_getnext_from_path(tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
806                                        level, BTR_CONT_MODIFY_TREE, true, mtr);
807 
808       ut_ad(ret && btr_cur->low_match == n_fields);
809     }
810   }
811 
812   ret = rtr_compare_cursor_rec(index, btr_cur, page_no, &heap);
813 
814   ut_ad(ret);
815 
816 func_exit:
817   if (heap) {
818     mem_heap_free(heap);
819   }
820 
821   if (new_rtr && btr_cur->rtr_info) {
822     rtr_clean_rtr_info(btr_cur->rtr_info, true);
823     btr_cur->rtr_info = nullptr;
824   }
825 }
826 
/** Create an R-Tree search info structure */
rtr_info_t *rtr_create_rtr_info(
829     /******************/
830     bool need_prdt,      /*!< in: Whether predicate lock
831                          is needed */
832     bool init_matches,   /*!< in: Whether to initiate the
833                          "matches" structure for collecting
834                          matched leaf records */
835     btr_cur_t *cursor,   /*!< in: tree search cursor */
836     dict_index_t *index) /*!< in: index struct */
837 {
838   rtr_info_t *rtr_info;
839 
840   index = index ? index : cursor->index;
841   ut_ad(index);
842 
843   rtr_info = static_cast<rtr_info_t *>(ut_zalloc_nokey(sizeof(*rtr_info)));
844 
845   rtr_info->allocated = true;
846   rtr_info->cursor = cursor;
847   rtr_info->index = index;
848   rtr_info->is_dup = nullptr;
849 
850   if (init_matches) {
851     rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
852     rtr_info->matches = static_cast<matched_rec_t *>(
853         mem_heap_zalloc(rtr_info->heap, sizeof(*rtr_info->matches)));
854 
855     rtr_info->matches->matched_recs = UT_NEW_NOKEY(rtr_rec_vector());
856 
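    /* "bufp" points at a page-aligned area inside rec_buf; rtr_init_match()
    uses it as the frame of the shadow block that holds copied matching
    records. */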
857     rtr_info->matches->bufp =
858         page_align(rtr_info->matches->rec_buf + UNIV_PAGE_SIZE_MAX + 1);
859     mutex_create(LATCH_ID_RTR_MATCH_MUTEX, &rtr_info->matches->rtr_match_mutex);
860     rw_lock_create(PFS_NOT_INSTRUMENTED, &(rtr_info->matches->block.lock),
861                    SYNC_LEVEL_VARYING);
862   }
863 
864   rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
865   rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
866   rtr_info->need_prdt_lock = need_prdt;
867   mutex_create(LATCH_ID_RTR_PATH_MUTEX, &rtr_info->rtr_path_mutex);
868 
869   mutex_enter(&index->rtr_track->rtr_active_mutex);
870   index->rtr_track->rtr_active->push_back(rtr_info);
871   mutex_exit(&index->rtr_track->rtr_active_mutex);
872   return (rtr_info);
873 }
874 
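/* Typical lifecycle (sketch): a caller either allocates the search state
with rtr_create_rtr_info() or embeds an rtr_info_t and calls
rtr_init_rtr_info(), attaches it to the tree cursor with
rtr_info_update_btr(), and finally releases it with rtr_clean_rtr_info(). */
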
875 /** Update a btr_cur_t with rtr_info */
void rtr_info_update_btr(
877     /******************/
878     btr_cur_t *cursor,    /*!< in/out: tree cursor */
879     rtr_info_t *rtr_info) /*!< in: rtr_info to set to the
880                           cursor */
881 {
882   ut_ad(rtr_info);
883 
884   cursor->rtr_info = rtr_info;
885 }
886 
/** Initialize an R-Tree search structure */
void rtr_init_rtr_info(
889     /****************/
890     rtr_info_t *rtr_info, /*!< in: rtr_info to set to the
891                           cursor */
892     bool need_prdt,       /*!< in: Whether predicate lock is
893                           needed */
894     btr_cur_t *cursor,    /*!< in: tree search cursor */
895     dict_index_t *index,  /*!< in: index structure */
896     bool reinit)          /*!< in: Whether this is a reinit */
897 {
898   ut_ad(rtr_info);
899 
900   if (!reinit) {
901     /* Reset all members. */
902     rtr_info->path = nullptr;
903     rtr_info->parent_path = nullptr;
904     rtr_info->matches = nullptr;
905 
906     mutex_create(LATCH_ID_RTR_PATH_MUTEX, &rtr_info->rtr_path_mutex);
907 
908     memset(rtr_info->tree_blocks, 0x0, sizeof(rtr_info->tree_blocks));
909     memset(rtr_info->tree_savepoints, 0x0, sizeof(rtr_info->tree_savepoints));
910     rtr_info->mbr.xmin = 0.0;
911     rtr_info->mbr.xmax = 0.0;
912     rtr_info->mbr.ymin = 0.0;
913     rtr_info->mbr.ymax = 0.0;
914     rtr_info->thr = nullptr;
915     rtr_info->heap = nullptr;
916     rtr_info->cursor = nullptr;
917     rtr_info->index = nullptr;
918     rtr_info->need_prdt_lock = false;
919     rtr_info->need_page_lock = false;
920     rtr_info->allocated = false;
921     rtr_info->mbr_adj = false;
922     rtr_info->fd_del = false;
923     rtr_info->search_tuple = nullptr;
924     rtr_info->search_mode = PAGE_CUR_UNSUPP;
925   }
926 
927   ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
928 
929   rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
930   rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
931   rtr_info->need_prdt_lock = need_prdt;
932   rtr_info->cursor = cursor;
933   rtr_info->index = index;
934   rtr_info->is_dup = nullptr;
935 
936   mutex_enter(&index->rtr_track->rtr_active_mutex);
937   index->rtr_track->rtr_active->push_back(rtr_info);
938   mutex_exit(&index->rtr_track->rtr_active_mutex);
939 }
940 
941 /** Clean up R-Tree search structure */
void rtr_clean_rtr_info(rtr_info_t *rtr_info, /*!< in: RTree search info */
943                         bool free_all) /*!< in: need to free rtr_info itself */
944 {
945   dict_index_t *index;
946   bool initialized = false;
947 
948   if (!rtr_info) {
949     return;
950   }
951 
952   index = rtr_info->index;
953 
954   if (index) {
955     mutex_enter(&index->rtr_track->rtr_active_mutex);
956   }
957 
958   while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
959     btr_pcur_t *cur = rtr_info->parent_path->back().cursor;
960     rtr_info->parent_path->pop_back();
961 
962     if (cur) {
963       btr_pcur_close(cur);
964       ut_free(cur);
965     }
966   }
967 
968   UT_DELETE(rtr_info->parent_path);
969   rtr_info->parent_path = nullptr;
970 
971   if (rtr_info->path != nullptr) {
972     UT_DELETE(rtr_info->path);
973     rtr_info->path = nullptr;
974     initialized = true;
975   }
976 
977   if (rtr_info->matches) {
978     rtr_info->matches->used = false;
979     rtr_info->matches->locked = false;
980     rtr_info->matches->valid = false;
981     rtr_info->matches->matched_recs->clear();
982   }
983 
984   if (index) {
985     index->rtr_track->rtr_active->remove(rtr_info);
986     mutex_exit(&index->rtr_track->rtr_active_mutex);
987   }
988 
989   if (free_all) {
990     if (rtr_info->matches) {
991       if (rtr_info->matches->matched_recs != nullptr) {
992         UT_DELETE(rtr_info->matches->matched_recs);
993       }
994 
995       rw_lock_free(&(rtr_info->matches->block.lock));
996 
997       mutex_destroy(&rtr_info->matches->rtr_match_mutex);
998     }
999 
1000     if (rtr_info->heap) {
1001       mem_heap_free(rtr_info->heap);
1002     }
1003 
1004     if (initialized) {
1005       mutex_destroy(&rtr_info->rtr_path_mutex);
1006     }
1007 
1008     if (rtr_info->allocated) {
1009       ut_free(rtr_info);
1010     }
1011   }
1012 }
1013 
/** Rebuild the "path" to exclude the page number being removed */
static void rtr_rebuild_path(
    rtr_info_t *rtr_info, /*!< in: RTree search info */
    page_no_t page_no)    /*!< in: page number to exclude from the path */
1018 {
1019   rtr_node_path_t *new_path = UT_NEW_NOKEY(rtr_node_path_t());
1020 
1021   rtr_node_path_t::iterator rit;
1022 #ifdef UNIV_DEBUG
1023   ulint before_size = rtr_info->path->size();
1024 #endif /* UNIV_DEBUG */
1025 
1026   for (rit = rtr_info->path->begin(); rit != rtr_info->path->end(); ++rit) {
1027     node_visit_t next_rec = *rit;
1028 
1029     if (next_rec.page_no == page_no) {
1030       continue;
1031     }
1032 
1033     new_path->push_back(next_rec);
1034 #ifdef UNIV_DEBUG
1035     node_visit_t rec = new_path->back();
1036     ut_ad(rec.level < rtr_info->cursor->tree_height && rec.page_no > 0);
1037 #endif /* UNIV_DEBUG */
1038   }
1039 
1040   UT_DELETE(rtr_info->path);
1041 
1042   ut_ad(new_path->size() == before_size - 1);
1043 
1044   rtr_info->path = new_path;
1045 
1046   if (!rtr_info->parent_path->empty()) {
1047     rtr_node_path_t *new_parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1048 
1049     for (rit = rtr_info->parent_path->begin();
1050          rit != rtr_info->parent_path->end(); ++rit) {
1051       node_visit_t next_rec = *rit;
1052 
1053       if (next_rec.child_no == page_no) {
1054         btr_pcur_t *cur = next_rec.cursor;
1055 
1056         if (cur) {
1057           btr_pcur_close(cur);
1058           ut_free(cur);
1059         }
1060 
1061         continue;
1062       }
1063 
1064       new_parent_path->push_back(next_rec);
1065     }
1066     UT_DELETE(rtr_info->parent_path);
1067     rtr_info->parent_path = new_parent_path;
1068   }
1069 }
1070 
/** Check whether a page being discarded is in anyone's search path */
void rtr_check_discard_page(
1073     dict_index_t *index, /*!< in: index */
1074     btr_cur_t *cursor,   /*!< in: cursor on the page to discard: not on
1075                          the root page */
1076     buf_block_t *block)  /*!< in: block of page to be discarded */
1077 {
1078   page_no_t pageno = block->page.id.page_no();
1079   rtr_info_t *rtr_info;
1080   rtr_info_active::iterator it;
1081 
1082   mutex_enter(&index->rtr_track->rtr_active_mutex);
1083 
1084   for (it = index->rtr_track->rtr_active->begin();
1085        it != index->rtr_track->rtr_active->end(); ++it) {
1086     rtr_info = *it;
1087     rtr_node_path_t::iterator rit;
1088     bool found = false;
1089 
1090     if (cursor && rtr_info == cursor->rtr_info) {
1091       continue;
1092     }
1093 
1094     mutex_enter(&rtr_info->rtr_path_mutex);
1095     for (rit = rtr_info->path->begin(); rit != rtr_info->path->end(); ++rit) {
1096       node_visit_t node = *rit;
1097 
1098       if (node.page_no == pageno) {
1099         found = true;
1100         break;
1101       }
1102     }
1103 
1104     if (found) {
1105       rtr_rebuild_path(rtr_info, pageno);
1106     }
1107     mutex_exit(&rtr_info->rtr_path_mutex);
1108 
1109     if (rtr_info->matches) {
1110       mutex_enter(&rtr_info->matches->rtr_match_mutex);
1111 
1112       if ((&rtr_info->matches->block)->page.id.page_no() == pageno) {
1113         if (!rtr_info->matches->matched_recs->empty()) {
1114           rtr_info->matches->matched_recs->clear();
1115         }
1116         ut_ad(rtr_info->matches->matched_recs->empty());
1117         rtr_info->matches->valid = false;
1118       }
1119 
1120       mutex_exit(&rtr_info->matches->rtr_match_mutex);
1121     }
1122   }
1123 
1124   mutex_exit(&index->rtr_track->rtr_active_mutex);
1125 
1126   locksys::Shard_latch_guard guard{block->get_page_id()};
1127   lock_prdt_page_free_from_discard(block, lock_sys->prdt_hash);
1128   lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
1129 }
1130 
/** Restore the stored position of a persistent cursor, buffer-fixing the page */
static bool rtr_cur_restore_position(
1133     ulint latch_mode,   /*!< in: BTR_SEARCH_LEAF, ... */
1134     btr_cur_t *btr_cur, /*!< in: detached persistent cursor */
1135     ulint level,        /*!< in: index level */
1136     mtr_t *mtr)         /*!< in: mtr */
1137 {
1138   dict_index_t *index;
1139   mem_heap_t *heap;
1140   btr_pcur_t *r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
1141   dtuple_t *tuple;
1142   bool ret = false;
1143 
1144   ut_ad(mtr);
1145   ut_ad(r_cursor);
1146   ut_ad(mtr->is_active());
1147 
1148   index = btr_cur_get_index(btr_cur);
1149 
1150   if (r_cursor->m_rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE ||
1151       r_cursor->m_rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
1152     return (false);
1153   }
1154 
1155   DBUG_EXECUTE_IF("rtr_pessimistic_position", r_cursor->m_modify_clock = 100;);
1156 
1157   ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
1158 
1159   if (!buf_pool_is_obsolete(r_cursor->m_withdraw_clock) &&
1160       buf_page_optimistic_get(RW_X_LATCH, r_cursor->m_block_when_stored,
1161                               r_cursor->m_modify_clock, Page_fetch::NORMAL,
1162                               __FILE__, __LINE__, mtr)) {
1163     ut_ad(r_cursor->m_pos_state == BTR_PCUR_IS_POSITIONED);
1164 
1165     ut_ad(r_cursor->m_rel_pos == BTR_PCUR_ON);
1166 #ifdef UNIV_DEBUG
1167     do {
1168       const rec_t *rec;
1169       const ulint *offsets1;
1170       const ulint *offsets2;
1171       ulint comp;
1172 
1173       rec = btr_pcur_get_rec(r_cursor);
1174 
1175       heap = mem_heap_create(256);
1176       offsets1 = rec_get_offsets(r_cursor->m_old_rec, index, nullptr,
1177                                  r_cursor->m_old_n_fields, &heap);
1178       offsets2 =
1179           rec_get_offsets(rec, index, nullptr, r_cursor->m_old_n_fields, &heap);
1180 
1181       comp = rec_offs_comp(offsets1);
1182 
1183       if (rec_get_info_bits(r_cursor->m_old_rec, comp) &
1184           REC_INFO_MIN_REC_FLAG) {
1185         ut_ad(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG);
1186       } else {
1187         ut_ad(!cmp_rec_rec(r_cursor->m_old_rec, rec, offsets1, offsets2, index,
1188                            page_is_spatial_non_leaf(rec, index)));
1189       }
1190 
1191       mem_heap_free(heap);
1192     } while (false);
1193 #endif /* UNIV_DEBUG */
1194 
1195     return (true);
1196   }
1197 
  /* The page has changed. For an R-Tree the page cannot be shrunk away,
  so we search the page and its right siblings */
1200   buf_block_t *block;
1201   node_seq_t page_ssn;
1202   const page_t *page;
1203   page_cur_t *page_cursor;
1204   node_visit_t *node = rtr_get_parent_node(btr_cur, level, false);
1205   space_id_t space = dict_index_get_space(index);
1206   node_seq_t path_ssn = node->seq_no;
1207   page_size_t page_size = dict_table_page_size(index->table);
1208 
1209   page_no_t page_no = node->page_no;
1210 
1211   heap = mem_heap_create(256);
1212 
1213   tuple = dict_index_build_data_tuple(index, r_cursor->m_old_rec,
1214                                       r_cursor->m_old_n_fields, heap);
1215 
1216   page_cursor = btr_pcur_get_page_cur(r_cursor);
1217   ut_ad(r_cursor == node->cursor);
1218 
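  /* Search the saved page; if its SSN shows that it was split after the
  position was stored, follow the next-page link and search again. */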
1219 search_again:
1220   page_id_t page_id(space, page_no);
1221 
1222   block = buf_page_get_gen(page_id, page_size, RW_X_LATCH, nullptr,
1223                            Page_fetch::NORMAL, __FILE__, __LINE__, mtr);
1224 
1225   ut_ad(block);
1226 
1227   /* Get the page SSN */
1228   page = buf_block_get_frame(block);
1229   page_ssn = page_get_ssn_id(page);
1230 
1231   ulint low_match =
1232       page_cur_search(block, index, tuple, PAGE_CUR_LE, page_cursor);
1233 
1234   if (low_match == r_cursor->m_old_n_fields) {
1235     const rec_t *rec;
1236     const ulint *offsets1;
1237     const ulint *offsets2;
1238     ulint comp;
1239 
1240     rec = btr_pcur_get_rec(r_cursor);
1241 
1242     offsets1 = rec_get_offsets(r_cursor->m_old_rec, index, nullptr,
1243                                r_cursor->m_old_n_fields, &heap);
1244     offsets2 =
1245         rec_get_offsets(rec, index, nullptr, r_cursor->m_old_n_fields, &heap);
1246 
1247     comp = rec_offs_comp(offsets1);
1248 
1249     if ((rec_get_info_bits(r_cursor->m_old_rec, comp) &
1250          REC_INFO_MIN_REC_FLAG) &&
1251         (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
1252       r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1253       ret = true;
1254     } else if (!cmp_rec_rec(r_cursor->m_old_rec, rec, offsets1, offsets2, index,
1255                             page_is_spatial_non_leaf(rec, index))) {
1256       r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1257       ret = true;
1258     }
1259   }
1260 
  /* Check the page SSN to see if it has been split; if so, search
  the page to the right */
1263   if (!ret && page_ssn > path_ssn) {
1264     page_no = btr_page_get_next(page, mtr);
1265     goto search_again;
1266   }
1267 
1268   mem_heap_free(heap);
1269 
1270   return (ret);
1271 }
1272 
/** Copy the leaf-level R-tree record, and push it to matched_rec in rtr_info */
static void rtr_leaf_push_match_rec(
1275     const rec_t *rec,     /*!< in: record to copy */
1276     rtr_info_t *rtr_info, /*!< in/out: search stack */
1277     ulint *offsets,       /*!< in: offsets */
1278     bool is_comp)         /*!< in: is compact format */
1279 {
1280   byte *buf;
1281   matched_rec_t *match_rec = rtr_info->matches;
1282   rec_t *copy;
1283   ulint data_len;
1284   rtr_rec_t rtr_rec;
1285 
1286   buf = match_rec->block.frame + match_rec->used;
1287 
1288   copy = rec_copy(buf, rec, offsets);
1289 
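  /* Make the copy's next-record pointer reference the supremum of the
  shadow page, so that the copied record terminates the record list. */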
1290   if (is_comp) {
1291     rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1292   } else {
1293     rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1294   }
1295 
1296   rtr_rec.r_rec = copy;
1297   rtr_rec.locked = false;
1298 
1299   match_rec->matched_recs->push_back(rtr_rec);
1300   match_rec->valid = true;
1301 
1302   data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1303   match_rec->used += data_len;
1304 
1305   ut_ad(match_rec->used < UNIV_PAGE_SIZE);
1306 }
1307 
/** Store the parent path cursor
 @return number of cursors stored */
ulint rtr_store_parent_path(
1311     const buf_block_t *block, /*!< in: block of the page */
1312     btr_cur_t *btr_cur,       /*!< in/out: persistent cursor */
1313     ulint latch_mode,
1314     /*!< in: latch_mode */
1315     ulint level, /*!< in: index level */
1316     mtr_t *mtr)  /*!< in: mtr */
1317 {
1318   ulint num = btr_cur->rtr_info->parent_path->size();
1319   ulint num_stored = 0;
1320 
1321   while (num >= 1) {
1322     node_visit_t *node = &(*btr_cur->rtr_info->parent_path)[num - 1];
1323     btr_pcur_t *r_cursor = node->cursor;
1324     buf_block_t *cur_block;
1325 
1326     if (node->level > level) {
1327       break;
1328     }
1329 
1330     r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1331     r_cursor->m_latch_mode = latch_mode;
1332 
1333     cur_block = btr_pcur_get_block(r_cursor);
1334 
1335     if (cur_block == block) {
1336       btr_pcur_store_position(r_cursor, mtr);
1337       num_stored++;
1338     } else {
1339       break;
1340     }
1341 
1342     num--;
1343   }
1344 
1345   return (num_stored);
1346 }
/** Push a non-leaf index node to the search path for insertion */
static void rtr_non_leaf_insert_stack_push(
1349     dict_index_t *index,      /*!< in: index descriptor */
1350     rtr_node_path_t *path,    /*!< in/out: search path */
1351     ulint level,              /*!< in: index page level */
1352     page_no_t child_no,       /*!< in: child page no */
1353     const buf_block_t *block, /*!< in: block of the page */
1354     const rec_t *rec,         /*!< in: positioned record */
1355     double mbr_inc)           /*!< in: MBR needs to be enlarged */
1356 {
1357   node_seq_t new_seq;
1358   btr_pcur_t *my_cursor;
1359   page_no_t page_no = block->page.id.page_no();
1360 
1361   my_cursor = static_cast<btr_pcur_t *>(ut_malloc_nokey(sizeof(*my_cursor)));
1362 
1363   btr_pcur_init(my_cursor);
1364 
1365   page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1366 
1367   (btr_pcur_get_btr_cur(my_cursor))->index = index;
1368 
1369   new_seq = rtr_get_current_ssn_id(index);
1370   rtr_non_leaf_stack_push(path, page_no, new_seq, level, child_no, my_cursor,
1371                           mbr_inc);
1372 }
1373 
/** Copy a buf_block_t structure, except "block->lock", "block->mutex" and
1375 "block->debug_latch."
1376 @param[in,out]	matches	copy to match->block
1377 @param[in]	block	block to copy */
static void rtr_copy_buf(matched_rec_t *matches, const buf_block_t *block) {
1379   /* Copy all members of "block" to "matches->block" except "mutex"
1380   and "lock". We skip "mutex" and "lock" because they are not used
1381   from the dummy buf_block_t we create here and because memcpy()ing
1382   them generates (valid) compiler warnings that the vtable pointer
1383   will be copied. It is also undefined what will happen with the
1384   newly memcpy()ed mutex if the source mutex was acquired by
1385   (another) thread while it was copied. */
1386   new (&matches->block.page) buf_page_t(block->page);
1387   matches->block.frame = block->frame;
1388   matches->block.unzip_LRU = block->unzip_LRU;
1389 
1390   ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
1391   ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
1392 
1393   /* Skip buf_block_t::mutex */
1394   /* Skip buf_block_t::lock */
1395   matches->block.lock_hash_val = block->lock_hash_val;
1396   matches->block.modify_clock = block->modify_clock;
1397   matches->block.n_hash_helps = block->n_hash_helps;
1398   matches->block.n_fields = block->n_fields;
1399   matches->block.left_side = block->left_side;
1400 #if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
1401   matches->block.n_pointers = block->n_pointers;
1402 #endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
1403   matches->block.curr_n_fields = block->curr_n_fields;
1404   matches->block.curr_left_side = block->curr_left_side;
1405   matches->block.index = block->index;
1406   matches->block.made_dirty_with_no_latch = block->made_dirty_with_no_latch;
1407 
1408 #ifndef UNIV_HOTBACKUP
1409 #ifdef UNIV_DEBUG
1410   /* The buf_block_t copy does not contain a valid debug_latch object, mark it
1411   as invalid so that we can detect any uses in our valgrind tests. Copy
1412   semantics for locks are not well defined, so the previous code which simply
1413   defined the default copy assignment operator was not correct. This was changed
1414   because an std::atomic<bool> member was added and rw_lock_t became explicitly
1415   non copyable. */
1416   UNIV_MEM_INVALID(&matches->block.debug_latch, sizeof(rw_lock_t));
1417 #endif /* UNIV_DEBUG */
1418 #endif /* !UNIV_HOTBACKUP */
1419 }
1420 
1421 /** Generate a shadow copy of the page block header to save the
1422  matched records */
static void rtr_init_match(
1424     matched_rec_t *matches,   /*!< in/out: match to initialize */
1425     const buf_block_t *block, /*!< in: buffer block */
1426     const page_t *page)       /*!< in: buffer page */
1427 {
1428   ut_ad(matches->matched_recs->empty());
1429   matches->locked = false;
1430   rtr_copy_buf(matches, block);
1431   matches->block.frame = matches->bufp;
1432   matches->valid = false;
1433   /* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
1434   use infimum/supremum of this page as normal btr page for search. */
1435   memcpy(matches->block.frame, page,
1436          page_is_comp(page) ? PAGE_NEW_SUPREMUM_END : PAGE_OLD_SUPREMUM_END);
1437   matches->used =
1438       page_is_comp(page) ? PAGE_NEW_SUPREMUM_END : PAGE_OLD_SUPREMUM_END;
1439 #ifdef RTR_SEARCH_DIAGNOSTIC
1440   ulint pageno = page_get_page_no(page);
1441   fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1442           static_cast<int>(pageno));
1443 #endif /* RTR_SEARCH_DIAGNOSTIC */
1444 }
1445 
/** Get the bounding box content from an index record */
void rtr_get_mbr_from_rec(const rec_t *rec,     /*!< in: index record */
1448                           const ulint *offsets, /*!< in: offsets array */
1449                           rtr_mbr_t *mbr)       /*!< out MBR */
1450 {
1451   ulint rec_f_len;
1452   const byte *data;
1453 
1454   data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1455 
1456   rtr_read_mbr(data, mbr);
1457 }

/** Get the bounding box content from an MBR data tuple */
void rtr_get_mbr_from_tuple(const dtuple_t *dtuple, /*!< in: data tuple */
                            rtr_mbr *mbr)           /*!< out: mbr to fill */
{
  const dfield_t *dtuple_field;
  ulint dtuple_f_len;
  byte *data;

  dtuple_field = dtuple_get_nth_field(dtuple, 0);
  dtuple_f_len = dfield_get_len(dtuple_field);
  ut_a(dtuple_f_len >= 4 * sizeof(double));

  data = static_cast<byte *>(dfield_get_data(dtuple_field));

  rtr_read_mbr(data, mbr);
}
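
/* Illustrative sketch (an assumption based on the rtr_mbr_t field order
{xmin, xmax, ymin, ymax}; not part of the original code): the first field
read above is expected to hold the bounding box as four machine-format
doubles, roughly as if it had been written with

  byte mbr_buf[4 * sizeof(double)];
  mach_double_write(mbr_buf + 0 * sizeof(double), xmin);
  mach_double_write(mbr_buf + 1 * sizeof(double), xmax);
  mach_double_write(mbr_buf + 2 * sizeof(double), ymin);
  mach_double_write(mbr_buf + 3 * sizeof(double), ymax);

which is why the assertion above requires at least 4 * sizeof(double) bytes
before rtr_read_mbr() decodes them. */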

/** Searches the right position in an rtree for a page cursor.
 @return true if a matching record or child node was found */
bool rtr_cur_search_with_match(
    const buf_block_t *block, /*!< in: buffer block */
    dict_index_t *index,      /*!< in: index descriptor */
    const dtuple_t *tuple,    /*!< in: data tuple */
    page_cur_mode_t mode,     /*!< in: PAGE_CUR_RTREE_INSERT,
                              PAGE_CUR_RTREE_LOCATE etc. */
    page_cur_t *cursor,       /*!< in/out: page cursor */
    rtr_info_t *rtr_info)     /*!< in/out: search stack */
{
  bool found = false;
  const page_t *page;
  const rec_t *rec;
  const rec_t *last_rec;
  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;
  mem_heap_t *heap = nullptr;
  int cmp = 1;
  bool is_leaf;
  double least_inc = DBL_MAX;
  const rec_t *best_rec;
  const rec_t *last_match_rec = nullptr;
  ulint level;
  bool match_init = false;
  space_id_t space = block->page.id.space();
  page_cur_mode_t orig_mode = mode;
  const rec_t *first_rec = nullptr;

  rec_offs_init(offsets_);

  ut_ad(RTREE_SEARCH_MODE(mode));

  ut_ad(dict_index_is_spatial(index));

  page = buf_block_get_frame(block);

  is_leaf = page_is_leaf(page);
  level = btr_page_get_level(page, mtr);

  if (mode == PAGE_CUR_RTREE_LOCATE) {
    ut_ad(level != 0);
    mode = PAGE_CUR_WITHIN;
  }
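
  /* Note: at this point PAGE_CUR_RTREE_LOCATE can only be seen on non-leaf
  pages (the assertion above); locating an entry by its MBR descends into
  any child whose MBR fully contains the search MBR, which is what
  PAGE_CUR_WITHIN checks in the loop below. */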

  rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));

  last_rec = rec;
  best_rec = rec;

  if (page_rec_is_infimum(rec)) {
    rec = page_rec_get_next_const(rec);
  }

  /* Check whether the tuple to be inserted is larger than the first rec,
  and if so try to avoid that rec if possible */
  if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
    ulint new_rec_size = rec_get_converted_size(index, tuple);

    offsets = rec_get_offsets(rec, index, offsets,
                              dtuple_get_n_fields_cmp(tuple), &heap);

    if (rec_offs_size(offsets) < new_rec_size) {
      first_rec = rec;
    }

    /* If this is the left-most page of this index level
    and the table is a compressed table, try to avoid the
    first rec as much as possible, as there would be problems
    when updating the MIN_REC rec in a compressed table */
    if (buf_block_get_page_zip(block) &&
        mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL &&
        page_get_n_recs(page) >= 2) {
      rec = page_rec_get_next_const(rec);
    }
  }
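
  /* Note: when set, first_rec marks a first record whose stored size is
  smaller than the entry being inserted; the PAGE_CUR_RTREE_INSERT handling
  in the loop below then prefers any other child over it (see the
  best_rec == first_rec branch). */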

  while (!page_rec_is_supremum(rec)) {
    offsets = rec_get_offsets(rec, index, offsets,
                              dtuple_get_n_fields_cmp(tuple), &heap);
    if (!is_leaf) {
      switch (mode) {
        case PAGE_CUR_CONTAIN:
        case PAGE_CUR_INTERSECT:
        case PAGE_CUR_MBR_EQUAL:
          /* At non-leaf level, we will need to check
          both CONTAIN and INTERSECT for either of
          these search modes */
          cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, PAGE_CUR_CONTAIN,
                                        index->rtr_srs.get());

          if (cmp != 0) {
            cmp = cmp_dtuple_rec_with_gis(
                tuple, rec, offsets, PAGE_CUR_INTERSECT, index->rtr_srs.get());
          }
          break;
        case PAGE_CUR_DISJOINT:
          cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
                                        index->rtr_srs.get());

          if (cmp != 0) {
            cmp = cmp_dtuple_rec_with_gis(
                tuple, rec, offsets, PAGE_CUR_INTERSECT, index->rtr_srs.get());
          }
          break;
        case PAGE_CUR_RTREE_INSERT:
          double increase;
          double area;

          cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, PAGE_CUR_WITHIN,
                                        index->rtr_srs.get());

          if (cmp != 0) {
            increase = rtr_rec_cal_increase(tuple, rec, offsets, &area,
                                            index->rtr_srs.get());
            /* Once it goes beyond DBL_MAX, or they are
            both DBL_MAX, it would not make sense to
            record such a value; just make it DBL_MAX / 2 */
            if (increase >= DBL_MAX || (increase == 0 && std::isfinite(area))) {
              increase = DBL_MAX / 2;
            }

            if (increase < least_inc) {
              least_inc = increase;
              best_rec = rec;
            } else if (best_rec && best_rec == first_rec) {
              /* If first_rec is set,
              we will try to avoid it */
              least_inc = increase;
              best_rec = rec;
            }
          }
          break;
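
          /* Illustrative example for the PAGE_CUR_RTREE_INSERT case above
          (hypothetical numbers): when inserting an entry with MBR
          [2,3] x [2,3], a child whose MBR is [0,5] x [0,5] already contains
          it, so cmp == 0 and the loop below breaks there with no enlargement
          needed; a child whose MBR is [0,1] x [0,1] would have to grow to
          [0,3] x [0,3], an area increase of 3*3 - 1*1 = 8. Among children
          that do not contain the new entry, the one with the smallest such
          increase becomes best_rec. */
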
        case PAGE_CUR_RTREE_GET_FATHER:
          cmp = cmp_dtuple_rec_with_gis_internal(tuple, rec, offsets,
                                                 index->rtr_srs.get());
          break;
        default:
          /* WITHIN etc. */
          cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
                                        index->rtr_srs.get());
      }
    } else {
      /* At leaf level, INSERT should translate to LE */
      ut_ad(mode != PAGE_CUR_RTREE_INSERT);

      cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
                                    index->rtr_srs.get());
    }

    if (cmp == 0) {
      found = true;

      /* If located, the matching node/rec will be pushed
      to rtr_info->path for non-leaf nodes, or
      rtr_info->matches for leaf nodes */
      if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
        if (!is_leaf) {
          page_no_t page_no;
          node_seq_t new_seq;
          bool is_loc;

          is_loc = (orig_mode == PAGE_CUR_RTREE_LOCATE ||
                    orig_mode == PAGE_CUR_RTREE_GET_FATHER);

          offsets =
              rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

          page_no = btr_node_ptr_get_child_page_no(rec, offsets);

          ut_ad(level >= 1);

          /* Get current SSN, before we insert
          it into the path stack */
          new_seq = rtr_get_current_ssn_id(index);

          rtr_non_leaf_stack_push(rtr_info->path, page_no, new_seq, level - 1,
                                  0, nullptr, 0);

          if (is_loc) {
            rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
                                           page_no, block, rec, 0);
          }

          if (!srv_read_only_mode && (rtr_info->need_page_lock || !is_loc)) {
            /* Lock the page, preventing it
            from being shrunk */
            lock_place_prdt_page_lock({space, page_no}, index, rtr_info->thr);
          }
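
          /* A note on lock_place_prdt_page_lock() above (our reading, not
          from the original comments): the predicate page lock registers the
          child page number so that a concurrent tree shrink cannot free or
          merge that page before this search descends into it; locate and
          get-father modes skip it unless the caller explicitly set
          rtr_info->need_page_lock. */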
        } else {
          ut_ad(orig_mode != PAGE_CUR_RTREE_LOCATE);

          if (!match_init) {
            rtr_init_match(rtr_info->matches, block, page);
            match_init = true;
          }

          /* Collect matched records on page */
          offsets =
              rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
          rtr_leaf_push_match_rec(rec, rtr_info, offsets, page_is_comp(page));
        }

        last_match_rec = rec;
      } else {
        /* This is the insertion case; it will break
        once it finds the first MBR that can accommodate
        the rec being inserted */
        break;
      }
    }

    last_rec = rec;

    rec = page_rec_get_next_const(rec);
  }

  /* All records on page are searched */
  if (page_rec_is_supremum(rec)) {
    if (!is_leaf) {
      if (!found) {
        /* No match case: if it is for insertion,
        then we select the record that results in
        the least increase in area */
        if (mode == PAGE_CUR_RTREE_INSERT) {
          page_no_t child_no;
          ut_ad(least_inc < DBL_MAX);
          offsets =
              rec_get_offsets(best_rec, index, offsets, ULINT_UNDEFINED, &heap);
          child_no = btr_node_ptr_get_child_page_no(best_rec, offsets);

          rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
                                         child_no, block, best_rec, least_inc);

          page_cur_position(best_rec, block, cursor);
          rtr_info->mbr_adj = true;
        } else {
          /* Position at the last rec of the
          page, since this is not a leaf page */
          page_cur_position(last_rec, block, cursor);
        }
      } else {
        /* There are matching records; position
        on the last matching record */
        if (rtr_info) {
          rec = last_match_rec;
          page_cur_position(rec, block, cursor);
        }
      }
    } else if (rtr_info) {
      /* Leaf level: if there is no match, position at
      the last (supremum) rec */
      if (!last_match_rec) {
        page_cur_position(rec, block, cursor);
        goto func_exit;
      }

      /* There are matched records */
      matched_rec_t *match_rec = rtr_info->matches;

      rtr_rec_t test_rec;

      test_rec = match_rec->matched_recs->back();
#ifdef UNIV_DEBUG
      ulint offsets_2[REC_OFFS_NORMAL_SIZE];
      ulint *offsets2 = offsets_2;
      rec_offs_init(offsets_2);

      ut_ad(found);

      /* Verify the record to be positioned is the same
      as the last record in the matched_rec vector */
      offsets2 = rec_get_offsets(test_rec.r_rec, index, offsets2,
                                 ULINT_UNDEFINED, &heap);

      offsets = rec_get_offsets(last_match_rec, index, offsets, ULINT_UNDEFINED,
                                &heap);

      ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec, offsets2, offsets,
                        index, false) == 0);
#endif /* UNIV_DEBUG */
      /* Pop the last match record and position on it */
      match_rec->matched_recs->pop_back();
      page_cur_position(test_rec.r_rec, &match_rec->block, cursor);
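
      /* Note that the cursor now points into the shadow copy
      (match_rec->block, whose frame is the matches buffer filled by
      rtr_leaf_push_match_rec()), not into the original leaf page, so that
      the matched records can be consumed later without revisiting this
      page. */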
    }
  } else {
    if (mode == PAGE_CUR_RTREE_INSERT) {
      page_no_t child_no;
      ut_ad(!last_match_rec && rec);

      offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

      child_no = btr_node_ptr_get_child_page_no(rec, offsets);

      rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
                                     child_no, block, rec, 0);

    } else if (rtr_info && found && !is_leaf) {
      rec = last_match_rec;
    }

    page_cur_position(rec, block, cursor);
  }

#ifdef UNIV_DEBUG
  /* Verify that we are positioned at the same child page as pushed in
  the path stack */
  if (!is_leaf && (!page_rec_is_supremum(rec) || found) &&
      mode != PAGE_CUR_RTREE_INSERT) {
    page_no_t page_no;

    offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
    page_no = btr_node_ptr_get_child_page_no(rec, offsets);

    if (rtr_info && found) {
      rtr_node_path_t *path = rtr_info->path;
      node_visit_t last_visit = path->back();

      ut_ad(last_visit.page_no == page_no);
    }
  }
#endif /* UNIV_DEBUG */

func_exit:
  if (UNIV_LIKELY_NULL(heap)) {
    mem_heap_free(heap);
  }

  return (found);
}
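
/* Summary of the out-state of rtr_cur_search_with_match(), as read from the
code above: on non-leaf pages, matching child nodes are pushed onto
rtr_info->path (and, for locate/get-father modes, onto rtr_info->parent_path)
and the page cursor is left on the last match or on the best insert
candidate; for PAGE_CUR_RTREE_INSERT the chosen child is recorded on
rtr_info->parent_path and rtr_info->mbr_adj is set when that child's MBR must
grow. On leaf pages, matching records are collected in rtr_info->matches and
the cursor is positioned on the last match inside that shadow copy. The
return value simply reports whether any match was found. */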