1 /*****************************************************************************
2
3 Copyright (c) 2014, 2020, Oracle and/or its affiliates. All rights reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 /** @file gis/gis0sea.cc
28 InnoDB R-tree search interfaces
29
30 Created 2014/01/16 Jimmy Yang
31 ***********************************************************************/
32
33 #include <new>
34
35 #include "fsp0fsp.h"
36 #include "gis0rtree.h"
37 #include "my_dbug.h"
38 #include "page0cur.h"
39 #include "page0page.h"
40 #include "page0zip.h"
41
42 #include "btr0cur.h"
43 #include "btr0pcur.h"
44 #include "btr0sea.h"
45 #include "ibuf0ibuf.h"
46 #include "lock0lock.h"
47 #include "rem0cmp.h"
48 #include "srv0mon.h"
49 #include "sync0sync.h"
50 #include "trx0trx.h"
51 #include "univ.i"
52
/** Restore the stored position of a persistent cursor, buffer-fixing the
page. This is the forward declaration of a file-local (static) helper so that
functions defined earlier in the file can call it.
@return NOTE(review): presumably true when the position could be restored;
rtr_get_father_node() treats a true return as success -- confirm against the
definition */
static bool rtr_cur_restore_position(
    ulint latch_mode,  /*!< in: BTR_SEARCH_LEAF, ... */
    btr_cur_t *cursor, /*!< in: detached persistent cursor */
    ulint level,       /*!< in: index level */
    mtr_t *mtr);       /*!< in: mtr */
