1 /*****************************************************************************
2
3 Copyright (c) 2016, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file gis/gis0sea.cc
22 InnoDB R-tree search interfaces
23
24 Created 2014/01/16 Jimmy Yang
25 ***********************************************************************/
26
27 #include "fsp0fsp.h"
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "gis0rtree.h"
32 #include "btr0cur.h"
33 #include "btr0sea.h"
34 #include "btr0pcur.h"
35 #include "rem0cmp.h"
36 #include "lock0lock.h"
37 #include "ibuf0ibuf.h"
38 #include "trx0trx.h"
39 #include "srv0mon.h"
40 #include "que0que.h"
41 #include "gis0geo.h"
42
/** Restore the stored position of a persistent cursor, buffer-fixing
(and latching, per latch_mode) the page it points to.
@return true if the position could be restored */
static
bool
rtr_cur_restore_position(
	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
	btr_cur_t*	cursor,		/*!< in: detached persistent cursor */
	ulint		level,		/*!< in: index level */
	mtr_t*		mtr);		/*!< in: mtr */
51
52 /*************************************************************//**
53 Pop out used parent path entry, until we find the parent with matching
54 page number */
55 static
56 void
rtr_adjust_parent_path(rtr_info_t * rtr_info,ulint page_no)57 rtr_adjust_parent_path(
58 /*===================*/
59 rtr_info_t* rtr_info, /* R-Tree info struct */
60 ulint page_no) /* page number to look for */
61 {
62 while (!rtr_info->parent_path->empty()) {
63 if (rtr_info->parent_path->back().child_no == page_no) {
64 break;
65 } else {
66 if (rtr_info->parent_path->back().cursor) {
67 btr_pcur_close(
68 rtr_info->parent_path->back().cursor);
69 ut_free(rtr_info->parent_path->back().cursor);
70 }
71
72 rtr_info->parent_path->pop_back();
73 }
74 }
75 }
76
/*************************************************************//**
Find the next matching record. This function is used by search
or record locating during index delete/update. It pops pages off
rtr_info->path (pushed by a previous search pass), latches each
candidate page, and searches it for a record matching "tuple".
Pages found to be split since they were pushed are re-pushed so the
new sibling is also visited.
@return true if there is suitable record found, otherwise false */
static
bool
rtr_pcur_getnext_from_path(
/*=======================*/
	const dtuple_t*	tuple,	/*!< in: data tuple */
	page_cur_mode_t	mode,	/*!< in: cursor search mode */
	btr_cur_t*	btr_cur,/*!< in: persistent cursor; NOTE that the
				function may release the page latch */
	ulint		target_level,
				/*!< in: target level */
	ulint		latch_mode,
				/*!< in: latch_mode */
	bool		index_locked,
				/*!< in: index tree locked */
	mtr_t*		mtr)	/*!< in: mtr */
{
	dict_index_t*	index = btr_cur->index;
	bool		found = false;
	page_cur_t*	page_cursor;
	ulint		level = 0;
	node_visit_t	next_rec;
	rtr_info_t*	rtr_info = btr_cur->rtr_info;
	node_seq_t	page_ssn;
	ulint		my_latch_mode;
	ulint		skip_parent = false;	/* NOTE(review): declared
				ulint but used strictly as a boolean flag */
	bool		new_split = false;
	bool		need_parent;
	bool		for_delete = false;
	bool		for_undo_ins = false;

	/* exhausted all the pages to be searched */
	if (rtr_info->path->empty()) {
		return(false);
	}

	ut_ad(dtuple_get_n_fields_cmp(tuple));

	my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);

	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;

	/* There should be no insert coming to this function. Only
	mode with BTR_MODIFY_* should be delete */
	ut_ad(mode != PAGE_CUR_RTREE_INSERT);
	ut_ad(my_latch_mode == BTR_SEARCH_LEAF
	      || my_latch_mode == BTR_MODIFY_LEAF
	      || my_latch_mode == BTR_MODIFY_TREE
	      || my_latch_mode == BTR_CONT_MODIFY_TREE);

	/* Whether need to track parent information. Only need so
	when we do tree altering operations (such as index page merge) */
	need_parent = ((my_latch_mode == BTR_MODIFY_TREE
		        || my_latch_mode == BTR_CONT_MODIFY_TREE)
		       && mode == PAGE_CUR_RTREE_LOCATE);

	if (!index_locked) {
		/* Take the index S-latch ourselves; it is released
		at the end of this function (or by mtr_commit). */
		ut_ad(latch_mode & BTR_SEARCH_LEAF
		      || latch_mode & BTR_MODIFY_LEAF);
		mtr_s_lock_index(index, mtr);
	} else {
		ut_ad(mtr->memo_contains_flagged(&index->lock,
						 MTR_MEMO_SX_LOCK
						 | MTR_MEMO_S_LOCK
						 | MTR_MEMO_X_LOCK));
	}

	const ulint zip_size = index->table->space->zip_size();

	/* Pop each node/page to be searched from "path" structure
	and do a search on it. Please note, any pages that are in
	the "path" structure are protected by "page" lock, so they
	cannot be shrunk away */
	do {
		buf_block_t*	block;
		node_seq_t	path_ssn;
		const page_t*	page;
		ulint		rw_latch = RW_X_LATCH;
		ulint		tree_idx;

		/* rtr_path_mutex protects "path"/"parent_path",
		which may be appended to concurrently. */
		mutex_enter(&rtr_info->rtr_path_mutex);
		next_rec = rtr_info->path->back();
		rtr_info->path->pop_back();
		level = next_rec.level;
		path_ssn = next_rec.seq_no;
		tree_idx = btr_cur->tree_height - level - 1;

		/* Maintain the parent path info as well, if needed */
		if (need_parent && !skip_parent && !new_split) {
			ulint	old_level;
			ulint	new_level;

			ut_ad(!rtr_info->parent_path->empty());

			/* Cleanup unused parent info */
			if (rtr_info->parent_path->back().cursor) {
				btr_pcur_close(
					rtr_info->parent_path->back().cursor);
				ut_free(rtr_info->parent_path->back().cursor);
			}

			old_level = rtr_info->parent_path->back().level;

			rtr_info->parent_path->pop_back();

			ut_ad(!rtr_info->parent_path->empty());

			/* check whether there is a level change. If so,
			the current parent path needs to pop enough
			nodes to adjust to the new search page */
			new_level = rtr_info->parent_path->back().level;

			if (old_level < new_level) {
				rtr_adjust_parent_path(
					rtr_info, next_rec.page_no);
			}

			ut_ad(!rtr_info->parent_path->empty());

			ut_ad(next_rec.page_no
			      == rtr_info->parent_path->back().child_no);
		}

		mutex_exit(&rtr_info->rtr_path_mutex);

		skip_parent = false;
		new_split = false;

		/* Once we have pages in "path", these pages are
		predicate page locked, so they can't be shrunk away.
		They also have SSN (split sequence number) to detect
		splits, so we can directly latch single page while
		getting them. They can be unlatched if not qualified.
		One reason for pre-latch is that we might need to position
		some parent position (requires latch) during search */
		if (level == 0) {
			/* S latched for SEARCH_LEAF, and X latched
			for MODIFY_LEAF */
			if (my_latch_mode <= BTR_MODIFY_LEAF) {
				rw_latch = my_latch_mode;
			}

			if (my_latch_mode == BTR_CONT_MODIFY_TREE
			    || my_latch_mode == BTR_MODIFY_TREE) {
				/* Tree-modifying modes latch leaves
				later via btr_cur_latch_leaves(). */
				rw_latch = RW_NO_LATCH;
			}

		} else if (level == target_level) {
			rw_latch = RW_X_LATCH;
		}

		/* Release previous locked blocks */
		if (my_latch_mode != BTR_SEARCH_LEAF) {
			/* Per-level blocks latched on earlier
			iterations... */
			for (ulint idx = 0; idx < btr_cur->tree_height;
			     idx++) {
				if (rtr_info->tree_blocks[idx]) {
					mtr_release_block_at_savepoint(
						mtr,
						rtr_info->tree_savepoints[idx],
						rtr_info->tree_blocks[idx]);
					rtr_info->tree_blocks[idx] = NULL;
				}
			}
			/* ...and the overflow slots beyond
			RTR_MAX_LEVELS (used by leaf latching). */
			for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
			     idx++) {
				if (rtr_info->tree_blocks[idx]) {
					mtr_release_block_at_savepoint(
						mtr,
						rtr_info->tree_savepoints[idx],
						rtr_info->tree_blocks[idx]);
					rtr_info->tree_blocks[idx] = NULL;
				}
			}
		}

		/* set up savepoint to record any locks to be taken */
		rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);

#ifdef UNIV_RTR_DEBUG
		ut_ad(!(rw_lock_own_flagged(&btr_cur->page_cur.block->lock,
					    RW_LOCK_FLAG_X | RW_LOCK_FLAG_S))
		      || my_latch_mode == BTR_MODIFY_TREE
		      || my_latch_mode == BTR_CONT_MODIFY_TREE
		      || !page_is_leaf(buf_block_get_frame(
					btr_cur->page_cur.block)));
#endif /* UNIV_RTR_DEBUG */

		dberr_t err = DB_SUCCESS;

		block = buf_page_get_gen(
			page_id_t(index->table->space_id,
				  next_rec.page_no), zip_size,
			rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr, &err);

		if (block == NULL) {
			/* Page fetch failed; move on to the next
			candidate page in "path". */
			continue;
		} else if (rw_latch != RW_NO_LATCH) {
			ut_ad(!dict_index_is_ibuf(index));
			buf_block_dbg_add_level(block, SYNC_TREE_NODE);
		}

		rtr_info->tree_blocks[tree_idx] = block;

		page = buf_block_get_frame(block);
		page_ssn = page_get_ssn_id(page);

		/* If there are splits, push the split page.
		Note that we have SX lock on index->lock, there
		should not be any split/shrink happening here */
		if (page_ssn > path_ssn) {
			uint32_t next_page_no = btr_page_get_next(page);
			rtr_non_leaf_stack_push(
				rtr_info->path, next_page_no, path_ssn,
				level, 0, NULL, 0);

			if (!srv_read_only_mode
			    && mode != PAGE_CUR_RTREE_INSERT
			    && mode != PAGE_CUR_RTREE_LOCATE) {
				ut_ad(rtr_info->thr);
				lock_place_prdt_page_lock(
					page_id_t(block->page.id().space(),
						  next_page_no),
					index,
					rtr_info->thr);
			}
			new_split = true;
#if defined(UNIV_GIS_DEBUG)
			fprintf(stderr,
				"GIS_DIAG: Splitted page found: %d, %ld\n",
				static_cast<int>(need_parent), next_page_no);
#endif
		}

		page_cursor = btr_cur_get_page_cur(btr_cur);
		page_cursor->rec = NULL;

		if (mode == PAGE_CUR_RTREE_LOCATE) {
			if (level == target_level && level == 0) {
				/* Exact-locate on a leaf page:
				ordinary page_cur_search suffices. */
				ulint	low_match;

				found = false;

				low_match = page_cur_search(
					block, index, tuple,
					PAGE_CUR_LE,
					btr_cur_get_page_cur(btr_cur));

				if (low_match == dtuple_get_n_fields_cmp(
							tuple)) {
					rec_t*	rec = btr_cur_get_rec(btr_cur);

					if (!rec_get_deleted_flag(rec,
					    dict_table_is_comp(index->table))
					    || (!for_delete && !for_undo_ins)) {
						found = true;
						btr_cur->low_match = low_match;
					} else {
						/* mark we found deleted row */
						btr_cur->rtr_info->fd_del
							= true;
					}
				}
			} else {
				page_cur_mode_t	page_mode = mode;

				if (level == target_level
				    && target_level != 0) {
					page_mode = PAGE_CUR_RTREE_GET_FATHER;
				}
				found = rtr_cur_search_with_match(
					block, index, tuple, page_mode,
					page_cursor, btr_cur->rtr_info);

				/* Save the position of parent if needed */
				if (found && need_parent) {
					btr_pcur_t*	r_cursor =
						rtr_get_parent_cursor(
							btr_cur, level, false);

					rec_t*	rec = page_cur_get_rec(
						page_cursor);
					page_cur_position(
						rec, block,
						btr_pcur_get_page_cur(r_cursor));
					r_cursor->pos_state =
						BTR_PCUR_IS_POSITIONED;
					r_cursor->latch_mode = my_latch_mode;
					btr_pcur_store_position(r_cursor, mtr);
#ifdef UNIV_DEBUG
					ulint num_stored =
						rtr_store_parent_path(
							block, btr_cur,
							rw_latch, level, mtr);
					ut_ad(num_stored > 0);
#else
					rtr_store_parent_path(
						block, btr_cur, rw_latch,
						level, mtr);
#endif /* UNIV_DEBUG */
				}
			}
		} else {
			found = rtr_cur_search_with_match(
				block, index, tuple, mode, page_cursor,
				btr_cur->rtr_info);
		}

		/* Attach predicate lock if needed, no matter whether
		there are matched records */
		if (mode != PAGE_CUR_RTREE_INSERT
		    && mode != PAGE_CUR_RTREE_LOCATE
		    && mode >= PAGE_CUR_CONTAIN
		    && btr_cur->rtr_info->need_prdt_lock) {
			lock_prdt_t	prdt;

			trx_t*		trx = thr_get_trx(
						btr_cur->rtr_info->thr);
			lock_mutex_enter();
			lock_init_prdt_from_mbr(
				&prdt, &btr_cur->rtr_info->mbr,
				mode, trx->lock.lock_heap);
			lock_mutex_exit();

			/* lock_prdt_lock() expects the block to be
			latched; take a transient S latch if we fetched
			it with RW_NO_LATCH. */
			if (rw_latch == RW_NO_LATCH) {
				rw_lock_s_lock(&(block->lock));
			}

			lock_prdt_lock(block, &prdt, index, LOCK_S,
				       LOCK_PREDICATE, btr_cur->rtr_info->thr);

			if (rw_latch == RW_NO_LATCH) {
				rw_lock_s_unlock(&(block->lock));
			}
		}

		if (found) {
			if (level == target_level) {
				page_cur_t*	r_cur;;

				if (my_latch_mode == BTR_MODIFY_TREE
				    && level == 0) {
					ut_ad(rw_latch == RW_NO_LATCH);

					btr_cur_latch_leaves(
						block,
						BTR_MODIFY_TREE,
						btr_cur, mtr);
				}

				r_cur = btr_cur_get_page_cur(btr_cur);

				page_cur_position(
					page_cur_get_rec(page_cursor),
					page_cur_get_block(page_cursor),
					r_cur);

				btr_cur->low_match = level != 0 ?
					DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
					: btr_cur->low_match;
				break;
			}

			/* Keep the parent path node, which points to
			last node just located */
			skip_parent = true;
		} else {
			/* Release latch on the current page */
			ut_ad(rtr_info->tree_blocks[tree_idx]);

			mtr_release_block_at_savepoint(
				mtr, rtr_info->tree_savepoints[tree_idx],
				rtr_info->tree_blocks[tree_idx]);
			rtr_info->tree_blocks[tree_idx] = NULL;
		}

	} while (!rtr_info->path->empty());

	const rec_t* rec = btr_cur_get_rec(btr_cur);

	if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
		/* Cursor did not end up on a user record: restart the
		mtr so no stale latches are held by the caller. */
		mtr_commit(mtr);
		mtr_start(mtr);
	} else if (!index_locked) {
		/* Release the index latch we took above. NOTE(review):
		we acquired an S latch but release MTR_MEMO_X_LOCK here
		— presumably mtr_memo_release matches the slot anyway;
		verify against mtr0mtr. */
		mtr_memo_release(mtr, dict_index_get_lock(index),
				 MTR_MEMO_X_LOCK);
	}

	return(found);
}
470
471 /*************************************************************//**
472 Find the next matching record. This function will first exhaust
473 the copied record listed in the rtr_info->matches vector before
474 moving to the next page
475 @return true if there is suitable record found, otherwise false */
476 bool
rtr_pcur_move_to_next(const dtuple_t * tuple,page_cur_mode_t mode,btr_pcur_t * cursor,ulint level,mtr_t * mtr)477 rtr_pcur_move_to_next(
478 /*==================*/
479 const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
480 tuple must be set so that it cannot get
481 compared to the node ptr page number field! */
482 page_cur_mode_t mode, /*!< in: cursor search mode */
483 btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the
484 function may release the page latch */
485 ulint level, /*!< in: target level */
486 mtr_t* mtr) /*!< in: mtr */
487 {
488 rtr_info_t* rtr_info = cursor->btr_cur.rtr_info;
489
490 ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
491
492 mutex_enter(&rtr_info->matches->rtr_match_mutex);
493 /* First retrieve the next record on the current page */
494 if (!rtr_info->matches->matched_recs->empty()) {
495 rtr_rec_t rec;
496 rec = rtr_info->matches->matched_recs->back();
497 rtr_info->matches->matched_recs->pop_back();
498 mutex_exit(&rtr_info->matches->rtr_match_mutex);
499
500 cursor->btr_cur.page_cur.rec = rec.r_rec;
501 cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
502
503 DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
504 return(true);
505 }
506
507 mutex_exit(&rtr_info->matches->rtr_match_mutex);
508
509 /* Fetch the next page */
510 return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
511 level, cursor->latch_mode,
512 false, mtr));
513 }
514
515 /*************************************************************//**
516 Check if the cursor holds record pointing to the specified child page
517 @return true if it is (pointing to the child page) false otherwise */
518 static
519 bool
rtr_compare_cursor_rec(dict_index_t * index,btr_cur_t * cursor,ulint page_no,mem_heap_t ** heap)520 rtr_compare_cursor_rec(
521 /*===================*/
522 dict_index_t* index, /*!< in: index */
523 btr_cur_t* cursor, /*!< in: Cursor to check */
524 ulint page_no, /*!< in: desired child page number */
525 mem_heap_t** heap) /*!< in: memory heap */
526 {
527 const rec_t* rec;
528 rec_offs* offsets;
529
530 rec = btr_cur_get_rec(cursor);
531
532 offsets = rec_get_offsets(rec, index, NULL, 0, ULINT_UNDEFINED, heap);
533
534 return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
535 }
536
/**************************************************************//**
Initializes and opens a persistent cursor to an index tree. It should be
closed with btr_pcur_close. Mainly called by row_search_index_entry().
Performs one dive with btr_cur_search_to_nth_level(); if no matching
record is found on that first dive, continues the search over the
remaining candidate pages via rtr_pcur_getnext_from_path(). */
void
rtr_pcur_open_low(
/*==============*/
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: level in the rtree */
	const dtuple_t*	tuple,	/*!< in: tuple on which search done */
	page_cur_mode_t	mode,	/*!< in: PAGE_CUR_RTREE_LOCATE, ... */
	ulint		latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
	btr_pcur_t*	cursor, /*!< in: memory buffer for persistent cursor */
	const char*	file,	/*!< in: file name */
	unsigned	line,	/*!< in: line where called */
	mtr_t*		mtr)	/*!< in: mtr */
{
	btr_cur_t*	btr_cursor;
	ulint		n_fields;
	ulint		low_match;
	rec_t*		rec;
	bool		tree_latched = false;
	bool		for_delete = false;
	bool		for_undo_ins = false;

	/* Only leaf-level opens are supported here. */
	ut_ad(level == 0);

	ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
	ut_ad(mode == PAGE_CUR_RTREE_LOCATE);

	/* Initialize the cursor */

	btr_pcur_init(cursor);

	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;

	cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
	cursor->search_mode = mode;

	/* Search with the tree cursor */

	btr_cursor = btr_pcur_get_btr_cur(cursor);

	btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
						   btr_cursor, index);

	/* Purge will SX lock the tree instead of take Page Locks */
	if (btr_cursor->thr) {
		btr_cursor->rtr_info->need_page_lock = true;
		btr_cursor->rtr_info->thr = btr_cursor->thr;
	}

	btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
				    btr_cursor, 0, file, line, mtr);
	cursor->pos_state = BTR_PCUR_IS_POSITIONED;

	cursor->trx_if_known = NULL;

	low_match = btr_pcur_get_low_match(cursor);

	rec = btr_pcur_get_rec(cursor);

	n_fields = dtuple_get_n_fields(tuple);

	/* Record whether the caller already holds an index latch, so
	rtr_pcur_getnext_from_path() does not re-latch it below. */
	if (latch_mode & BTR_ALREADY_S_LATCHED) {
		ut_ad(mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
		tree_latched = true;
	}

	if (latch_mode & BTR_MODIFY_TREE) {
		ut_ad(mtr->memo_contains_flagged(&index->lock,
						 MTR_MEMO_X_LOCK
						 | MTR_MEMO_SX_LOCK));
		tree_latched = true;
	}

	if (page_rec_is_infimum(rec) || low_match != n_fields
	    || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
		&& (for_delete || for_undo_ins))) {

		if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
		    && for_delete) {
			/* Remember that a delete-marked match was
			seen; callers may act on fd_del. */
			btr_cursor->rtr_info->fd_del = true;
			btr_cursor->low_match = 0;
		}
		/* Did not find matched row in first dive. Release
		latched block if any before search more pages */
		if (latch_mode & BTR_MODIFY_LEAF) {
			ulint		tree_idx = btr_cursor->tree_height - 1;
			rtr_info_t*	rtr_info = btr_cursor->rtr_info;

			ut_ad(level == 0);

			if (rtr_info->tree_blocks[tree_idx]) {
				mtr_release_block_at_savepoint(
					mtr,
					rtr_info->tree_savepoints[tree_idx],
					rtr_info->tree_blocks[tree_idx]);
				rtr_info->tree_blocks[tree_idx] = NULL;
			}
		}

		/* Continue the search on the remaining candidate
		pages collected in rtr_info->path. */
		bool	ret = rtr_pcur_getnext_from_path(
			tuple, mode, btr_cursor, level, latch_mode,
			tree_latched, mtr);

		if (ret) {
			low_match = btr_pcur_get_low_match(cursor);
			ut_ad(low_match == n_fields);
		}
	}
}
649
650 /* Get the rtree page father.
651 @param[in] index rtree index
652 @param[in] block child page in the index
653 @param[in] mtr mtr
654 @param[in] sea_cur search cursor, contains information
655 about parent nodes in search
656 @param[in] cursor cursor on node pointer record,
657 its page x-latched */
658 void
rtr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)659 rtr_page_get_father(
660 dict_index_t* index,
661 buf_block_t* block,
662 mtr_t* mtr,
663 btr_cur_t* sea_cur,
664 btr_cur_t* cursor)
665 {
666 mem_heap_t* heap = mem_heap_create(100);
667 #ifdef UNIV_DEBUG
668 rec_offs* offsets;
669
670 offsets = rtr_page_get_father_block(
671 NULL, heap, index, block, mtr, sea_cur, cursor);
672
673 ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
674 offsets);
675
676 ut_ad(page_no == block->page.id().page_no());
677 #else
678 rtr_page_get_father_block(
679 NULL, heap, index, block, mtr, sea_cur, cursor);
680 #endif
681
682 mem_heap_free(heap);
683 }
684
/********************************************************************//**
Returns the upper level node pointer to a R-Tree page. It is assumed
that mtr holds an x-latch on the tree. First tries to restore the
parent position saved in sea_cur's rtr_info (the fast path); failing
that, re-searches from the root with BTR_CONT_MODIFY_TREE. */
static void rtr_get_father_node(
	dict_index_t*	index,	/*!< in: index */
	ulint		level,	/*!< in: the tree level of search */
	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
				tuple must be set so that it cannot get
				compared to the node ptr page number field! */
	btr_cur_t*	sea_cur,/*!< in: search cursor */
	btr_cur_t*	btr_cur,/*!< in/out: tree cursor; the cursor page is
				s- or x-latched, but see also above! */
	ulint		page_no,/*!< Current page no */
	mtr_t*		mtr)	/*!< in: mtr */
{
	mem_heap_t*	heap = NULL;
	bool		ret = false;
	const rec_t*	rec;
	ulint		n_fields;
	bool		new_rtr = false;

	/* Try to optimally locate the parent node. Level should always
	less than sea_cur->tree_height unless the root is splitting */
	if (sea_cur && sea_cur->tree_height > level) {
		ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
						 | MTR_MEMO_SX_LOCK));
		ret = rtr_cur_restore_position(
			BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);

		/* Once we block shrink tree nodes while there are
		active search on it, this optimal locating should always
		succeeds */
		ut_ad(ret);

		if (ret) {
			/* Fast path: reuse the parent cursor saved
			during the earlier search. */
			btr_pcur_t*	r_cursor = rtr_get_parent_cursor(
				sea_cur, level, false);

			rec = btr_pcur_get_rec(r_cursor);

			ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
			page_cur_position(rec,
					  btr_pcur_get_block(r_cursor),
					  btr_cur_get_page_cur(btr_cur));
			btr_cur->rtr_info = sea_cur->rtr_info;
			btr_cur->tree_height = sea_cur->tree_height;
			ut_ad(rtr_compare_cursor_rec(
				index, btr_cur, page_no, &heap));
			goto func_exit;
		}
	}

	/* We arrive here in one of two scenario
	1) check table and btr_validate
	2) index root page being raised */
	ut_ad(!sea_cur || sea_cur->tree_height == level);

	if (btr_cur->rtr_info) {
		rtr_clean_rtr_info(btr_cur->rtr_info, true);
	} else {
		/* We create the rtr_info here, so we must also free
		it before returning (see func_exit). */
		new_rtr = true;
	}

	btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);

	if (sea_cur && sea_cur->tree_height == level) {
		/* root split, and search the new root */
		btr_cur_search_to_nth_level(
			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
			BTR_CONT_MODIFY_TREE, btr_cur, 0,
			__FILE__, __LINE__, mtr);

	} else {
		/* btr_validate */
		ut_ad(level >= 1);
		ut_ad(!sea_cur);

		btr_cur_search_to_nth_level(
			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
			BTR_CONT_MODIFY_TREE, btr_cur, 0,
			__FILE__, __LINE__, mtr);

		rec = btr_cur_get_rec(btr_cur);
		n_fields = dtuple_get_n_fields_cmp(tuple);

		/* First dive may miss; continue over the remaining
		candidate pages. */
		if (page_rec_is_infimum(rec)
		    || (btr_cur->low_match != n_fields)) {
			ret = rtr_pcur_getnext_from_path(
				tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
				level, BTR_CONT_MODIFY_TREE,
				true, mtr);

			ut_ad(ret && btr_cur->low_match == n_fields);
		}
	}

	ret = rtr_compare_cursor_rec(
		index, btr_cur, page_no, &heap);

	ut_ad(ret);