59
60 /** Pop out used parent path entry, until we find the parent with matching
61 page number */
rtr_adjust_parent_path(rtr_info_t * rtr_info,page_no_t page_no)62 static void rtr_adjust_parent_path(
63 rtr_info_t *rtr_info, /* R-Tree info struct */
64 page_no_t page_no) /* page number to look for */
65 {
66 while (!rtr_info->parent_path->empty()) {
67 if (rtr_info->parent_path->back().child_no == page_no) {
68 break;
69 } else {
70 if (rtr_info->parent_path->back().cursor) {
71 btr_pcur_close(rtr_info->parent_path->back().cursor);
72 ut_free(rtr_info->parent_path->back().cursor);
73 }
74
75 rtr_info->parent_path->pop_back();
76 }
77 }
78 }
79
80 /** Find the next matching record. This function is used by search
81 or record locating during index delete/update.
82 @return true if there is suitable record found, otherwise false */
rtr_pcur_getnext_from_path(const dtuple_t * tuple,page_cur_mode_t mode,btr_cur_t * btr_cur,ulint target_level,ulint latch_mode,bool index_locked,mtr_t * mtr)83 static bool rtr_pcur_getnext_from_path(
84 const dtuple_t *tuple, /*!< in: data tuple */
85 page_cur_mode_t mode, /*!< in: cursor search mode */
86 btr_cur_t *btr_cur, /*!< in: persistent cursor; NOTE that the
87 function may release the page latch */
88 ulint target_level,
89 /*!< in: target level */
90 ulint latch_mode,
91 /*!< in: latch_mode */
92 bool index_locked,
93 /*!< in: index tree locked */
94 mtr_t *mtr) /*!< in: mtr */
95 {
96 dict_index_t *index = btr_cur->index;
97 bool found = false;
98 space_id_t space = dict_index_get_space(index);
99 page_cur_t *page_cursor;
100 ulint level = 0;
101 node_visit_t next_rec;
102 rtr_info_t *rtr_info = btr_cur->rtr_info;
103 node_seq_t page_ssn;
104 ulint my_latch_mode;
105 ulint skip_parent = false;
106 bool new_split = false;
107 bool need_parent;
108 bool for_delete = false;
109 bool for_undo_ins = false;
110
111 /* exhausted all the pages to be searched */
112 if (rtr_info->path->empty()) {
113 return (false);
114 }
115
116 ut_ad(dtuple_get_n_fields_cmp(tuple));
117
118 my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
119
120 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
121 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
122
123 /* There should be no insert coming to this function. Only
124 mode with BTR_MODIFY_* should be delete */
125 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
126 ut_ad(my_latch_mode == BTR_SEARCH_LEAF || my_latch_mode == BTR_MODIFY_LEAF ||
127 my_latch_mode == BTR_MODIFY_TREE ||
128 my_latch_mode == BTR_CONT_MODIFY_TREE);
129
130 /* Whether need to track parent information. Only need so
131 when we do tree altering operations (such as index page merge) */
132 need_parent = ((my_latch_mode == BTR_MODIFY_TREE ||
133 my_latch_mode == BTR_CONT_MODIFY_TREE) &&
134 mode == PAGE_CUR_RTREE_LOCATE);
135
136 if (!index_locked) {
137 ut_ad(latch_mode & BTR_SEARCH_LEAF || latch_mode & BTR_MODIFY_LEAF);
138 mtr_s_lock(dict_index_get_lock(index), mtr);
139 } else {
140 ut_ad(
141 mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_SX_LOCK) ||
142 mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK) ||
143 mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK));
144 }
145
146 const page_size_t &page_size = dict_table_page_size(index->table);
147
148 /* Pop each node/page to be searched from "path" structure
149 and do a search on it. Please note, any pages that are in
150 the "path" structure are protected by "page" lock, so tey
151 cannot be shrunk away */
152 do {
153 buf_block_t *block;
154 node_seq_t path_ssn;
155 const page_t *page;
156 ulint rw_latch = RW_X_LATCH;
157 ulint tree_idx;
158
159 mutex_enter(&rtr_info->rtr_path_mutex);
160 next_rec = rtr_info->path->back();
161 rtr_info->path->pop_back();
162 level = next_rec.level;
163 path_ssn = next_rec.seq_no;
164 tree_idx = btr_cur->tree_height - level - 1;
165
166 /* Maintain the parent path info as well, if needed */
167 if (need_parent && !skip_parent && !new_split) {
168 ulint old_level;
169 ulint new_level;
170
171 ut_ad(!rtr_info->parent_path->empty());
172
173 /* Cleanup unused parent info */
174 if (rtr_info->parent_path->back().cursor) {
175 btr_pcur_close(rtr_info->parent_path->back().cursor);
176 ut_free(rtr_info->parent_path->back().cursor);
177 }
178
179 old_level = rtr_info->parent_path->back().level;
180
181 rtr_info->parent_path->pop_back();
182
183 ut_ad(!rtr_info->parent_path->empty());
184
185 /* check whether there is a level change. If so,
186 the current parent path needs to pop enough
187 nodes to adjust to the new search page */
188 new_level = rtr_info->parent_path->back().level;
189
190 if (old_level < new_level) {
191 rtr_adjust_parent_path(rtr_info, next_rec.page_no);
192 }
193
194 ut_ad(!rtr_info->parent_path->empty());
195
196 ut_ad(next_rec.page_no == rtr_info->parent_path->back().child_no);
197 }
198
199 mutex_exit(&rtr_info->rtr_path_mutex);
200
201 skip_parent = false;
202 new_split = false;
203
204 /* Once we have pages in "path", these pages are
205 predicate page locked, so they can't be shrunk away.
206 They also have SSN (split sequence number) to detect
207 splits, so we can directly latch single page while
208 getting them. They can be unlatched if not qualified.
209 One reason for pre-latch is that we might need to position
210 some parent position (requires latch) during search */
211 if (level == 0) {
212 /* S latched for SEARCH_LEAF, and X latched
213 for MODIFY_LEAF */
214 if (my_latch_mode <= BTR_MODIFY_LEAF) {
215 rw_latch = my_latch_mode;
216 }
217
218 if (my_latch_mode == BTR_CONT_MODIFY_TREE ||
219 my_latch_mode == BTR_MODIFY_TREE) {
220 rw_latch = RW_NO_LATCH;
221 }
222
223 } else if (level == target_level) {
224 rw_latch = RW_X_LATCH;
225 }
226
227 /* Release previous locked blocks */
228 if (my_latch_mode != BTR_SEARCH_LEAF) {
229 for (ulint idx = 0; idx < btr_cur->tree_height; idx++) {
230 if (rtr_info->tree_blocks[idx]) {
231 mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[idx],
232 rtr_info->tree_blocks[idx]);
233 rtr_info->tree_blocks[idx] = nullptr;
234 }
235 }
236 for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3; idx++) {
237 if (rtr_info->tree_blocks[idx]) {
238 mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[idx],
239 rtr_info->tree_blocks[idx]);
240 rtr_info->tree_blocks[idx] = nullptr;
241 }
242 }
243 }
244
245 /* set up savepoint to record any locks to be taken */
246 rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
247
248 #ifdef UNIV_RTR_DEBUG
249 ut_ad(!(rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_X) ||
250 rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_S)) ||
251 my_latch_mode == BTR_MODIFY_TREE ||
252 my_latch_mode == BTR_CONT_MODIFY_TREE ||
253 !page_is_leaf(buf_block_get_frame(btr_cur->page_cur.block)));
254 #endif /* UNIV_RTR_DEBUG */
255
256 page_id_t page_id(space, next_rec.page_no);
257
258 block = buf_page_get_gen(page_id, page_size, rw_latch, nullptr,
259 Page_fetch::NORMAL, __FILE__, __LINE__, mtr);
260
261 if (block == nullptr) {
262 continue;
263 } else if (rw_latch != RW_NO_LATCH) {
264 ut_ad(!dict_index_is_ibuf(index));
265 buf_block_dbg_add_level(block, SYNC_TREE_NODE);
266 }
267
268 rtr_info->tree_blocks[tree_idx] = block;
269
270 page = buf_block_get_frame(block);
271 page_ssn = page_get_ssn_id(page);
272
273 /* If there are splits, push the splitted page.
274 Note that we have SX lock on index->lock, there
275 should not be any split/shrink happening here */
276 if (page_ssn > path_ssn) {
277 page_no_t next_page_no = btr_page_get_next(page, mtr);
278 rtr_non_leaf_stack_push(rtr_info->path, next_page_no, path_ssn, level, 0,
279 nullptr, 0);
280
281 if (!srv_read_only_mode && mode != PAGE_CUR_RTREE_INSERT &&
282 mode != PAGE_CUR_RTREE_LOCATE) {
283 ut_ad(rtr_info->thr);
284 lock_place_prdt_page_lock({space, next_page_no}, index, rtr_info->thr);
285 }
286 new_split = true;
287 #ifdef UNIV_GIS_DEBUG
288 fprintf(stderr, "GIS_DIAG: Splitted page found: %d, %ld\n",
289 static_cast<int>(need_parent), next_page_no);
290 #endif
291 }
292
293 page_cursor = btr_cur_get_page_cur(btr_cur);
294 page_cursor->rec = nullptr;
295
296 if (mode == PAGE_CUR_RTREE_LOCATE) {
297 if (level == target_level && level == 0) {
298 ulint low_match;
299
300 found = false;
301
302 low_match = page_cur_search(block, index, tuple, PAGE_CUR_LE,
303 btr_cur_get_page_cur(btr_cur));
304
305 if (low_match == dtuple_get_n_fields_cmp(tuple)) {
306 rec_t *rec = btr_cur_get_rec(btr_cur);
307
308 if (!rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) ||
309 (!for_delete && !for_undo_ins)) {
310 found = true;
311 btr_cur->low_match = low_match;
312 } else {
313 /* mark we found deleted row */
314 btr_cur->rtr_info->fd_del = true;
315 }
316 }
317 } else {
318 page_cur_mode_t page_mode = mode;
319
320 if (level == target_level && target_level != 0) {
321 page_mode = PAGE_CUR_RTREE_GET_FATHER;
322 }
323 found = rtr_cur_search_with_match(block, index, tuple, page_mode,
324 page_cursor, btr_cur->rtr_info);
325
326 /* Save the position of parent if needed */
327 if (found && need_parent) {
328 btr_pcur_t *r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
329
330 rec_t *rec = page_cur_get_rec(page_cursor);
331 page_cur_position(rec, block, btr_pcur_get_page_cur(r_cursor));
332 r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
333 r_cursor->m_latch_mode = my_latch_mode;
334 btr_pcur_store_position(r_cursor, mtr);
335 #ifdef UNIV_DEBUG
336 ulint num_stored =
337 rtr_store_parent_path(block, btr_cur, rw_latch, level, mtr);
338 ut_ad(num_stored > 0);
339 #else
340 rtr_store_parent_path(block, btr_cur, rw_latch, level, mtr);
341 #endif /* UNIV_DEBUG */
342 }
343 }
344 } else {
345 found = rtr_cur_search_with_match(block, index, tuple, mode, page_cursor,
346 btr_cur->rtr_info);
347 }
348
349 /* Attach predicate lock if needed, no matter whether
350 there are matched records */
351 if (mode != PAGE_CUR_RTREE_INSERT && mode != PAGE_CUR_RTREE_LOCATE &&
352 mode >= PAGE_CUR_CONTAIN && btr_cur->rtr_info->need_prdt_lock) {
353 lock_prdt_t prdt;
354
355 trx_t *trx = thr_get_trx(btr_cur->rtr_info->thr);
356 trx_mutex_enter(trx);
357 lock_init_prdt_from_mbr(&prdt, &btr_cur->rtr_info->mbr, mode,
358 trx->lock.lock_heap);
359 trx_mutex_exit(trx);
360
361 if (rw_latch == RW_NO_LATCH) {
362 rw_lock_s_lock(&(block->lock));
363 }
364
365 lock_prdt_lock(block, &prdt, index, LOCK_S, LOCK_PREDICATE,
366 btr_cur->rtr_info->thr, mtr);
367
368 if (rw_latch == RW_NO_LATCH) {
369 rw_lock_s_unlock(&(block->lock));
370 }
371 }
372
373 if (found) {
374 if (level == target_level) {
375 page_cur_t *r_cur;
376 ;
377
378 if (my_latch_mode == BTR_MODIFY_TREE && level == 0) {
379 ut_ad(rw_latch == RW_NO_LATCH);
380 page_id_t my_page_id(space, block->page.id.page_no());
381
382 btr_cur_latch_leaves(block, my_page_id, page_size, BTR_MODIFY_TREE,
383 btr_cur, mtr);
384 }
385
386 r_cur = btr_cur_get_page_cur(btr_cur);
387
388 page_cur_position(page_cur_get_rec(page_cursor),
389 page_cur_get_block(page_cursor), r_cur);
390
391 btr_cur->low_match = level != 0 ? DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
392 : btr_cur->low_match;
393 break;
394 }
395
396 /* Keep the parent path node, which points to
397 last node just located */
398 skip_parent = true;
399 } else {
400 /* Release latch on the current page */
401 ut_ad(rtr_info->tree_blocks[tree_idx]);
402
403 mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[tree_idx],
404 rtr_info->tree_blocks[tree_idx]);
405 rtr_info->tree_blocks[tree_idx] = nullptr;
406 }
407
408 } while (!rtr_info->path->empty());
409
410 const rec_t *rec = btr_cur_get_rec(btr_cur);
411
412 if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
413 mtr_commit(mtr);
414 mtr_start(mtr);
415 } else if (!index_locked) {
416 mtr_memo_release(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK);
417 }
418
419 return (found);
420 }
421
422 /** Find the next matching record. This function will first exhaust
423 the copied record listed in the rtr_info->matches vector before
424 moving to next page
425 @param[in] tuple data tuple; NOTE: n_fields_cmp in tuple
426 must be set so that it cannot get compared
427 to the node ptr page number field!
428 @param[in] mode cursor search mode
429 @param[in] sel_mode select mode: SELECT_ORDINARY,
430 SELECT_SKIP_LOKCED, or SELECT_NO_WAIT
431 @param[in] cursor persistent cursor; NOTE that the function
432 may release the page latch
433 @param[in] cur_level current level
434 @param[in] mtr mini-transaction
435 @return true if there is next qualified record found, otherwise(if
436 exhausted) false */
rtr_pcur_move_to_next(const dtuple_t * tuple,page_cur_mode_t mode,select_mode sel_mode,btr_pcur_t * cursor,ulint cur_level,mtr_t * mtr)437 bool rtr_pcur_move_to_next(const dtuple_t *tuple, page_cur_mode_t mode,
438 select_mode sel_mode, btr_pcur_t *cursor,
439 ulint cur_level, mtr_t *mtr) {
440 rtr_info_t *rtr_info = cursor->m_btr_cur.rtr_info;
441
442 ut_a(cursor->m_pos_state == BTR_PCUR_IS_POSITIONED);
443
444 mutex_enter(&rtr_info->matches->rtr_match_mutex);
445 /* First retrieve the next record on the current page */
446 while (!rtr_info->matches->matched_recs->empty()) {
447 rtr_rec_t rec;
448 rec = rtr_info->matches->matched_recs->back();
449 rtr_info->matches->matched_recs->pop_back();
450
451 /* Skip unlocked record
452 Note: CHECK TABLE doesn't hold record locks. */
453 if (sel_mode != SELECT_ORDINARY && !rec.locked) {
454 continue;
455 }
456
457 mutex_exit(&rtr_info->matches->rtr_match_mutex);
458
459 cursor->m_btr_cur.page_cur.rec = rec.r_rec;
460 cursor->m_btr_cur.page_cur.block = &rtr_info->matches->block;
461
462 DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
463 return (true);
464 }
465
466 mutex_exit(&rtr_info->matches->rtr_match_mutex);
467
468 /* Fetch the next page */
469 return (rtr_pcur_getnext_from_path(tuple, mode, &cursor->m_btr_cur, cur_level,
470 cursor->m_latch_mode, false, mtr));
471 }
472
473 /** Check if the cursor holds record pointing to the specified child page
474 @return true if it is (pointing to the child page) false otherwise */
rtr_compare_cursor_rec(dict_index_t * index,btr_cur_t * cursor,page_no_t page_no,mem_heap_t ** heap)475 static bool rtr_compare_cursor_rec(
476 dict_index_t *index, /*!< in: index */
477 btr_cur_t *cursor, /*!< in: Cursor to check */
478 page_no_t page_no, /*!< in: desired child page number */
479 mem_heap_t **heap) /*!< in: memory heap */
480 {
481 const rec_t *rec;
482 ulint *offsets;
483
484 rec = btr_cur_get_rec(cursor);
485
486 offsets = rec_get_offsets(rec, index, nullptr, ULINT_UNDEFINED, heap);
487
488 return (btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
489 }
490
491 /** Initializes and opens a persistent cursor to an index tree. It should be
492 closed with btr_pcur_close. Mainly called by row_search_index_entry() */
rtr_pcur_open_low(dict_index_t * index,ulint level,const dtuple_t * tuple,page_cur_mode_t mode,ulint latch_mode,btr_pcur_t * cursor,const char * file,ulint line,mtr_t * mtr)493 void rtr_pcur_open_low(
494 dict_index_t *index, /*!< in: index */
495 ulint level, /*!< in: level in the rtree */
496 const dtuple_t *tuple, /*!< in: tuple on which search done */
497 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_LOCATE, ... */
498 ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
499 btr_pcur_t *cursor, /*!< in: memory buffer for persistent cursor */
500 const char *file, /*!< in: file name */
501 ulint line, /*!< in: line where called */
502 mtr_t *mtr) /*!< in: mtr */
503 {
504 btr_cur_t *btr_cursor;
505 ulint n_fields;
506 ulint low_match;
507 rec_t *rec;
508 bool tree_latched = false;
509 bool for_delete = false;
510 bool for_undo_ins = false;
511
512 ut_ad(level == 0);
513
514 ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
515 ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
516
517 /* Initialize the cursor */
518
519 btr_pcur_init(cursor);
520
521 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
522 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
523
524 cursor->m_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
525 cursor->m_search_mode = mode;
526
527 /* Search with the tree cursor */
528
529 btr_cursor = btr_pcur_get_btr_cur(cursor);
530
531 btr_cursor->rtr_info = rtr_create_rtr_info(false, false, btr_cursor, index);
532
533 /* Purge will SX lock the tree instead of take Page Locks */
534 if (btr_cursor->thr) {
535 btr_cursor->rtr_info->need_page_lock = true;
536 btr_cursor->rtr_info->thr = btr_cursor->thr;
537 }
538
539 btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode, btr_cursor,
540 0, file, line, mtr);
541 cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
542
543 cursor->m_trx_if_known = nullptr;
544
545 low_match = btr_pcur_get_low_match(cursor);
546
547 rec = btr_pcur_get_rec(cursor);
548
549 n_fields = dtuple_get_n_fields(tuple);
550
551 if (latch_mode & BTR_ALREADY_S_LATCHED) {
552 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_S_LOCK));
553 tree_latched = true;
554 }
555
556 if (latch_mode & BTR_MODIFY_TREE) {
557 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_X_LOCK) ||
558 mtr_memo_contains(mtr, dict_index_get_lock(index), MTR_MEMO_SX_LOCK));
559 tree_latched = true;
560 }
561
562 if (page_rec_is_infimum(rec) || low_match != n_fields ||
563 (rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) &&
564 (for_delete || for_undo_ins))) {
565 if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table)) &&
566 for_delete) {
567 btr_cursor->rtr_info->fd_del = true;
568 btr_cursor->low_match = 0;
569 }
570 /* Did not find matched row in first dive. Release
571 latched block if any before search more pages */
572 if (latch_mode & BTR_MODIFY_LEAF) {
573 ulint tree_idx = btr_cursor->tree_height - 1;
574 rtr_info_t *rtr_info = btr_cursor->rtr_info;
575
576 ut_ad(level == 0);
577
578 if (rtr_info->tree_blocks[tree_idx]) {
579 mtr_release_block_at_savepoint(mtr, rtr_info->tree_savepoints[tree_idx],
580 rtr_info->tree_blocks[tree_idx]);
581 rtr_info->tree_blocks[tree_idx] = nullptr;
582 }
583 }
584
585 bool ret = rtr_pcur_getnext_from_path(tuple, mode, btr_cursor, level,
586 latch_mode, tree_latched, mtr);
587
588 if (ret) {
589 low_match = btr_pcur_get_low_match(cursor);
590 ut_ad(low_match == n_fields);
591 }
592 }
593 }
594
595 /** Returns the upper level node pointer to a R-Tree page. It is assumed
596 that mtr holds an SX-latch or X-latch on the tree.
597 @return rec_get_offsets() of the node pointer record */
rtr_page_get_father_node_ptr(ulint * offsets,mem_heap_t * heap,btr_cur_t * sea_cur,btr_cur_t * cursor,mtr_t * mtr)598 static ulint *rtr_page_get_father_node_ptr(
599 ulint *offsets, /*!< in: work area for the return value */
600 mem_heap_t *heap, /*!< in: memory heap to use */
601 btr_cur_t *sea_cur, /*!< in: search cursor */
602 btr_cur_t *cursor, /*!< in: cursor pointing to user record,
603 out: cursor on node pointer record,
604 its page x-latched */
605 mtr_t *mtr) /*!< in: mtr */
606 {
607 dtuple_t *tuple;
608 rec_t *user_rec;
609 rec_t *node_ptr;
610 ulint level;
611 page_no_t page_no;
612 dict_index_t *index;
613 rtr_mbr_t mbr;
614
615 page_no = btr_cur_get_block(cursor)->page.id.page_no();
616 index = btr_cur_get_index(cursor);
617
618 ut_ad(srv_read_only_mode ||
619 mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
620 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
621
622 ut_ad(dict_index_get_page(index) != page_no);
623
624 level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
625
626 user_rec = btr_cur_get_rec(cursor);
627 ut_a(page_rec_is_user_rec(user_rec));
628
629 offsets = rec_get_offsets(user_rec, index, offsets, ULINT_UNDEFINED, &heap);
630 rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
631
632 tuple = rtr_index_build_node_ptr(index, &mbr, user_rec, page_no, heap, level);
633
634 if (sea_cur && !sea_cur->rtr_info) {
635 sea_cur = nullptr;
636 }
637
638 rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor, page_no, mtr);
639
640 node_ptr = btr_cur_get_rec(cursor);
641 ut_ad(!page_rec_is_comp(node_ptr) ||
642 rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
643 offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
644
645 page_no_t child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
646
647 if (child_page != page_no) {
648 const rec_t *print_rec;
649
650 ib::fatal error;
651
652 error << "Corruption of index " << index->name << " of table "
653 << index->table->name << " parent page " << page_no << " child page "
654 << child_page;
655
656 print_rec = page_rec_get_next(page_get_infimum_rec(page_align(user_rec)));
657 offsets =
658 rec_get_offsets(print_rec, index, offsets, ULINT_UNDEFINED, &heap);
659 error << "; child ";
660 rec_print(error.m_oss, print_rec,
661 rec_get_info_bits(print_rec, rec_offs_comp(offsets)), offsets);
662 offsets = rec_get_offsets(node_ptr, index, offsets, ULINT_UNDEFINED, &heap);
663 error << "; parent ";
664 rec_print(error.m_oss, print_rec,
665 rec_get_info_bits(print_rec, rec_offs_comp(offsets)), offsets);
666
667 error << ". You should dump + drop + reimport the table to"
668 " fix the corruption. If the crash happens at"
669 " database startup, see " REFMAN
670 "forcing-innodb-recovery.html about forcing"
671 " recovery. Then dump + drop + reimport.";
672 }
673
674 return (offsets);
675 }
676
677 /* Get the rtree page father.
678 @param[in] index rtree index
679 @param[in] block child page in the index
680 @param[in] mtr mtr
681 @param[in] sea_cur search cursor, contains information
682 about parent nodes in search
683 @param[in] cursor cursor on node pointer record,
684 its page x-latched */
rtr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)685 void rtr_page_get_father(dict_index_t *index, buf_block_t *block, mtr_t *mtr,
686 btr_cur_t *sea_cur, btr_cur_t *cursor) {
687 mem_heap_t *heap = mem_heap_create(100);
688 #ifdef UNIV_DEBUG
689 ulint *offsets;
690
691 offsets = rtr_page_get_father_block(nullptr, heap, index, block, mtr, sea_cur,
692 cursor);
693
694 ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec, offsets);
695
696 ut_ad(page_no == block->page.id.page_no());
697 #else
698 rtr_page_get_father_block(NULL, heap, index, block, mtr, sea_cur, cursor);
699 #endif
700
701 mem_heap_free(heap);
702 }
703
704 /** Returns the father block to a page. It is assumed that mtr holds
705 an X or SX latch on the tree.
706 @return rec_get_offsets() of the node pointer record */
rtr_page_get_father_block(ulint * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)707 ulint *rtr_page_get_father_block(
708 ulint *offsets, /*!< in: work area for the return value */
709 mem_heap_t *heap, /*!< in: memory heap to use */
710 dict_index_t *index, /*!< in: b-tree index */
711 buf_block_t *block, /*!< in: child page in the index */
712 mtr_t *mtr, /*!< in: mtr */
713 btr_cur_t *sea_cur, /*!< in: search cursor, contains information
714 about parent nodes in search */
715 btr_cur_t *cursor) /*!< out: cursor on node pointer record,
716 its page x-latched */
717 {
718 rec_t *rec =
719 page_rec_get_next(page_get_infimum_rec(buf_block_get_frame(block)));
720 btr_cur_position(index, rec, block, cursor);
721
722 return (rtr_page_get_father_node_ptr(offsets, heap, sea_cur, cursor, mtr));
723 }
724
725 /** Returns the upper level node pointer to a R-Tree page. It is assumed
726 that mtr holds an x-latch on the tree. */
rtr_get_father_node(dict_index_t * index,ulint level,const dtuple_t * tuple,btr_cur_t * sea_cur,btr_cur_t * btr_cur,page_no_t page_no,mtr_t * mtr)727 void rtr_get_father_node(
728 dict_index_t *index, /*!< in: index */
729 ulint level, /*!< in: the tree level of search */
730 const dtuple_t *tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
731 tuple must be set so that it cannot get
732 compared to the node ptr page number field! */
733 btr_cur_t *sea_cur, /*!< in: search cursor */
734 btr_cur_t *btr_cur, /*!< in/out: tree cursor; the cursor page is
735 s- or x-latched, but see also above! */
736 page_no_t page_no, /*!< Current page no */
737 mtr_t *mtr) /*!< in: mtr */
738 {
739 mem_heap_t *heap = nullptr;
740 bool ret = false;
741 const rec_t *rec;
742 ulint n_fields;
743 bool new_rtr = false;
744
745 /* Try to optimally locate the parent node. Level should always
746 less than sea_cur->tree_height unless the root is splitting */
747 if (sea_cur && sea_cur->tree_height > level) {
748 ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
749 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
750 ret = rtr_cur_restore_position(BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
751
752 /* Once we block shrink tree nodes while there are
753 active search on it, this optimal locating should always
754 succeeds */
755 ut_ad(ret);
756
757 if (ret) {
758 btr_pcur_t *r_cursor = rtr_get_parent_cursor(sea_cur, level, false);
759
760 rec = btr_pcur_get_rec(r_cursor);
761
762 ut_ad(r_cursor->m_rel_pos == BTR_PCUR_ON);
763 page_cur_position(rec, btr_pcur_get_block(r_cursor),
764 btr_cur_get_page_cur(btr_cur));
765 btr_cur->rtr_info = sea_cur->rtr_info;
766 btr_cur->m_own_rtr_info = false;
767 btr_cur->tree_height = sea_cur->tree_height;
768 ut_ad(rtr_compare_cursor_rec(index, btr_cur, page_no, &heap));
769 goto func_exit;
770 }
771 }
772
773 /* We arrive here in one of two scenario
774 1) check table and btr_valide
775 2) index root page being raised */
776 ut_ad(!sea_cur || sea_cur->tree_height == level);
777
778 if (btr_cur->rtr_info) {
779 rtr_clean_rtr_info(btr_cur->rtr_info, true);
780 } else {
781 new_rtr = true;
782 }
783
784 btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
785
786 if (sea_cur && sea_cur->tree_height == level) {
787 /* root split, and search the new root */
788 btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_LOCATE,
789 BTR_CONT_MODIFY_TREE, btr_cur, 0, __FILE__,
790 __LINE__, mtr);
791
792 } else {
793 /* btr_validate */
794 ut_ad(level >= 1);
795 ut_ad(!sea_cur);
796
797 btr_cur_search_to_nth_level(index, level, tuple, PAGE_CUR_RTREE_LOCATE,
798 BTR_CONT_MODIFY_TREE, btr_cur, 0, __FILE__,
799 __LINE__, mtr);
800
801 rec = btr_cur_get_rec(btr_cur);
802 n_fields = dtuple_get_n_fields_cmp(tuple);
803
804 if (page_rec_is_infimum(rec) || (btr_cur->low_match != n_fields)) {
805 ret = rtr_pcur_getnext_from_path(tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
806 level, BTR_CONT_MODIFY_TREE, true, mtr);
807
808 ut_ad(ret && btr_cur->low_match == n_fields);
809 }
810 }
811
812 ret = rtr_compare_cursor_rec(index, btr_cur, page_no, &heap);
813
814 ut_ad(ret);
815
816 func_exit:
817 if (heap) {
818 mem_heap_free(heap);
819 }
820
821 if (new_rtr && btr_cur->rtr_info) {
822 rtr_clean_rtr_info(btr_cur->rtr_info, true);
823 btr_cur->rtr_info = nullptr;
824 }
825 }
826
827 /** Create a RTree search info structure */
rtr_create_rtr_info(bool need_prdt,bool init_matches,btr_cur_t * cursor,dict_index_t * index)828 rtr_info_t *rtr_create_rtr_info(
829 /******************/
830 bool need_prdt, /*!< in: Whether predicate lock
831 is needed */
832 bool init_matches, /*!< in: Whether to initiate the
833 "matches" structure for collecting
834 matched leaf records */
835 btr_cur_t *cursor, /*!< in: tree search cursor */
836 dict_index_t *index) /*!< in: index struct */
837 {
838 rtr_info_t *rtr_info;
839
840 index = index ? index : cursor->index;
841 ut_ad(index);
842
843 rtr_info = static_cast<rtr_info_t *>(ut_zalloc_nokey(sizeof(*rtr_info)));
844
845 rtr_info->allocated = true;
846 rtr_info->cursor = cursor;
847 rtr_info->index = index;
848 rtr_info->is_dup = nullptr;
849
850 if (init_matches) {
851 rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
852 rtr_info->matches = static_cast<matched_rec_t *>(
853 mem_heap_zalloc(rtr_info->heap, sizeof(*rtr_info->matches)));
854
855 rtr_info->matches->matched_recs = UT_NEW_NOKEY(rtr_rec_vector());
856
857 rtr_info->matches->bufp =
858 page_align(rtr_info->matches->rec_buf + UNIV_PAGE_SIZE_MAX + 1);
859 mutex_create(LATCH_ID_RTR_MATCH_MUTEX, &rtr_info->matches->rtr_match_mutex);
860 rw_lock_create(PFS_NOT_INSTRUMENTED, &(rtr_info->matches->block.lock),
861 SYNC_LEVEL_VARYING);
862 }
863
864 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
865 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
866 rtr_info->need_prdt_lock = need_prdt;
867 mutex_create(LATCH_ID_RTR_PATH_MUTEX, &rtr_info->rtr_path_mutex);
868
869 mutex_enter(&index->rtr_track->rtr_active_mutex);
870 index->rtr_track->rtr_active->push_back(rtr_info);
871 mutex_exit(&index->rtr_track->rtr_active_mutex);
872 return (rtr_info);
873 }
874
875 /** Update a btr_cur_t with rtr_info */
rtr_info_update_btr(btr_cur_t * cursor,rtr_info_t * rtr_info)876 void rtr_info_update_btr(
877 /******************/
878 btr_cur_t *cursor, /*!< in/out: tree cursor */
879 rtr_info_t *rtr_info) /*!< in: rtr_info to set to the
880 cursor */
881 {
882 ut_ad(rtr_info);
883
884 cursor->rtr_info = rtr_info;
885 }
886
887 /** Initialize a R-Tree Search structure */
rtr_init_rtr_info(rtr_info_t * rtr_info,bool need_prdt,btr_cur_t * cursor,dict_index_t * index,bool reinit)888 void rtr_init_rtr_info(
889 /****************/
890 rtr_info_t *rtr_info, /*!< in: rtr_info to set to the
891 cursor */
892 bool need_prdt, /*!< in: Whether predicate lock is
893 needed */
894 btr_cur_t *cursor, /*!< in: tree search cursor */
895 dict_index_t *index, /*!< in: index structure */
896 bool reinit) /*!< in: Whether this is a reinit */
897 {
898 ut_ad(rtr_info);
899
900 if (!reinit) {
901 /* Reset all members. */
902 rtr_info->path = nullptr;
903 rtr_info->parent_path = nullptr;
904 rtr_info->matches = nullptr;
905
906 mutex_create(LATCH_ID_RTR_PATH_MUTEX, &rtr_info->rtr_path_mutex);
907
908 memset(rtr_info->tree_blocks, 0x0, sizeof(rtr_info->tree_blocks));
909 memset(rtr_info->tree_savepoints, 0x0, sizeof(rtr_info->tree_savepoints));
910 rtr_info->mbr.xmin = 0.0;
911 rtr_info->mbr.xmax = 0.0;
912 rtr_info->mbr.ymin = 0.0;
913 rtr_info->mbr.ymax = 0.0;
914 rtr_info->thr = nullptr;
915 rtr_info->heap = nullptr;
916 rtr_info->cursor = nullptr;
917 rtr_info->index = nullptr;
918 rtr_info->need_prdt_lock = false;
919 rtr_info->need_page_lock = false;
920 rtr_info->allocated = false;
921 rtr_info->mbr_adj = false;
922 rtr_info->fd_del = false;
923 rtr_info->search_tuple = nullptr;
924 rtr_info->search_mode = PAGE_CUR_UNSUPP;
925 }
926
927 ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
928
929 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
930 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
931 rtr_info->need_prdt_lock = need_prdt;
932 rtr_info->cursor = cursor;
933 rtr_info->index = index;
934 rtr_info->is_dup = nullptr;
935
936 mutex_enter(&index->rtr_track->rtr_active_mutex);
937 index->rtr_track->rtr_active->push_back(rtr_info);
938 mutex_exit(&index->rtr_track->rtr_active_mutex);
939 }
940
941 /** Clean up R-Tree search structure */
rtr_clean_rtr_info(rtr_info_t * rtr_info,bool free_all)942 void rtr_clean_rtr_info(rtr_info_t *rtr_info, /*!< in: RTree search info */
943 bool free_all) /*!< in: need to free rtr_info itself */
944 {
945 dict_index_t *index;
946 bool initialized = false;
947
948 if (!rtr_info) {
949 return;
950 }
951
952 index = rtr_info->index;
953
954 if (index) {
955 mutex_enter(&index->rtr_track->rtr_active_mutex);
956 }
957
958 while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
959 btr_pcur_t *cur = rtr_info->parent_path->back().cursor;
960 rtr_info->parent_path->pop_back();
961
962 if (cur) {
963 btr_pcur_close(cur);
964 ut_free(cur);
965 }
966 }
967
968 UT_DELETE(rtr_info->parent_path);
969 rtr_info->parent_path = nullptr;
970
971 if (rtr_info->path != nullptr) {
972 UT_DELETE(rtr_info->path);
973 rtr_info->path = nullptr;
974 initialized = true;
975 }
976
977 if (rtr_info->matches) {
978 rtr_info->matches->used = false;
979 rtr_info->matches->locked = false;
980 rtr_info->matches->valid = false;
981 rtr_info->matches->matched_recs->clear();
982 }
983
984 if (index) {
985 index->rtr_track->rtr_active->remove(rtr_info);
986 mutex_exit(&index->rtr_track->rtr_active_mutex);
987 }
988
989 if (free_all) {
990 if (rtr_info->matches) {
991 if (rtr_info->matches->matched_recs != nullptr) {
992 UT_DELETE(rtr_info->matches->matched_recs);
993 }
994
995 rw_lock_free(&(rtr_info->matches->block.lock));
996
997 mutex_destroy(&rtr_info->matches->rtr_match_mutex);
998 }
999
1000 if (rtr_info->heap) {
1001 mem_heap_free(rtr_info->heap);
1002 }
1003
1004 if (initialized) {
1005 mutex_destroy(&rtr_info->rtr_path_mutex);
1006 }
1007
1008 if (rtr_info->allocated) {
1009 ut_free(rtr_info);
1010 }
1011 }
1012 }
1013
1014 /** Rebuilt the "path" to exclude the removing page no */
rtr_rebuild_path(rtr_info_t * rtr_info,page_no_t page_no)1015 static void rtr_rebuild_path(
1016 rtr_info_t *rtr_info, /*!< in: RTree search info */
1017 page_no_t page_no) /*!< in: need to free rtr_info itself */
1018 {
1019 rtr_node_path_t *new_path = UT_NEW_NOKEY(rtr_node_path_t());
1020
1021 rtr_node_path_t::iterator rit;
1022 #ifdef UNIV_DEBUG
1023 ulint before_size = rtr_info->path->size();
1024 #endif /* UNIV_DEBUG */
1025
1026 for (rit = rtr_info->path->begin(); rit != rtr_info->path->end(); ++rit) {
1027 node_visit_t next_rec = *rit;
1028
1029 if (next_rec.page_no == page_no) {
1030 continue;
1031 }
1032
1033 new_path->push_back(next_rec);
1034 #ifdef UNIV_DEBUG
1035 node_visit_t rec = new_path->back();
1036 ut_ad(rec.level < rtr_info->cursor->tree_height && rec.page_no > 0);
1037 #endif /* UNIV_DEBUG */
1038 }
1039
1040 UT_DELETE(rtr_info->path);
1041
1042 ut_ad(new_path->size() == before_size - 1);
1043
1044 rtr_info->path = new_path;
1045
1046 if (!rtr_info->parent_path->empty()) {
1047 rtr_node_path_t *new_parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1048
1049 for (rit = rtr_info->parent_path->begin();
1050 rit != rtr_info->parent_path->end(); ++rit) {
1051 node_visit_t next_rec = *rit;
1052
1053 if (next_rec.child_no == page_no) {
1054 btr_pcur_t *cur = next_rec.cursor;
1055
1056 if (cur) {
1057 btr_pcur_close(cur);
1058 ut_free(cur);
1059 }
1060
1061 continue;
1062 }
1063
1064 new_parent_path->push_back(next_rec);
1065 }
1066 UT_DELETE(rtr_info->parent_path);
1067 rtr_info->parent_path = new_parent_path;
1068 }
1069 }
1070
1071 /** Check whether a discarding page is in anyone's search path */
rtr_check_discard_page(dict_index_t * index,btr_cur_t * cursor,buf_block_t * block)1072 void rtr_check_discard_page(
1073 dict_index_t *index, /*!< in: index */
1074 btr_cur_t *cursor, /*!< in: cursor on the page to discard: not on
1075 the root page */
1076 buf_block_t *block) /*!< in: block of page to be discarded */
1077 {
1078 page_no_t pageno = block->page.id.page_no();
1079 rtr_info_t *rtr_info;
1080 rtr_info_active::iterator it;
1081
1082 mutex_enter(&index->rtr_track->rtr_active_mutex);
1083
1084 for (it = index->rtr_track->rtr_active->begin();
1085 it != index->rtr_track->rtr_active->end(); ++it) {
1086 rtr_info = *it;
1087 rtr_node_path_t::iterator rit;
1088 bool found = false;
1089
1090 if (cursor && rtr_info == cursor->rtr_info) {
1091 continue;
1092 }
1093
1094 mutex_enter(&rtr_info->rtr_path_mutex);
1095 for (rit = rtr_info->path->begin(); rit != rtr_info->path->end(); ++rit) {
1096 node_visit_t node = *rit;
1097
1098 if (node.page_no == pageno) {
1099 found = true;
1100 break;
1101 }
1102 }
1103
1104 if (found) {
1105 rtr_rebuild_path(rtr_info, pageno);
1106 }
1107 mutex_exit(&rtr_info->rtr_path_mutex);
1108
1109 if (rtr_info->matches) {
1110 mutex_enter(&rtr_info->matches->rtr_match_mutex);
1111
1112 if ((&rtr_info->matches->block)->page.id.page_no() == pageno) {
1113 if (!rtr_info->matches->matched_recs->empty()) {
1114 rtr_info->matches->matched_recs->clear();
1115 }
1116 ut_ad(rtr_info->matches->matched_recs->empty());
1117 rtr_info->matches->valid = false;
1118 }
1119
1120 mutex_exit(&rtr_info->matches->rtr_match_mutex);
1121 }
1122 }
1123
1124 mutex_exit(&index->rtr_track->rtr_active_mutex);
1125
1126 locksys::Shard_latch_guard guard{block->get_page_id()};
1127 lock_prdt_page_free_from_discard(block, lock_sys->prdt_hash);
1128 lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
1129 }
1130
1131 /** Restore the stored position of a persistent cursor bufferfixing the page */
rtr_cur_restore_position(ulint latch_mode,btr_cur_t * btr_cur,ulint level,mtr_t * mtr)1132 static bool rtr_cur_restore_position(
1133 ulint latch_mode, /*!< in: BTR_SEARCH_LEAF, ... */
1134 btr_cur_t *btr_cur, /*!< in: detached persistent cursor */
1135 ulint level, /*!< in: index level */
1136 mtr_t *mtr) /*!< in: mtr */
1137 {
1138 dict_index_t *index;
1139 mem_heap_t *heap;
1140 btr_pcur_t *r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
1141 dtuple_t *tuple;
1142 bool ret = false;
1143
1144 ut_ad(mtr);
1145 ut_ad(r_cursor);
1146 ut_ad(mtr->is_active());
1147
1148 index = btr_cur_get_index(btr_cur);
1149
1150 if (r_cursor->m_rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE ||
1151 r_cursor->m_rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
1152 return (false);
1153 }
1154
1155 DBUG_EXECUTE_IF("rtr_pessimistic_position", r_cursor->m_modify_clock = 100;);
1156
1157 ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
1158
1159 if (!buf_pool_is_obsolete(r_cursor->m_withdraw_clock) &&
1160 buf_page_optimistic_get(RW_X_LATCH, r_cursor->m_block_when_stored,
1161 r_cursor->m_modify_clock, Page_fetch::NORMAL,
1162 __FILE__, __LINE__, mtr)) {
1163 ut_ad(r_cursor->m_pos_state == BTR_PCUR_IS_POSITIONED);
1164
1165 ut_ad(r_cursor->m_rel_pos == BTR_PCUR_ON);
1166 #ifdef UNIV_DEBUG
1167 do {
1168 const rec_t *rec;
1169 const ulint *offsets1;
1170 const ulint *offsets2;
1171 ulint comp;
1172
1173 rec = btr_pcur_get_rec(r_cursor);
1174
1175 heap = mem_heap_create(256);
1176 offsets1 = rec_get_offsets(r_cursor->m_old_rec, index, nullptr,
1177 r_cursor->m_old_n_fields, &heap);
1178 offsets2 =
1179 rec_get_offsets(rec, index, nullptr, r_cursor->m_old_n_fields, &heap);
1180
1181 comp = rec_offs_comp(offsets1);
1182
1183 if (rec_get_info_bits(r_cursor->m_old_rec, comp) &
1184 REC_INFO_MIN_REC_FLAG) {
1185 ut_ad(rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG);
1186 } else {
1187 ut_ad(!cmp_rec_rec(r_cursor->m_old_rec, rec, offsets1, offsets2, index,
1188 page_is_spatial_non_leaf(rec, index)));
1189 }
1190
1191 mem_heap_free(heap);
1192 } while (false);
1193 #endif /* UNIV_DEBUG */
1194
1195 return (true);
1196 }
1197
1198 /* Page has changed, for R-Tree, the page cannot be shrunk away,
1199 so we search the page and its right siblings */
1200 buf_block_t *block;
1201 node_seq_t page_ssn;
1202 const page_t *page;
1203 page_cur_t *page_cursor;
1204 node_visit_t *node = rtr_get_parent_node(btr_cur, level, false);
1205 space_id_t space = dict_index_get_space(index);
1206 node_seq_t path_ssn = node->seq_no;
1207 page_size_t page_size = dict_table_page_size(index->table);
1208
1209 page_no_t page_no = node->page_no;
1210
1211 heap = mem_heap_create(256);
1212
1213 tuple = dict_index_build_data_tuple(index, r_cursor->m_old_rec,
1214 r_cursor->m_old_n_fields, heap);
1215
1216 page_cursor = btr_pcur_get_page_cur(r_cursor);
1217 ut_ad(r_cursor == node->cursor);
1218
1219 search_again:
1220 page_id_t page_id(space, page_no);
1221
1222 block = buf_page_get_gen(page_id, page_size, RW_X_LATCH, nullptr,
1223 Page_fetch::NORMAL, __FILE__, __LINE__, mtr);
1224
1225 ut_ad(block);
1226
1227 /* Get the page SSN */
1228 page = buf_block_get_frame(block);
1229 page_ssn = page_get_ssn_id(page);
1230
1231 ulint low_match =
1232 page_cur_search(block, index, tuple, PAGE_CUR_LE, page_cursor);
1233
1234 if (low_match == r_cursor->m_old_n_fields) {
1235 const rec_t *rec;
1236 const ulint *offsets1;
1237 const ulint *offsets2;
1238 ulint comp;
1239
1240 rec = btr_pcur_get_rec(r_cursor);
1241
1242 offsets1 = rec_get_offsets(r_cursor->m_old_rec, index, nullptr,
1243 r_cursor->m_old_n_fields, &heap);
1244 offsets2 =
1245 rec_get_offsets(rec, index, nullptr, r_cursor->m_old_n_fields, &heap);
1246
1247 comp = rec_offs_comp(offsets1);
1248
1249 if ((rec_get_info_bits(r_cursor->m_old_rec, comp) &
1250 REC_INFO_MIN_REC_FLAG) &&
1251 (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
1252 r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1253 ret = true;
1254 } else if (!cmp_rec_rec(r_cursor->m_old_rec, rec, offsets1, offsets2, index,
1255 page_is_spatial_non_leaf(rec, index))) {
1256 r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1257 ret = true;
1258 }
1259 }
1260
1261 /* Check the page SSN to see if it has been splitted, if so, search
1262 the right page */
1263 if (!ret && page_ssn > path_ssn) {
1264 page_no = btr_page_get_next(page, mtr);
1265 goto search_again;
1266 }
1267
1268 mem_heap_free(heap);
1269
1270 return (ret);
1271 }
1272
1273 /** Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
rtr_leaf_push_match_rec(const rec_t * rec,rtr_info_t * rtr_info,ulint * offsets,bool is_comp)1274 static void rtr_leaf_push_match_rec(
1275 const rec_t *rec, /*!< in: record to copy */
1276 rtr_info_t *rtr_info, /*!< in/out: search stack */
1277 ulint *offsets, /*!< in: offsets */
1278 bool is_comp) /*!< in: is compact format */
1279 {
1280 byte *buf;
1281 matched_rec_t *match_rec = rtr_info->matches;
1282 rec_t *copy;
1283 ulint data_len;
1284 rtr_rec_t rtr_rec;
1285
1286 buf = match_rec->block.frame + match_rec->used;
1287
1288 copy = rec_copy(buf, rec, offsets);
1289
1290 if (is_comp) {
1291 rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1292 } else {
1293 rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1294 }
1295
1296 rtr_rec.r_rec = copy;
1297 rtr_rec.locked = false;
1298
1299 match_rec->matched_recs->push_back(rtr_rec);
1300 match_rec->valid = true;
1301
1302 data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1303 match_rec->used += data_len;
1304
1305 ut_ad(match_rec->used < UNIV_PAGE_SIZE);
1306 }
1307
1308 /** Store the parent path cursor
1309 @return number of cursor stored */
rtr_store_parent_path(const buf_block_t * block,btr_cur_t * btr_cur,ulint latch_mode,ulint level,mtr_t * mtr)1310 ulint rtr_store_parent_path(
1311 const buf_block_t *block, /*!< in: block of the page */
1312 btr_cur_t *btr_cur, /*!< in/out: persistent cursor */
1313 ulint latch_mode,
1314 /*!< in: latch_mode */
1315 ulint level, /*!< in: index level */
1316 mtr_t *mtr) /*!< in: mtr */
1317 {
1318 ulint num = btr_cur->rtr_info->parent_path->size();
1319 ulint num_stored = 0;
1320
1321 while (num >= 1) {
1322 node_visit_t *node = &(*btr_cur->rtr_info->parent_path)[num - 1];
1323 btr_pcur_t *r_cursor = node->cursor;
1324 buf_block_t *cur_block;
1325
1326 if (node->level > level) {
1327 break;
1328 }
1329
1330 r_cursor->m_pos_state = BTR_PCUR_IS_POSITIONED;
1331 r_cursor->m_latch_mode = latch_mode;
1332
1333 cur_block = btr_pcur_get_block(r_cursor);
1334
1335 if (cur_block == block) {
1336 btr_pcur_store_position(r_cursor, mtr);
1337 num_stored++;
1338 } else {
1339 break;
1340 }
1341
1342 num--;
1343 }
1344
1345 return (num_stored);
1346 }
1347 /** push a nonleaf index node to the search path for insertion */
rtr_non_leaf_insert_stack_push(dict_index_t * index,rtr_node_path_t * path,ulint level,page_no_t child_no,const buf_block_t * block,const rec_t * rec,double mbr_inc)1348 static void rtr_non_leaf_insert_stack_push(
1349 dict_index_t *index, /*!< in: index descriptor */
1350 rtr_node_path_t *path, /*!< in/out: search path */
1351 ulint level, /*!< in: index page level */
1352 page_no_t child_no, /*!< in: child page no */
1353 const buf_block_t *block, /*!< in: block of the page */
1354 const rec_t *rec, /*!< in: positioned record */
1355 double mbr_inc) /*!< in: MBR needs to be enlarged */
1356 {
1357 node_seq_t new_seq;
1358 btr_pcur_t *my_cursor;
1359 page_no_t page_no = block->page.id.page_no();
1360
1361 my_cursor = static_cast<btr_pcur_t *>(ut_malloc_nokey(sizeof(*my_cursor)));
1362
1363 btr_pcur_init(my_cursor);
1364
1365 page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1366
1367 (btr_pcur_get_btr_cur(my_cursor))->index = index;
1368
1369 new_seq = rtr_get_current_ssn_id(index);
1370 rtr_non_leaf_stack_push(path, page_no, new_seq, level, child_no, my_cursor,
1371 mbr_inc);
1372 }
1373
1374 /** Copy a buf_block_t strcuture, except "block->lock", "block->mutex" and
1375 "block->debug_latch."
1376 @param[in,out] matches copy to match->block
1377 @param[in] block block to copy */
rtr_copy_buf(matched_rec_t * matches,const buf_block_t * block)1378 static void rtr_copy_buf(matched_rec_t *matches, const buf_block_t *block) {
1379 /* Copy all members of "block" to "matches->block" except "mutex"
1380 and "lock". We skip "mutex" and "lock" because they are not used
1381 from the dummy buf_block_t we create here and because memcpy()ing
1382 them generates (valid) compiler warnings that the vtable pointer
1383 will be copied. It is also undefined what will happen with the
1384 newly memcpy()ed mutex if the source mutex was acquired by
1385 (another) thread while it was copied. */
1386 new (&matches->block.page) buf_page_t(block->page);
1387 matches->block.frame = block->frame;
1388 matches->block.unzip_LRU = block->unzip_LRU;
1389
1390 ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
1391 ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
1392
1393 /* Skip buf_block_t::mutex */
1394 /* Skip buf_block_t::lock */
1395 matches->block.lock_hash_val = block->lock_hash_val;
1396 matches->block.modify_clock = block->modify_clock;
1397 matches->block.n_hash_helps = block->n_hash_helps;
1398 matches->block.n_fields = block->n_fields;
1399 matches->block.left_side = block->left_side;
1400 #if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
1401 matches->block.n_pointers = block->n_pointers;
1402 #endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
1403 matches->block.curr_n_fields = block->curr_n_fields;
1404 matches->block.curr_left_side = block->curr_left_side;
1405 matches->block.index = block->index;
1406 matches->block.made_dirty_with_no_latch = block->made_dirty_with_no_latch;
1407
1408 #ifndef UNIV_HOTBACKUP
1409 #ifdef UNIV_DEBUG
1410 /* The buf_block_t copy does not contain a valid debug_latch object, mark it
1411 as invalid so that we can detect any uses in our valgrind tests. Copy
1412 semantics for locks are not well defined, so the previous code which simply
1413 defined the default copy assignment operator was not correct. This was changed
1414 because an std::atomic<bool> member was added and rw_lock_t became explicitly
1415 non copyable. */
1416 UNIV_MEM_INVALID(&matches->block.debug_latch, sizeof(rw_lock_t));
1417 #endif /* UNIV_DEBUG */
1418 #endif /* !UNIV_HOTBACKUP */
1419 }
1420
1421 /** Generate a shadow copy of the page block header to save the
1422 matched records */
rtr_init_match(matched_rec_t * matches,const buf_block_t * block,const page_t * page)1423 static void rtr_init_match(
1424 matched_rec_t *matches, /*!< in/out: match to initialize */
1425 const buf_block_t *block, /*!< in: buffer block */
1426 const page_t *page) /*!< in: buffer page */
1427 {
1428 ut_ad(matches->matched_recs->empty());
1429 matches->locked = false;
1430 rtr_copy_buf(matches, block);
1431 matches->block.frame = matches->bufp;
1432 matches->valid = false;
1433 /* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
1434 use infimum/supremum of this page as normal btr page for search. */
1435 memcpy(matches->block.frame, page,
1436 page_is_comp(page) ? PAGE_NEW_SUPREMUM_END : PAGE_OLD_SUPREMUM_END);
1437 matches->used =
1438 page_is_comp(page) ? PAGE_NEW_SUPREMUM_END : PAGE_OLD_SUPREMUM_END;
1439 #ifdef RTR_SEARCH_DIAGNOSTIC
1440 ulint pageno = page_get_page_no(page);
1441 fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1442 static_cast<int>(pageno));
1443 #endif /* RTR_SEARCH_DIAGNOSTIC */
1444 }
1445
1446 /** Get the bounding box content from an index record */
rtr_get_mbr_from_rec(const rec_t * rec,const ulint * offsets,rtr_mbr_t * mbr)1447 void rtr_get_mbr_from_rec(const rec_t *rec, /*!< in: data tuple */
1448 const ulint *offsets, /*!< in: offsets array */
1449 rtr_mbr_t *mbr) /*!< out MBR */
1450 {
1451 ulint rec_f_len;
1452 const byte *data;
1453
1454 data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1455
1456 rtr_read_mbr(data, mbr);
1457 }
1458
1459 /** Get the bounding box content from a MBR data record */
rtr_get_mbr_from_tuple(const dtuple_t * dtuple,rtr_mbr * mbr)1460 void rtr_get_mbr_from_tuple(const dtuple_t *dtuple, /*!< in: data tuple */
1461 rtr_mbr *mbr) /*!< out: mbr to fill */
1462 {
1463 const dfield_t *dtuple_field;
1464 ulint dtuple_f_len;
1465 byte *data;
1466
1467 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1468 dtuple_f_len = dfield_get_len(dtuple_field);
1469 ut_a(dtuple_f_len >= 4 * sizeof(double));
1470
1471 data = static_cast<byte *>(dfield_get_data(dtuple_field));
1472
1473 rtr_read_mbr(data, mbr);
1474 }
1475
1476 /** Searches the right position in rtree for a page cursor. */
rtr_cur_search_with_match(const buf_block_t * block,dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,page_cur_t * cursor,rtr_info_t * rtr_info)1477 bool rtr_cur_search_with_match(
1478 const buf_block_t *block, /*!< in: buffer block */
1479 dict_index_t *index, /*!< in: index descriptor */
1480 const dtuple_t *tuple, /*!< in: data tuple */
1481 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_INSERT,
1482 PAGE_CUR_RTREE_LOCATE etc. */
1483 page_cur_t *cursor, /*!< in/out: page cursor */
1484 rtr_info_t *rtr_info) /*!< in/out: search stack */
1485 {
1486 bool found = false;
1487 const page_t *page;
1488 const rec_t *rec;
1489 const rec_t *last_rec;
1490 ulint offsets_[REC_OFFS_NORMAL_SIZE];
1491 ulint *offsets = offsets_;
1492 mem_heap_t *heap = nullptr;
1493 int cmp = 1;
1494 bool is_leaf;
1495 double least_inc = DBL_MAX;
1496 const rec_t *best_rec;
1497 const rec_t *last_match_rec = nullptr;
1498 ulint level;
1499 bool match_init = false;
1500 space_id_t space = block->page.id.space();
1501 page_cur_mode_t orig_mode = mode;
1502 const rec_t *first_rec = nullptr;
1503
1504 rec_offs_init(offsets_);
1505
1506 ut_ad(RTREE_SEARCH_MODE(mode));
1507
1508 ut_ad(dict_index_is_spatial(index));
1509
1510 page = buf_block_get_frame(block);
1511
1512 is_leaf = page_is_leaf(page);
1513 level = btr_page_get_level(page, mtr);
1514
1515 if (mode == PAGE_CUR_RTREE_LOCATE) {
1516 ut_ad(level != 0);
1517 mode = PAGE_CUR_WITHIN;
1518 }
1519
1520 rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
1521
1522 last_rec = rec;
1523 best_rec = rec;
1524
1525 if (page_rec_is_infimum(rec)) {
1526 rec = page_rec_get_next_const(rec);
1527 }
1528
1529 /* Check insert tuple size is larger than first rec, and try to
1530 avoid it if possible */
1531 if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
1532 ulint new_rec_size = rec_get_converted_size(index, tuple);
1533
1534 offsets = rec_get_offsets(rec, index, offsets,
1535 dtuple_get_n_fields_cmp(tuple), &heap);
1536
1537 if (rec_offs_size(offsets) < new_rec_size) {
1538 first_rec = rec;
1539 }
1540
1541 /* If this is the left-most page of this index level
1542 and the table is a compressed table, try to avoid
1543 first page as much as possible, as there will be problem
1544 when update MIN_REC rec in compress table */
1545 if (buf_block_get_page_zip(block) &&
1546 mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL &&
1547 page_get_n_recs(page) >= 2) {
1548 rec = page_rec_get_next_const(rec);
1549 }
1550 }
1551
1552 while (!page_rec_is_supremum(rec)) {
1553 offsets = rec_get_offsets(rec, index, offsets,
1554 dtuple_get_n_fields_cmp(tuple), &heap);
1555 if (!is_leaf) {
1556 switch (mode) {
1557 case PAGE_CUR_CONTAIN:
1558 case PAGE_CUR_INTERSECT:
1559 case PAGE_CUR_MBR_EQUAL:
1560 /* At non-leaf level, we will need to check
1561 both CONTAIN and INTERSECT for either of
1562 the search mode */
1563 cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, PAGE_CUR_CONTAIN,
1564 index->rtr_srs.get());
1565
1566 if (cmp != 0) {
1567 cmp = cmp_dtuple_rec_with_gis(
1568 tuple, rec, offsets, PAGE_CUR_INTERSECT, index->rtr_srs.get());
1569 }
1570 break;
1571 case PAGE_CUR_DISJOINT:
1572 cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
1573 index->rtr_srs.get());
1574
1575 if (cmp != 0) {
1576 cmp = cmp_dtuple_rec_with_gis(
1577 tuple, rec, offsets, PAGE_CUR_INTERSECT, index->rtr_srs.get());
1578 }
1579 break;
1580 case PAGE_CUR_RTREE_INSERT:
1581 double increase;
1582 double area;
1583
1584 cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, PAGE_CUR_WITHIN,
1585 index->rtr_srs.get());
1586
1587 if (cmp != 0) {
1588 increase = rtr_rec_cal_increase(tuple, rec, offsets, &area,
1589 index->rtr_srs.get());
1590 /* Once it goes beyond DBL_MAX or
1591 they are both DBL_MAX, it would not
1592 make sense to record such value,
1593 just make it DBL_MAX / 2 */
1594 if (increase >= DBL_MAX || (increase == 0 && std::isfinite(area))) {
1595 increase = DBL_MAX / 2;
1596 }
1597
1598 if (increase < least_inc) {
1599 least_inc = increase;
1600 best_rec = rec;
1601 } else if (best_rec && best_rec == first_rec) {
1602 /* if first_rec is set,
1603 we will try to avoid it */
1604 least_inc = increase;
1605 best_rec = rec;
1606 }
1607 }
1608 break;
1609 case PAGE_CUR_RTREE_GET_FATHER:
1610 cmp = cmp_dtuple_rec_with_gis_internal(tuple, rec, offsets,
1611 index->rtr_srs.get());
1612 break;
1613 default:
1614 /* WITHIN etc. */
1615 cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
1616 index->rtr_srs.get());
1617 }
1618 } else {
1619 /* At leaf level, INSERT should translate to LE */
1620 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
1621
1622 cmp = cmp_dtuple_rec_with_gis(tuple, rec, offsets, mode,
1623 index->rtr_srs.get());
1624 }
1625
1626 if (cmp == 0) {
1627 found = true;
1628
1629 /* If located, the matching node/rec will be pushed
1630 to rtr_info->path for non-leaf nodes, or
1631 rtr_info->matches for leaf nodes */
1632 if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
1633 if (!is_leaf) {
1634 page_no_t page_no;
1635 node_seq_t new_seq;
1636 bool is_loc;
1637
1638 is_loc = (orig_mode == PAGE_CUR_RTREE_LOCATE ||
1639 orig_mode == PAGE_CUR_RTREE_GET_FATHER);
1640
1641 offsets =
1642 rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1643
1644 page_no = btr_node_ptr_get_child_page_no(rec, offsets);
1645
1646 ut_ad(level >= 1);
1647
1648 /* Get current SSN, before we insert
1649 it into the path stack */
1650 new_seq = rtr_get_current_ssn_id(index);
1651
1652 rtr_non_leaf_stack_push(rtr_info->path, page_no, new_seq, level - 1,
1653 0, nullptr, 0);
1654
1655 if (is_loc) {
1656 rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
1657 page_no, block, rec, 0);
1658 }
1659
1660 if (!srv_read_only_mode && (rtr_info->need_page_lock || !is_loc)) {
1661 /* Lock the page, preventing it
1662 from being shrunk */
1663 lock_place_prdt_page_lock({space, page_no}, index, rtr_info->thr);
1664 }
1665 } else {
1666 ut_ad(orig_mode != PAGE_CUR_RTREE_LOCATE);
1667
1668 if (!match_init) {
1669 rtr_init_match(rtr_info->matches, block, page);
1670 match_init = true;
1671 }
1672
1673 /* Collect matched records on page */
1674 offsets =
1675 rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1676 rtr_leaf_push_match_rec(rec, rtr_info, offsets, page_is_comp(page));
1677 }
1678
1679 last_match_rec = rec;
1680 } else {
1681 /* This is the insertion case, it will break
1682 once it finds the first MBR that can accomodate
1683 the inserting rec */
1684 break;
1685 }
1686 }
1687
1688 last_rec = rec;
1689
1690 rec = page_rec_get_next_const(rec);
1691 }
1692
1693 /* All records on page are searched */
1694 if (page_rec_is_supremum(rec)) {
1695 if (!is_leaf) {
1696 if (!found) {
1697 /* No match case, if it is for insertion,
1698 then we select the record that result in
1699 least increased area */
1700 if (mode == PAGE_CUR_RTREE_INSERT) {
1701 page_no_t child_no;
1702 ut_ad(least_inc < DBL_MAX);
1703 offsets =
1704 rec_get_offsets(best_rec, index, offsets, ULINT_UNDEFINED, &heap);
1705 child_no = btr_node_ptr_get_child_page_no(best_rec, offsets);
1706
1707 rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
1708 child_no, block, best_rec, least_inc);
1709
1710 page_cur_position(best_rec, block, cursor);
1711 rtr_info->mbr_adj = true;
1712 } else {
1713 /* Position at the last rec of the
1714 page, if it is not the leaf page */
1715 page_cur_position(last_rec, block, cursor);
1716 }
1717 } else {
1718 /* There are matching records, position
1719 in the last matching records */
1720 if (rtr_info) {
1721 rec = last_match_rec;
1722 page_cur_position(rec, block, cursor);
1723 }
1724 }
1725 } else if (rtr_info) {
1726 /* Leaf level, no match, position at the
1727 last (supremum) rec */
1728 if (!last_match_rec) {
1729 page_cur_position(rec, block, cursor);
1730 goto func_exit;
1731 }
1732
1733 /* There are matched records */
1734 matched_rec_t *match_rec = rtr_info->matches;
1735
1736 rtr_rec_t test_rec;
1737
1738 test_rec = match_rec->matched_recs->back();
1739 #ifdef UNIV_DEBUG
1740 ulint offsets_2[REC_OFFS_NORMAL_SIZE];
1741 ulint *offsets2 = offsets_2;
1742 rec_offs_init(offsets_2);
1743
1744 ut_ad(found);
1745
1746 /* Verify the record to be positioned is the same
1747 as the last record in matched_rec vector */
1748 offsets2 = rec_get_offsets(test_rec.r_rec, index, offsets2,
1749 ULINT_UNDEFINED, &heap);
1750
1751 offsets = rec_get_offsets(last_match_rec, index, offsets, ULINT_UNDEFINED,
1752 &heap);
1753
1754 ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec, offsets2, offsets,
1755 index, false) == 0);
1756 #endif /* UNIV_DEBUG */
1757 /* Pop the last match record and position on it */
1758 match_rec->matched_recs->pop_back();
1759 page_cur_position(test_rec.r_rec, &match_rec->block, cursor);
1760 }
1761 } else {
1762 if (mode == PAGE_CUR_RTREE_INSERT) {
1763 page_no_t child_no;
1764 ut_ad(!last_match_rec && rec);
1765
1766 offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1767
1768 child_no = btr_node_ptr_get_child_page_no(rec, offsets);
1769
1770 rtr_non_leaf_insert_stack_push(index, rtr_info->parent_path, level,
1771 child_no, block, rec, 0);
1772
1773 } else if (rtr_info && found && !is_leaf) {
1774 rec = last_match_rec;
1775 }
1776
1777 page_cur_position(rec, block, cursor);
1778 }
1779
1780 #ifdef UNIV_DEBUG
1781 /* Verify that we are positioned at the same child page as pushed in
1782 the path stack */
1783 if (!is_leaf && (!page_rec_is_supremum(rec) || found) &&
1784 mode != PAGE_CUR_RTREE_INSERT) {
1785 page_no_t page_no;
1786
1787 offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1788 page_no = btr_node_ptr_get_child_page_no(rec, offsets);
1789
1790 if (rtr_info && found) {
1791 rtr_node_path_t *path = rtr_info->path;
1792 node_visit_t last_visit = path->back();
1793
1794 ut_ad(last_visit.page_no == page_no);
1795 }
1796 }
1797 #endif /* UNIV_DEBUG */
1798
1799 func_exit:
1800 if (UNIV_LIKELY_NULL(heap)) {
1801 mem_heap_free(heap);
1802 }
1803
1804 return (found);
1805 }
1806