func_exit:
	if (heap) {
		mem_heap_free(heap);
	}

	if (new_rtr && btr_cur->rtr_info) {
		rtr_clean_rtr_info(btr_cur->rtr_info, true);
		btr_cur->rtr_info = NULL;
	}
}
796
797 /** Returns the upper level node pointer to a R-Tree page. It is assumed
798 that mtr holds an SX-latch or X-latch on the tree.
799 @return rec_get_offsets() of the node pointer record */
800 static
801 rec_offs*
rtr_page_get_father_node_ptr(rec_offs * offsets,mem_heap_t * heap,btr_cur_t * sea_cur,btr_cur_t * cursor,mtr_t * mtr)802 rtr_page_get_father_node_ptr(
803 rec_offs* offsets,/*!< in: work area for the return value */
804 mem_heap_t* heap, /*!< in: memory heap to use */
805 btr_cur_t* sea_cur,/*!< in: search cursor */
806 btr_cur_t* cursor, /*!< in: cursor pointing to user record,
807 out: cursor on node pointer record,
808 its page x-latched */
809 mtr_t* mtr) /*!< in: mtr */
810 {
811 dtuple_t* tuple;
812 rec_t* user_rec;
813 rec_t* node_ptr;
814 ulint level;
815 ulint page_no;
816 dict_index_t* index;
817 rtr_mbr_t mbr;
818
819 page_no = btr_cur_get_block(cursor)->page.id().page_no();
820 index = btr_cur_get_index(cursor);
821
822 ut_ad(srv_read_only_mode
823 || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
824 | MTR_MEMO_SX_LOCK));
825
826 ut_ad(dict_index_get_page(index) != page_no);
827
828 level = btr_page_get_level(btr_cur_get_page(cursor));
829
830 user_rec = btr_cur_get_rec(cursor);
831 ut_a(page_rec_is_user_rec(user_rec));
832
833 offsets = rec_get_offsets(user_rec, index, offsets,
834 level ? 0 : index->n_fields,
835 ULINT_UNDEFINED, &heap);
836 rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
837
838 tuple = rtr_index_build_node_ptr(
839 index, &mbr, user_rec, page_no, heap);
840
841 if (sea_cur && !sea_cur->rtr_info) {
842 sea_cur = NULL;
843 }
844
845 rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
846 page_no, mtr);
847
848 node_ptr = btr_cur_get_rec(cursor);
849 ut_ad(!page_rec_is_comp(node_ptr)
850 || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
851 offsets = rec_get_offsets(node_ptr, index, offsets, 0,
852 ULINT_UNDEFINED, &heap);
853
854 ulint child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
855
856 if (child_page != page_no) {
857 const rec_t* print_rec;
858
859 ib::fatal error;
860
861 error << "Corruption of index " << index->name
862 << " of table " << index->table->name
863 << " parent page " << page_no
864 << " child page " << child_page;
865
866 print_rec = page_rec_get_next(
867 page_get_infimum_rec(page_align(user_rec)));
868 offsets = rec_get_offsets(print_rec, index, offsets,
869 page_rec_is_leaf(user_rec)
870 ? index->n_fields : 0,
871 ULINT_UNDEFINED, &heap);
872 error << "; child ";
873 rec_print(error.m_oss, print_rec,
874 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
875 offsets);
876 offsets = rec_get_offsets(node_ptr, index, offsets, 0,
877 ULINT_UNDEFINED, &heap);
878 error << "; parent ";
879 rec_print(error.m_oss, print_rec,
880 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
881 offsets);
882
883 error << ". You should dump + drop + reimport the table to"
884 " fix the corruption. If the crash happens at"
885 " database startup, see "
886 "https://mariadb.com/kb/en/library/innodb-recovery-modes/"
887 " about forcing"
888 " recovery. Then dump + drop + reimport.";
889 }
890
891 return(offsets);
892 }
893
894 /************************************************************//**
895 Returns the father block to a page. It is assumed that mtr holds
896 an X or SX latch on the tree.
897 @return rec_get_offsets() of the node pointer record */
898 rec_offs*
rtr_page_get_father_block(rec_offs * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)899 rtr_page_get_father_block(
900 /*======================*/
901 rec_offs* offsets,/*!< in: work area for the return value */
902 mem_heap_t* heap, /*!< in: memory heap to use */
903 dict_index_t* index, /*!< in: b-tree index */
904 buf_block_t* block, /*!< in: child page in the index */
905 mtr_t* mtr, /*!< in: mtr */
906 btr_cur_t* sea_cur,/*!< in: search cursor, contains information
907 about parent nodes in search */
908 btr_cur_t* cursor) /*!< out: cursor on node pointer record,
909 its page x-latched */
910 {
911 rec_t* rec = page_rec_get_next(
912 page_get_infimum_rec(buf_block_get_frame(block)));
913 btr_cur_position(index, rec, block, cursor);
914
915 return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
916 cursor, mtr));
917 }
918
919 /*******************************************************************//**
920 Create a RTree search info structure */
921 rtr_info_t*
rtr_create_rtr_info(bool need_prdt,bool init_matches,btr_cur_t * cursor,dict_index_t * index)922 rtr_create_rtr_info(
923 /******************/
924 bool need_prdt, /*!< in: Whether predicate lock
925 is needed */
926 bool init_matches, /*!< in: Whether to initiate the
927 "matches" structure for collecting
928 matched leaf records */
929 btr_cur_t* cursor, /*!< in: tree search cursor */
930 dict_index_t* index) /*!< in: index struct */
931 {
932 rtr_info_t* rtr_info;
933
934 index = index ? index : cursor->index;
935 ut_ad(index);
936
937 rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
938
939 rtr_info->allocated = true;
940 rtr_info->cursor = cursor;
941 rtr_info->index = index;
942
943 if (init_matches) {
944 rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
945 rtr_info->matches = static_cast<matched_rec_t*>(
946 mem_heap_zalloc(
947 rtr_info->heap,
948 sizeof(*rtr_info->matches)));
949
950 rtr_info->matches->matched_recs
951 = UT_NEW_NOKEY(rtr_rec_vector());
952
953 rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
954 + UNIV_PAGE_SIZE_MAX + 1);
955 mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
956 &rtr_info->matches->rtr_match_mutex);
957 rw_lock_create(PFS_NOT_INSTRUMENTED,
958 &(rtr_info->matches->block.lock),
959 SYNC_LEVEL_VARYING);
960 }
961
962 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
963 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
964 rtr_info->need_prdt_lock = need_prdt;
965 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
966 &rtr_info->rtr_path_mutex);
967
968 mutex_enter(&index->rtr_track->rtr_active_mutex);
969 index->rtr_track->rtr_active.push_front(rtr_info);
970 mutex_exit(&index->rtr_track->rtr_active_mutex);
971 return(rtr_info);
972 }
973
974 /*******************************************************************//**
975 Update a btr_cur_t with rtr_info */
976 void
rtr_info_update_btr(btr_cur_t * cursor,rtr_info_t * rtr_info)977 rtr_info_update_btr(
978 /******************/
979 btr_cur_t* cursor, /*!< in/out: tree cursor */
980 rtr_info_t* rtr_info) /*!< in: rtr_info to set to the
981 cursor */
982 {
983 ut_ad(rtr_info);
984
985 cursor->rtr_info = rtr_info;
986 }
987
988 /*******************************************************************//**
989 Initialize a R-Tree Search structure */
990 void
rtr_init_rtr_info(rtr_info_t * rtr_info,bool need_prdt,btr_cur_t * cursor,dict_index_t * index,bool reinit)991 rtr_init_rtr_info(
992 /****************/
993 rtr_info_t* rtr_info, /*!< in: rtr_info to set to the
994 cursor */
995 bool need_prdt, /*!< in: Whether predicate lock is
996 needed */
997 btr_cur_t* cursor, /*!< in: tree search cursor */
998 dict_index_t* index, /*!< in: index structure */
999 bool reinit) /*!< in: Whether this is a reinit */
1000 {
1001 ut_ad(rtr_info);
1002
1003 if (!reinit) {
1004 /* Reset all members. */
1005 rtr_info->path = NULL;
1006 rtr_info->parent_path = NULL;
1007 rtr_info->matches = NULL;
1008
1009 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
1010 &rtr_info->rtr_path_mutex);
1011
1012 memset(rtr_info->tree_blocks, 0x0,
1013 sizeof(rtr_info->tree_blocks));
1014 memset(rtr_info->tree_savepoints, 0x0,
1015 sizeof(rtr_info->tree_savepoints));
1016 rtr_info->mbr.xmin = 0.0;
1017 rtr_info->mbr.xmax = 0.0;
1018 rtr_info->mbr.ymin = 0.0;
1019 rtr_info->mbr.ymax = 0.0;
1020 rtr_info->thr = NULL;
1021 rtr_info->heap = NULL;
1022 rtr_info->cursor = NULL;
1023 rtr_info->index = NULL;
1024 rtr_info->need_prdt_lock = false;
1025 rtr_info->need_page_lock = false;
1026 rtr_info->allocated = false;
1027 rtr_info->mbr_adj = false;
1028 rtr_info->fd_del = false;
1029 rtr_info->search_tuple = NULL;
1030 rtr_info->search_mode = PAGE_CUR_UNSUPP;
1031 }
1032
1033 ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
1034
1035 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
1036 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1037 rtr_info->need_prdt_lock = need_prdt;
1038 rtr_info->cursor = cursor;
1039 rtr_info->index = index;
1040
1041 mutex_enter(&index->rtr_track->rtr_active_mutex);
1042 index->rtr_track->rtr_active.push_front(rtr_info);
1043 mutex_exit(&index->rtr_track->rtr_active_mutex);
1044 }
1045
1046 /**************************************************************//**
1047 Clean up R-Tree search structure */
1048 void
rtr_clean_rtr_info(rtr_info_t * rtr_info,bool free_all)1049 rtr_clean_rtr_info(
1050 /*===============*/
1051 rtr_info_t* rtr_info, /*!< in: RTree search info */
1052 bool free_all) /*!< in: need to free rtr_info itself */
1053 {
1054 dict_index_t* index;
1055 bool initialized = false;
1056
1057 if (!rtr_info) {
1058 return;
1059 }
1060
1061 index = rtr_info->index;
1062
1063 if (index) {
1064 mutex_enter(&index->rtr_track->rtr_active_mutex);
1065 }
1066
1067 while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
1068 btr_pcur_t* cur = rtr_info->parent_path->back().cursor;
1069 rtr_info->parent_path->pop_back();
1070
1071 if (cur) {
1072 btr_pcur_close(cur);
1073 ut_free(cur);
1074 }
1075 }
1076
1077 UT_DELETE(rtr_info->parent_path);
1078 rtr_info->parent_path = NULL;
1079
1080 if (rtr_info->path != NULL) {
1081 UT_DELETE(rtr_info->path);
1082 rtr_info->path = NULL;
1083 initialized = true;
1084 }
1085
1086 if (rtr_info->matches) {
1087 rtr_info->matches->used = false;
1088 rtr_info->matches->locked = false;
1089 rtr_info->matches->valid = false;
1090 rtr_info->matches->matched_recs->clear();
1091 }
1092
1093 if (index) {
1094 index->rtr_track->rtr_active.remove(rtr_info);
1095 mutex_exit(&index->rtr_track->rtr_active_mutex);
1096 }
1097
1098 if (free_all) {
1099 if (rtr_info->matches) {
1100 if (rtr_info->matches->matched_recs != NULL) {
1101 UT_DELETE(rtr_info->matches->matched_recs);
1102 }
1103
1104 rw_lock_free(&(rtr_info->matches->block.lock));
1105
1106 mutex_destroy(&rtr_info->matches->rtr_match_mutex);
1107 }
1108
1109 if (rtr_info->heap) {
1110 mem_heap_free(rtr_info->heap);
1111 }
1112
1113 if (initialized) {
1114 mutex_destroy(&rtr_info->rtr_path_mutex);
1115 }
1116
1117 if (rtr_info->allocated) {
1118 ut_free(rtr_info);
1119 }
1120 }
1121 }
1122
1123 /**************************************************************//**
1124 Rebuilt the "path" to exclude the removing page no */
1125 static
1126 void
rtr_rebuild_path(rtr_info_t * rtr_info,ulint page_no)1127 rtr_rebuild_path(
1128 /*=============*/
1129 rtr_info_t* rtr_info, /*!< in: RTree search info */
1130 ulint page_no) /*!< in: need to free rtr_info itself */
1131 {
1132 rtr_node_path_t* new_path
1133 = UT_NEW_NOKEY(rtr_node_path_t());
1134
1135 rtr_node_path_t::iterator rit;
1136 #ifdef UNIV_DEBUG
1137 ulint before_size = rtr_info->path->size();
1138 #endif /* UNIV_DEBUG */
1139
1140 for (rit = rtr_info->path->begin();
1141 rit != rtr_info->path->end(); ++rit) {
1142 node_visit_t next_rec = *rit;
1143
1144 if (next_rec.page_no == page_no) {
1145 continue;
1146 }
1147
1148 new_path->push_back(next_rec);
1149 #ifdef UNIV_DEBUG
1150 node_visit_t rec = new_path->back();
1151 ut_ad(rec.level < rtr_info->cursor->tree_height
1152 && rec.page_no > 0);
1153 #endif /* UNIV_DEBUG */
1154 }
1155
1156 UT_DELETE(rtr_info->path);
1157
1158 ut_ad(new_path->size() == before_size - 1);
1159
1160 rtr_info->path = new_path;
1161
1162 if (!rtr_info->parent_path->empty()) {
1163 rtr_node_path_t* new_parent_path = UT_NEW_NOKEY(
1164 rtr_node_path_t());
1165
1166 for (rit = rtr_info->parent_path->begin();
1167 rit != rtr_info->parent_path->end(); ++rit) {
1168 node_visit_t next_rec = *rit;
1169
1170 if (next_rec.child_no == page_no) {
1171 btr_pcur_t* cur = next_rec.cursor;
1172
1173 if (cur) {
1174 btr_pcur_close(cur);
1175 ut_free(cur);
1176 }
1177
1178 continue;
1179 }
1180
1181 new_parent_path->push_back(next_rec);
1182 }
1183 UT_DELETE(rtr_info->parent_path);
1184 rtr_info->parent_path = new_parent_path;
1185 }
1186
1187 }
1188
1189 /**************************************************************//**
1190 Check whether a discarding page is in anyone's search path */
1191 void
rtr_check_discard_page(dict_index_t * index,btr_cur_t * cursor,buf_block_t * block)1192 rtr_check_discard_page(
1193 /*===================*/
1194 dict_index_t* index, /*!< in: index */
1195 btr_cur_t* cursor, /*!< in: cursor on the page to discard: not on
1196 the root page */
1197 buf_block_t* block) /*!< in: block of page to be discarded */
1198 {
1199 const ulint pageno = block->page.id().page_no();
1200
1201 mutex_enter(&index->rtr_track->rtr_active_mutex);
1202
1203 for (const auto& rtr_info : index->rtr_track->rtr_active) {
1204 if (cursor && rtr_info == cursor->rtr_info) {
1205 continue;
1206 }
1207
1208 mutex_enter(&rtr_info->rtr_path_mutex);
1209 for (const node_visit_t& node : *rtr_info->path) {
1210 if (node.page_no == pageno) {
1211 rtr_rebuild_path(rtr_info, pageno);
1212 break;
1213 }
1214 }
1215 mutex_exit(&rtr_info->rtr_path_mutex);
1216
1217 if (rtr_info->matches) {
1218 mutex_enter(&rtr_info->matches->rtr_match_mutex);
1219
1220 if ((&rtr_info->matches->block)->page.id().page_no()
1221 == pageno) {
1222 if (!rtr_info->matches->matched_recs->empty()) {
1223 rtr_info->matches->matched_recs->clear();
1224 }
1225 ut_ad(rtr_info->matches->matched_recs->empty());
1226 rtr_info->matches->valid = false;
1227 }
1228
1229 mutex_exit(&rtr_info->matches->rtr_match_mutex);
1230 }
1231 }
1232
1233 mutex_exit(&index->rtr_track->rtr_active_mutex);
1234
1235 lock_mutex_enter();
1236 lock_prdt_page_free_from_discard(block, &lock_sys.prdt_hash);
1237 lock_prdt_page_free_from_discard(block, &lock_sys.prdt_page_hash);
1238 lock_mutex_exit();
1239 }
1240
/** Structure acts as functor to get the optimistic access of the page.
It returns true if it successfully gets the page. */
struct optimistic_get
{
  /* Cursor whose stored position (modify_clock) is being restored. */
  btr_pcur_t *const r_cursor;
  mtr_t *const mtr;

  optimistic_get(btr_pcur_t *r_cursor,mtr_t *mtr)
  :r_cursor(r_cursor), mtr(mtr) {}

  /** Try to buffer-fix and X-latch the hinted block, accepting it
  only if its modify clock still matches the stored one.
  @param hint  block where the cursor position was stored, or NULL
  @return true if the page was latched successfully */
  bool operator()(buf_block_t *hint) const
  {
    return hint && buf_page_optimistic_get(
        RW_X_LATCH, hint, r_cursor->modify_clock, __FILE__,
        __LINE__, mtr);
  }
};
1258
/** Restore the stored position of a persistent cursor bufferfixing the page */
static
bool
rtr_cur_restore_position(
	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
	btr_cur_t*	btr_cur,	/*!< in: detached persistent cursor */
	ulint		level,		/*!< in: index level */
	mtr_t*		mtr)		/*!< in: mtr */
{
	dict_index_t*	index;
	mem_heap_t*	heap;
	btr_pcur_t*	r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
	dtuple_t*	tuple;
	bool		ret = false;

	ut_ad(mtr);
	ut_ad(r_cursor);
	ut_ad(mtr->is_active());

	index = btr_cur_get_index(btr_cur);

	/* A cursor stored before the first or after the last record
	of the tree cannot be restored to a record. */
	if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
	    || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
		return(false);
	}

	DBUG_EXECUTE_IF(
		"rtr_pessimistic_position",
		r_cursor->modify_clock = 100;
	);

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);

	/* First try the optimistic path: if the page has not been
	modified since the position was stored (modify clock match),
	the cursor is already correctly positioned. */
	if (r_cursor->block_when_stored.run_with_hint(
		    optimistic_get(r_cursor, mtr))) {
		ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);

		ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
#ifdef UNIV_DEBUG
		do {
			/* Debug check: the record under the cursor must
			equal the stored old record. */
			const rec_t*	rec;
			const rec_offs*	offsets1;
			const rec_offs*	offsets2;
			ulint		comp;

			rec = btr_pcur_get_rec(r_cursor);

			heap = mem_heap_create(256);
			offsets1 = rec_get_offsets(
				r_cursor->old_rec, index, NULL,
				level ? 0 : r_cursor->old_n_fields,
				r_cursor->old_n_fields, &heap);
			offsets2 = rec_get_offsets(
				rec, index, NULL,
				level ? 0 : r_cursor->old_n_fields,
				r_cursor->old_n_fields, &heap);

			comp = rec_offs_comp(offsets1);

			if (rec_get_info_bits(r_cursor->old_rec, comp)
			    & REC_INFO_MIN_REC_FLAG) {
				ut_ad(rec_get_info_bits(rec, comp)
					& REC_INFO_MIN_REC_FLAG);
			} else {

				ut_ad(!cmp_rec_rec(r_cursor->old_rec,
						   rec, offsets1, offsets2,
						   index));
			}

			mem_heap_free(heap);
		} while (0);
#endif /* UNIV_DEBUG */

		return(true);
	}

	/* Page has changed, for R-Tree, the page cannot be shrunk away,
	so we search the page and its right siblings */
	buf_block_t*	block;
	node_seq_t	page_ssn;
	const page_t*	page;
	page_cur_t*	page_cursor;
	node_visit_t*	node = rtr_get_parent_node(btr_cur, level, false);
	node_seq_t	path_ssn = node->seq_no;
	const unsigned	zip_size = index->table->space->zip_size();
	uint32_t	page_no = node->page_no;

	heap = mem_heap_create(256);

	/* Rebuild the search tuple from the stored old record. */
	tuple = dict_index_build_data_tuple(r_cursor->old_rec, index, !level,
					    r_cursor->old_n_fields, heap);

	page_cursor = btr_pcur_get_page_cur(r_cursor);
	ut_ad(r_cursor == node->cursor);

search_again:
	dberr_t err = DB_SUCCESS;

	block = buf_page_get_gen(
		page_id_t(index->table->space_id, page_no),
		zip_size, RW_X_LATCH, NULL,
		BUF_GET, __FILE__, __LINE__, mtr, &err);

	ut_ad(block);

	/* Get the page SSN */
	page = buf_block_get_frame(block);
	page_ssn = page_get_ssn_id(page);

	ulint low_match = page_cur_search(
		block, index, tuple, PAGE_CUR_LE, page_cursor);

	if (low_match == r_cursor->old_n_fields) {
		const rec_t*	rec;
		const rec_offs*	offsets1;
		const rec_offs*	offsets2;
		ulint		comp;

		rec = btr_pcur_get_rec(r_cursor);

		offsets1 = rec_get_offsets(r_cursor->old_rec, index, NULL,
					   level ? 0 : r_cursor->old_n_fields,
					   r_cursor->old_n_fields, &heap);
		offsets2 = rec_get_offsets(rec, index, NULL,
					   level ? 0 : r_cursor->old_n_fields,
					   r_cursor->old_n_fields, &heap);

		comp = rec_offs_comp(offsets1);

		/* Accept the position if both records are the minimum
		record, or if they compare equal. */
		if ((rec_get_info_bits(r_cursor->old_rec, comp)
		     & REC_INFO_MIN_REC_FLAG)
		    && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
			ret = true;
		} else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
					index)) {
			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
			ret = true;
		}
	}

	/* Check the page SSN to see if it has been splitted, if so, search
	the right page */
	if (!ret && page_ssn > path_ssn) {
		page_no = btr_page_get_next(page);
		goto search_again;
	}

	mem_heap_free(heap);

	return(ret);
}
1412
1413 /****************************************************************//**
1414 Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
1415 static
1416 void
rtr_leaf_push_match_rec(const rec_t * rec,rtr_info_t * rtr_info,rec_offs * offsets,bool is_comp)1417 rtr_leaf_push_match_rec(
1418 /*====================*/
1419 const rec_t* rec, /*!< in: record to copy */
1420 rtr_info_t* rtr_info, /*!< in/out: search stack */
1421 rec_offs* offsets, /*!< in: offsets */
1422 bool is_comp) /*!< in: is compact format */
1423 {
1424 byte* buf;
1425 matched_rec_t* match_rec = rtr_info->matches;
1426 rec_t* copy;
1427 ulint data_len;
1428 rtr_rec_t rtr_rec;
1429
1430 buf = match_rec->block.frame + match_rec->used;
1431 ut_ad(page_rec_is_leaf(rec));
1432
1433 copy = rec_copy(buf, rec, offsets);
1434
1435 if (is_comp) {
1436 rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1437 } else {
1438 rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1439 }
1440
1441 rtr_rec.r_rec = copy;
1442 rtr_rec.locked = false;
1443
1444 match_rec->matched_recs->push_back(rtr_rec);
1445 match_rec->valid = true;
1446
1447 data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1448 match_rec->used += data_len;
1449
1450 ut_ad(match_rec->used < srv_page_size);
1451 }
1452
1453 /**************************************************************//**
1454 Store the parent path cursor
1455 @return number of cursor stored */
1456 ulint
rtr_store_parent_path(const buf_block_t * block,btr_cur_t * btr_cur,ulint latch_mode,ulint level,mtr_t * mtr)1457 rtr_store_parent_path(
1458 /*==================*/
1459 const buf_block_t* block, /*!< in: block of the page */
1460 btr_cur_t* btr_cur,/*!< in/out: persistent cursor */
1461 ulint latch_mode,
1462 /*!< in: latch_mode */
1463 ulint level, /*!< in: index level */
1464 mtr_t* mtr) /*!< in: mtr */
1465 {
1466 ulint num = btr_cur->rtr_info->parent_path->size();
1467 ulint num_stored = 0;
1468
1469 while (num >= 1) {
1470 node_visit_t* node = &(*btr_cur->rtr_info->parent_path)[
1471 num - 1];
1472 btr_pcur_t* r_cursor = node->cursor;
1473 buf_block_t* cur_block;
1474
1475 if (node->level > level) {
1476 break;
1477 }
1478
1479 r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1480 r_cursor->latch_mode = latch_mode;
1481
1482 cur_block = btr_pcur_get_block(r_cursor);
1483
1484 if (cur_block == block) {
1485 btr_pcur_store_position(r_cursor, mtr);
1486 num_stored++;
1487 } else {
1488 break;
1489 }
1490
1491 num--;
1492 }
1493
1494 return(num_stored);
1495 }
1496 /**************************************************************//**
1497 push a nonleaf index node to the search path for insertion */
1498 static
1499 void
rtr_non_leaf_insert_stack_push(dict_index_t * index,rtr_node_path_t * path,ulint level,uint32_t child_no,const buf_block_t * block,const rec_t * rec,double mbr_inc)1500 rtr_non_leaf_insert_stack_push(
1501 /*===========================*/
1502 dict_index_t* index, /*!< in: index descriptor */
1503 rtr_node_path_t* path, /*!< in/out: search path */
1504 ulint level, /*!< in: index page level */
1505 uint32_t child_no,/*!< in: child page no */
1506 const buf_block_t* block, /*!< in: block of the page */
1507 const rec_t* rec, /*!< in: positioned record */
1508 double mbr_inc)/*!< in: MBR needs to be enlarged */
1509 {
1510 node_seq_t new_seq;
1511 btr_pcur_t* my_cursor;
1512
1513 my_cursor = static_cast<btr_pcur_t*>(
1514 ut_malloc_nokey(sizeof(*my_cursor)));
1515
1516 btr_pcur_init(my_cursor);
1517
1518 page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1519
1520 (btr_pcur_get_btr_cur(my_cursor))->index = index;
1521
1522 new_seq = rtr_get_current_ssn_id(index);
1523 rtr_non_leaf_stack_push(path, block->page.id().page_no(),
1524 new_seq, level, child_no, my_cursor, mbr_inc);
1525 }
1526
/** Copy a buf_block_t, except "block->lock".
@param[in,out]	matches	copy to match->block
@param[in]	block	block to copy */
static
void
rtr_copy_buf(
	matched_rec_t*		matches,
	const buf_block_t*	block)
{
	/* Copy all members of "block" to "matches->block" except "lock".
	We skip "lock" because it is not used
	from the dummy buf_block_t we create here and because memcpy()ing
	it generates (valid) compiler warnings that the vtable pointer
	will be copied. */
	new (&matches->block.page) buf_page_t(block->page);
	matches->block.frame = block->frame;
	matches->block.unzip_LRU = block->unzip_LRU;

	ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
	ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);

	/* Skip buf_block_t::lock */
	matches->block.modify_clock = block->modify_clock;
#ifdef BTR_CUR_HASH_ADAPT
	/* Adaptive-hash-index bookkeeping fields are copied verbatim,
	except the debug pointer count which starts at zero for the
	dummy block. */
	matches->block.n_hash_helps = block->n_hash_helps;
	matches->block.n_fields = block->n_fields;
	matches->block.left_side = block->left_side;
#if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
	matches->block.n_pointers = 0;
#endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
	matches->block.curr_n_fields = block->curr_n_fields;
	matches->block.curr_left_side = block->curr_left_side;
	matches->block.index = block->index;
#endif /* BTR_CUR_HASH_ADAPT */
	ut_d(matches->block.debug_latch = NULL);
}
1563
1564 /****************************************************************//**
1565 Generate a shadow copy of the page block header to save the
1566 matched records */
1567 static
1568 void
rtr_init_match(matched_rec_t * matches,const buf_block_t * block,const page_t * page)1569 rtr_init_match(
1570 /*===========*/
1571 matched_rec_t* matches,/*!< in/out: match to initialize */
1572 const buf_block_t* block, /*!< in: buffer block */
1573 const page_t* page) /*!< in: buffer page */
1574 {
1575 ut_ad(matches->matched_recs->empty());
1576 matches->locked = false;
1577 rtr_copy_buf(matches, block);
1578 matches->block.frame = matches->bufp;
1579 matches->valid = false;
1580 /* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
1581 use infimum/supremum of this page as normal btr page for search. */
1582 memcpy(matches->block.frame, page, page_is_comp(page)
1583 ? PAGE_NEW_SUPREMUM_END
1584 : PAGE_OLD_SUPREMUM_END);
1585 matches->used = page_is_comp(page)
1586 ? PAGE_NEW_SUPREMUM_END
1587 : PAGE_OLD_SUPREMUM_END;
1588 #ifdef RTR_SEARCH_DIAGNOSTIC
1589 ulint pageno = page_get_page_no(page);
1590 fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1591 static_cast<int>(pageno));
1592 #endif /* RTR_SEARCH_DIAGNOSTIC */
1593 }
1594
1595 /****************************************************************//**
1596 Get the bounding box content from an index record */
1597 void
rtr_get_mbr_from_rec(const rec_t * rec,const rec_offs * offsets,rtr_mbr_t * mbr)1598 rtr_get_mbr_from_rec(
1599 /*=================*/
1600 const rec_t* rec, /*!< in: data tuple */
1601 const rec_offs* offsets,/*!< in: offsets array */
1602 rtr_mbr_t* mbr) /*!< out MBR */
1603 {
1604 ulint rec_f_len;
1605 const byte* data;
1606
1607 data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1608
1609 rtr_read_mbr(data, mbr);
1610 }
1611
1612 /****************************************************************//**
1613 Get the bounding box content from a MBR data record */
1614 void
rtr_get_mbr_from_tuple(const dtuple_t * dtuple,rtr_mbr * mbr)1615 rtr_get_mbr_from_tuple(
1616 /*===================*/
1617 const dtuple_t* dtuple, /*!< in: data tuple */
1618 rtr_mbr* mbr) /*!< out: mbr to fill */
1619 {
1620 const dfield_t* dtuple_field;
1621 ulint dtuple_f_len;
1622
1623 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1624 dtuple_f_len = dfield_get_len(dtuple_field);
1625 ut_a(dtuple_f_len >= 4 * sizeof(double));
1626
1627 rtr_read_mbr(static_cast<const byte*>(dfield_get_data(dtuple_field)),
1628 mbr);
1629 }
1630
1631 /** Compare minimum bounding rectangles.
1632 @return 1, 0, -1, if mode == PAGE_CUR_MBR_EQUAL. And return
1633 1, 0 for rest compare modes, depends on a and b qualifies the
1634 relationship (CONTAINS, WITHIN etc.) */
cmp_gis_field(page_cur_mode_t mode,const void * a,const void * b)1635 static int cmp_gis_field(page_cur_mode_t mode, const void *a, const void *b)
1636 {
1637 return mode == PAGE_CUR_MBR_EQUAL
1638 ? cmp_geometry_field(a, b)
1639 : rtree_key_cmp(mode, a, b);
1640 }
1641
1642 /** Compare a GIS data tuple to a physical record in rtree non-leaf node.
1643 We need to check the page number field, since we don't store pk field in
1644 rtree non-leaf node.
1645 @param[in] dtuple data tuple
1646 @param[in] rec R-tree record
1647 @return whether dtuple is less than rec */
1648 static bool
cmp_dtuple_rec_with_gis_internal(const dtuple_t * dtuple,const rec_t * rec)1649 cmp_dtuple_rec_with_gis_internal(const dtuple_t* dtuple, const rec_t* rec)
1650 {
1651 const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
1652 ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
1653
1654 if (cmp_gis_field(PAGE_CUR_WITHIN, dfield_get_data(dtuple_field), rec))
1655 return true;
1656
1657 dtuple_field= dtuple_get_nth_field(dtuple, 1);
1658 ut_ad(dfield_get_len(dtuple_field) == 4); /* child page number */
1659 ut_ad(dtuple_field->type.mtype == DATA_SYS_CHILD);
1660 ut_ad(!(dtuple_field->type.prtype & ~DATA_NOT_NULL));
1661
1662 return memcmp(dtuple_field->data, rec + DATA_MBR_LEN, 4) != 0;
1663 }
1664
1665 #ifndef UNIV_DEBUG
1666 static
1667 #endif
1668 /** Compare a GIS data tuple to a physical record.
1669 @param[in] dtuple data tuple
1670 @param[in] rec R-tree record
1671 @param[in] mode compare mode
1672 @retval negative if dtuple is less than rec */
cmp_dtuple_rec_with_gis(const dtuple_t * dtuple,const rec_t * rec,page_cur_mode_t mode)1673 int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec,
1674 page_cur_mode_t mode)
1675 {
1676 const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
1677 /* FIXME: TABLE_SHARE::init_from_binary_frm_image() is adding
1678 field->key_part_length_bytes() to the key length */
1679 ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN ||
1680 dfield_get_len(dtuple_field) == DATA_MBR_LEN + 2);
1681
1682 return cmp_gis_field(mode, dfield_get_data(dtuple_field), rec);
1683 }
1684
1685 /****************************************************************//**
1686 Searches the right position in rtree for a page cursor. */
1687 bool
rtr_cur_search_with_match(const buf_block_t * block,dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,page_cur_t * cursor,rtr_info_t * rtr_info)1688 rtr_cur_search_with_match(
1689 /*======================*/
1690 const buf_block_t* block, /*!< in: buffer block */
1691 dict_index_t* index, /*!< in: index descriptor */
1692 const dtuple_t* tuple, /*!< in: data tuple */
1693 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_INSERT,
1694 PAGE_CUR_RTREE_LOCATE etc. */
1695 page_cur_t* cursor, /*!< in/out: page cursor */
1696 rtr_info_t* rtr_info)/*!< in/out: search stack */
1697 {
1698 bool found = false;
1699 const page_t* page;
1700 const rec_t* rec;
1701 const rec_t* last_rec;
1702 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1703 rec_offs* offsets = offsets_;
1704 mem_heap_t* heap = NULL;
1705 int cmp = 1;
1706 double least_inc = DBL_MAX;
1707 const rec_t* best_rec;
1708 const rec_t* last_match_rec = NULL;
1709 bool match_init = false;
1710 page_cur_mode_t orig_mode = mode;
1711 const rec_t* first_rec = NULL;
1712
1713 rec_offs_init(offsets_);
1714
1715 ut_ad(RTREE_SEARCH_MODE(mode));
1716
1717 ut_ad(dict_index_is_spatial(index));
1718
1719 page = buf_block_get_frame(block);
1720
1721 const ulint level = btr_page_get_level(page);
1722 const ulint n_core = level ? 0 : index->n_fields;
1723
1724 if (mode == PAGE_CUR_RTREE_LOCATE) {
1725 ut_ad(level != 0);
1726 mode = PAGE_CUR_WITHIN;
1727 }
1728
1729 rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
1730
1731 last_rec = rec;
1732 best_rec = rec;
1733
1734 if (page_rec_is_infimum(rec)) {
1735 rec = page_rec_get_next_const(rec);
1736 }
1737
1738 /* Check insert tuple size is larger than first rec, and try to
1739 avoid it if possible */
1740 if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
1741
1742 ulint new_rec_size = rec_get_converted_size(index, tuple, 0);
1743
1744 offsets = rec_get_offsets(rec, index, offsets, n_core,
1745 dtuple_get_n_fields_cmp(tuple),
1746 &heap);
1747
1748 if (rec_offs_size(offsets) < new_rec_size) {
1749 first_rec = rec;
1750 }
1751
1752 /* If this is the left-most page of this index level
1753 and the table is a compressed table, try to avoid
1754 first page as much as possible, as there will be problem
1755 when update MIN_REC rec in compress table */
1756 if (is_buf_block_get_page_zip(block)
1757 && !page_has_prev(page)
1758 && page_get_n_recs(page) >= 2) {
1759
1760 rec = page_rec_get_next_const(rec);
1761 }
1762 }
1763
1764 while (!page_rec_is_supremum(rec)) {
1765 if (!n_core) {
1766 switch (mode) {
1767 case PAGE_CUR_CONTAIN:
1768 case PAGE_CUR_INTERSECT:
1769 case PAGE_CUR_MBR_EQUAL:
1770 /* At non-leaf level, we will need to check
1771 both CONTAIN and INTERSECT for either of
1772 the search mode */
1773 cmp = cmp_dtuple_rec_with_gis(
1774 tuple, rec, PAGE_CUR_CONTAIN);
1775
1776 if (cmp != 0) {
1777 cmp = cmp_dtuple_rec_with_gis(
1778 tuple, rec,
1779 PAGE_CUR_INTERSECT);
1780 }
1781 break;
1782 case PAGE_CUR_DISJOINT:
1783 cmp = cmp_dtuple_rec_with_gis(
1784 tuple, rec, mode);
1785
1786 if (cmp != 0) {
1787 cmp = cmp_dtuple_rec_with_gis(
1788 tuple, rec,
1789 PAGE_CUR_INTERSECT);
1790 }
1791 break;
1792 case PAGE_CUR_RTREE_INSERT:
1793 double increase;
1794 double area;
1795
1796 cmp = cmp_dtuple_rec_with_gis(
1797 tuple, rec, PAGE_CUR_WITHIN);
1798
1799 if (cmp != 0) {
1800 increase = rtr_rec_cal_increase(
1801 tuple, rec, &area);
1802 /* Once it goes beyond DBL_MAX,
1803 it would not make sense to record
1804 such value, just make it
1805 DBL_MAX / 2 */
1806 if (increase >= DBL_MAX) {
1807 increase = DBL_MAX / 2;
1808 }
1809
1810 if (increase < least_inc) {
1811 least_inc = increase;
1812 best_rec = rec;
1813 } else if (best_rec
1814 && best_rec == first_rec) {
1815 /* if first_rec is set,
1816 we will try to avoid it */
1817 least_inc = increase;
1818 best_rec = rec;
1819 }
1820 }
1821 break;
1822 case PAGE_CUR_RTREE_GET_FATHER:
1823 cmp = cmp_dtuple_rec_with_gis_internal(
1824 tuple, rec);
1825 break;
1826 default:
1827 /* WITHIN etc. */
1828 cmp = cmp_dtuple_rec_with_gis(
1829 tuple, rec, mode);
1830 }
1831 } else {
1832 /* At leaf level, INSERT should translate to LE */
1833 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
1834
1835 cmp = cmp_dtuple_rec_with_gis(
1836 tuple, rec, mode);
1837 }
1838
1839 if (cmp == 0) {
1840 found = true;
1841
1842 /* If located, the matching node/rec will be pushed
1843 to rtr_info->path for non-leaf nodes, or
1844 rtr_info->matches for leaf nodes */
1845 if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
1846 if (!n_core) {
1847 uint32_t page_no;
1848 node_seq_t new_seq;
1849 bool is_loc;
1850
1851 is_loc = (orig_mode
1852 == PAGE_CUR_RTREE_LOCATE
1853 || orig_mode
1854 == PAGE_CUR_RTREE_GET_FATHER);
1855
1856 offsets = rec_get_offsets(
1857 rec, index, offsets, 0,
1858 ULINT_UNDEFINED, &heap);
1859
1860 page_no = btr_node_ptr_get_child_page_no(
1861 rec, offsets);
1862
1863 ut_ad(level >= 1);
1864
1865 /* Get current SSN, before we insert
1866 it into the path stack */
1867 new_seq = rtr_get_current_ssn_id(index);
1868
1869 rtr_non_leaf_stack_push(
1870 rtr_info->path,
1871 page_no,
1872 new_seq, level - 1, 0,
1873 NULL, 0);
1874
1875 if (is_loc) {
1876 rtr_non_leaf_insert_stack_push(
1877 index,
1878 rtr_info->parent_path,
1879 level, page_no, block,
1880 rec, 0);
1881 }
1882
1883 if (!srv_read_only_mode
1884 && (rtr_info->need_page_lock
1885 || !is_loc)) {
1886
1887 /* Lock the page, preventing it
1888 from being shrunk */
1889 lock_place_prdt_page_lock(
1890 page_id_t(block->page
1891 .id()
1892 .space(),
1893 page_no),
1894 index,
1895 rtr_info->thr);
1896 }
1897 } else {
1898 ut_ad(orig_mode
1899 != PAGE_CUR_RTREE_LOCATE);
1900
1901 if (!match_init) {
1902 rtr_init_match(
1903 rtr_info->matches,
1904 block, page);
1905 match_init = true;
1906 }
1907
1908 /* Collect matched records on page */
1909 offsets = rec_get_offsets(
1910 rec, index, offsets,
1911 index->n_fields,
1912 ULINT_UNDEFINED, &heap);
1913 rtr_leaf_push_match_rec(
1914 rec, rtr_info, offsets,
1915 page_is_comp(page));
1916 }
1917
1918 last_match_rec = rec;
1919 } else {
1920 /* This is the insertion case, it will break
1921 once it finds the first MBR that can accomodate
1922 the inserting rec */
1923 break;
1924 }
1925 }
1926
1927 last_rec = rec;
1928
1929 rec = page_rec_get_next_const(rec);
1930 }
1931
1932 /* All records on page are searched */
1933 if (page_rec_is_supremum(rec)) {
1934 if (!n_core) {
1935 if (!found) {
1936 /* No match case, if it is for insertion,
1937 then we select the record that result in
1938 least increased area */
1939 if (mode == PAGE_CUR_RTREE_INSERT) {
1940 ut_ad(least_inc < DBL_MAX);
1941 offsets = rec_get_offsets(
1942 best_rec, index, offsets,
1943 0, ULINT_UNDEFINED, &heap);
1944 uint32_t child_no =
1945 btr_node_ptr_get_child_page_no(
1946 best_rec, offsets);
1947
1948 rtr_non_leaf_insert_stack_push(
1949 index, rtr_info->parent_path,
1950 level, child_no, block,
1951 best_rec, least_inc);
1952
1953 page_cur_position(best_rec, block,
1954 cursor);
1955 rtr_info->mbr_adj = true;
1956 } else {
1957 /* Position at the last rec of the
1958 page, if it is not the leaf page */
1959 page_cur_position(last_rec, block,
1960 cursor);
1961 }
1962 } else {
1963 /* There are matching records, position
1964 in the last matching records */
1965 if (rtr_info) {
1966 rec = last_match_rec;
1967 page_cur_position(
1968 rec, block, cursor);
1969 }
1970 }
1971 } else if (rtr_info) {
1972 /* Leaf level, no match, position at the
1973 last (supremum) rec */
1974 if (!last_match_rec) {
1975 page_cur_position(rec, block, cursor);
1976 goto func_exit;
1977 }
1978
1979 /* There are matched records */
1980 matched_rec_t* match_rec = rtr_info->matches;
1981
1982 rtr_rec_t test_rec;
1983
1984 test_rec = match_rec->matched_recs->back();
1985 #ifdef UNIV_DEBUG
1986 rec_offs offsets_2[REC_OFFS_NORMAL_SIZE];
1987 rec_offs* offsets2 = offsets_2;
1988 rec_offs_init(offsets_2);
1989
1990 ut_ad(found);
1991
1992 /* Verify the record to be positioned is the same
1993 as the last record in matched_rec vector */
1994 offsets2 = rec_get_offsets(test_rec.r_rec, index,
1995 offsets2, index->n_fields,
1996 ULINT_UNDEFINED, &heap);
1997
1998 offsets = rec_get_offsets(last_match_rec, index,
1999 offsets, index->n_fields,
2000 ULINT_UNDEFINED, &heap);
2001
2002 ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
2003 offsets2, offsets, index) == 0);
2004 #endif /* UNIV_DEBUG */
2005 /* Pop the last match record and position on it */
2006 match_rec->matched_recs->pop_back();
2007 page_cur_position(test_rec.r_rec, &match_rec->block,
2008 cursor);
2009 }
2010 } else {
2011
2012 if (mode == PAGE_CUR_RTREE_INSERT) {
2013 ut_ad(!last_match_rec);
2014 rtr_non_leaf_insert_stack_push(
2015 index, rtr_info->parent_path, level,
2016 mach_read_from_4(rec + DATA_MBR_LEN),
2017 block, rec, 0);
2018
2019 } else if (rtr_info && found && !n_core) {
2020 rec = last_match_rec;
2021 }
2022
2023 page_cur_position(rec, block, cursor);
2024 }
2025
2026 #ifdef UNIV_DEBUG
2027 /* Verify that we are positioned at the same child page as pushed in
2028 the path stack */
2029 if (!n_core && (!page_rec_is_supremum(rec) || found)
2030 && mode != PAGE_CUR_RTREE_INSERT) {
2031 ulint page_no;
2032
2033 offsets = rec_get_offsets(rec, index, offsets, 0,
2034 ULINT_UNDEFINED, &heap);
2035 page_no = btr_node_ptr_get_child_page_no(rec, offsets);
2036
2037 if (rtr_info && found) {
2038 rtr_node_path_t* path = rtr_info->path;
2039 node_visit_t last_visit = path->back();
2040
2041 ut_ad(last_visit.page_no == page_no);
2042 }
2043 }
2044 #endif /* UNIV_DEBUG */
2045
2046 func_exit:
2047 if (UNIV_LIKELY_NULL(heap)) {
2048 mem_heap_free(heap);
2049 }
2050
2051 return(found);
2052 }
2053