1 /*****************************************************************************
2
3 Copyright (c) 2017, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file gis/gis0sea.cc
29 InnoDB R-tree search interfaces
30
31 Created 2014/01/16 Jimmy Yang
32 ***********************************************************************/
33
34 #include "fsp0fsp.h"
35 #include "page0page.h"
36 #include "page0cur.h"
37 #include "page0zip.h"
38 #include "gis0rtree.h"
39
40 #ifndef UNIV_HOTBACKUP
41 #include "btr0cur.h"
42 #include "btr0sea.h"
43 #include "btr0pcur.h"
44 #include "rem0cmp.h"
45 #include "lock0lock.h"
46 #include "ibuf0ibuf.h"
47 #include "trx0trx.h"
48 #include "srv0mon.h"
49 #include "gis0geo.h"
50 #include "sync0sync.h"
51
52 #endif /* UNIV_HOTBACKUP */
53
54 /*************************************************************//**
55 Pop out used parent path entry, until we find the parent with matching
56 page number */
57 static
58 void
rtr_adjust_parent_path(rtr_info_t * rtr_info,ulint page_no)59 rtr_adjust_parent_path(
60 /*===================*/
61 rtr_info_t* rtr_info, /* R-Tree info struct */
62 ulint page_no) /* page number to look for */
63 {
64 while (!rtr_info->parent_path->empty()) {
65 if (rtr_info->parent_path->back().child_no == page_no) {
66 break;
67 } else {
68 if (rtr_info->parent_path->back().cursor) {
69 btr_pcur_close(
70 rtr_info->parent_path->back().cursor);
71 ut_free(rtr_info->parent_path->back().cursor);
72 }
73
74 rtr_info->parent_path->pop_back();
75 }
76 }
77 }
78
79 /*************************************************************//**
80 Find the next matching record. This function is used by search
81 or record locating during index delete/update.
82 @return true if there is suitable record found, otherwise false */
83 static
84 bool
rtr_pcur_getnext_from_path(const dtuple_t * tuple,page_cur_mode_t mode,btr_cur_t * btr_cur,ulint target_level,ulint latch_mode,bool index_locked,mtr_t * mtr)85 rtr_pcur_getnext_from_path(
86 /*=======================*/
87 const dtuple_t* tuple, /*!< in: data tuple */
88 page_cur_mode_t mode, /*!< in: cursor search mode */
89 btr_cur_t* btr_cur,/*!< in: persistent cursor; NOTE that the
90 function may release the page latch */
91 ulint target_level,
92 /*!< in: target level */
93 ulint latch_mode,
94 /*!< in: latch_mode */
95 bool index_locked,
96 /*!< in: index tree locked */
97 mtr_t* mtr) /*!< in: mtr */
98 {
99 dict_index_t* index = btr_cur->index;
100 bool found = false;
101 ulint space = dict_index_get_space(index);
102 page_cur_t* page_cursor;
103 ulint level = 0;
104 node_visit_t next_rec;
105 rtr_info_t* rtr_info = btr_cur->rtr_info;
106 node_seq_t page_ssn;
107 ulint my_latch_mode;
108 ulint skip_parent = false;
109 bool new_split = false;
110 bool need_parent;
111 bool for_delete = false;
112 bool for_undo_ins = false;
113
114 /* exhausted all the pages to be searched */
115 if (rtr_info->path->empty()) {
116 return(false);
117 }
118
119 ut_ad(dtuple_get_n_fields_cmp(tuple));
120
121 my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
122
123 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
124 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
125
126 /* There should be no insert coming to this function. Only
127 mode with BTR_MODIFY_* should be delete */
128 ut_ad(mode != PAGE_CUR_RTREE_INSERT);
129 ut_ad(my_latch_mode == BTR_SEARCH_LEAF
130 || my_latch_mode == BTR_MODIFY_LEAF
131 || my_latch_mode == BTR_MODIFY_TREE
132 || my_latch_mode == BTR_CONT_MODIFY_TREE);
133
134 /* Whether need to track parent information. Only need so
135 when we do tree altering operations (such as index page merge) */
136 need_parent = ((my_latch_mode == BTR_MODIFY_TREE
137 || my_latch_mode == BTR_CONT_MODIFY_TREE)
138 && mode == PAGE_CUR_RTREE_LOCATE);
139
140 if (!index_locked) {
141 ut_ad(latch_mode & BTR_SEARCH_LEAF
142 || latch_mode & BTR_MODIFY_LEAF);
143 mtr_s_lock(dict_index_get_lock(index), mtr);
144 } else {
145 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
146 MTR_MEMO_SX_LOCK)
147 || mtr_memo_contains(mtr, dict_index_get_lock(index),
148 MTR_MEMO_S_LOCK)
149 || mtr_memo_contains(mtr, dict_index_get_lock(index),
150 MTR_MEMO_X_LOCK));
151 }
152
153 const page_size_t& page_size = dict_table_page_size(index->table);
154
155 /* Pop each node/page to be searched from "path" structure
156 and do a search on it. Please note, any pages that are in
157 the "path" structure are protected by "page" lock, so tey
158 cannot be shrunk away */
159 do {
160 buf_block_t* block;
161 node_seq_t path_ssn;
162 const page_t* page;
163 ulint rw_latch = RW_X_LATCH;
164 ulint tree_idx;
165
166 mutex_enter(&rtr_info->rtr_path_mutex);
167 next_rec = rtr_info->path->back();
168 rtr_info->path->pop_back();
169 level = next_rec.level;
170 path_ssn = next_rec.seq_no;
171 tree_idx = btr_cur->tree_height - level - 1;
172
173 /* Maintain the parent path info as well, if needed */
174 if (need_parent && !skip_parent && !new_split) {
175 ulint old_level;
176 ulint new_level;
177
178 ut_ad(!rtr_info->parent_path->empty());
179
180 /* Cleanup unused parent info */
181 if (rtr_info->parent_path->back().cursor) {
182 btr_pcur_close(
183 rtr_info->parent_path->back().cursor);
184 ut_free(rtr_info->parent_path->back().cursor);
185 }
186
187 old_level = rtr_info->parent_path->back().level;
188
189 rtr_info->parent_path->pop_back();
190
191 ut_ad(!rtr_info->parent_path->empty());
192
193 /* check whether there is a level change. If so,
194 the current parent path needs to pop enough
195 nodes to adjust to the new search page */
196 new_level = rtr_info->parent_path->back().level;
197
198 if (old_level < new_level) {
199 rtr_adjust_parent_path(
200 rtr_info, next_rec.page_no);
201 }
202
203 ut_ad(!rtr_info->parent_path->empty());
204
205 ut_ad(next_rec.page_no
206 == rtr_info->parent_path->back().child_no);
207 }
208
209 mutex_exit(&rtr_info->rtr_path_mutex);
210
211 skip_parent = false;
212 new_split = false;
213
214 /* Once we have pages in "path", these pages are
215 predicate page locked, so they can't be shrunk away.
216 They also have SSN (split sequence number) to detect
217 splits, so we can directly latch single page while
218 getting them. They can be unlatched if not qualified.
219 One reason for pre-latch is that we might need to position
220 some parent position (requires latch) during search */
221 if (level == 0) {
222 /* S latched for SEARCH_LEAF, and X latched
223 for MODIFY_LEAF */
224 if (my_latch_mode <= BTR_MODIFY_LEAF) {
225 rw_latch = my_latch_mode;
226 }
227
228 if (my_latch_mode == BTR_CONT_MODIFY_TREE
229 || my_latch_mode == BTR_MODIFY_TREE) {
230 rw_latch = RW_NO_LATCH;
231 }
232
233 } else if (level == target_level) {
234 rw_latch = RW_X_LATCH;
235 }
236
237 /* Release previous locked blocks */
238 if (my_latch_mode != BTR_SEARCH_LEAF) {
239 for (ulint idx = 0; idx < btr_cur->tree_height;
240 idx++) {
241 if (rtr_info->tree_blocks[idx]) {
242 mtr_release_block_at_savepoint(
243 mtr,
244 rtr_info->tree_savepoints[idx],
245 rtr_info->tree_blocks[idx]);
246 rtr_info->tree_blocks[idx] = NULL;
247 }
248 }
249 for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
250 idx++) {
251 if (rtr_info->tree_blocks[idx]) {
252 mtr_release_block_at_savepoint(
253 mtr,
254 rtr_info->tree_savepoints[idx],
255 rtr_info->tree_blocks[idx]);
256 rtr_info->tree_blocks[idx] = NULL;
257 }
258 }
259 }
260
261 /* set up savepoint to record any locks to be taken */
262 rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
263
264 #ifdef UNIV_RTR_DEBUG
265 ut_ad(!(rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_X)
266 ||
267 rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_S))
268 || my_latch_mode == BTR_MODIFY_TREE
269 || my_latch_mode == BTR_CONT_MODIFY_TREE
270 || !page_is_leaf(buf_block_get_frame(
271 btr_cur->page_cur.block)));
272 #endif /* UNIV_RTR_DEBUG */
273
274 page_id_t page_id(space, next_rec.page_no);
275
276 block = buf_page_get_gen(
277 page_id, page_size,
278 rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr);
279
280 if (block == NULL) {
281 continue;
282 } else if (rw_latch != RW_NO_LATCH) {
283 ut_ad(!dict_index_is_ibuf(index));
284 buf_block_dbg_add_level(block, SYNC_TREE_NODE);
285 }
286
287 rtr_info->tree_blocks[tree_idx] = block;
288
289 page = buf_block_get_frame(block);
290 page_ssn = page_get_ssn_id(page);
291
292 /* If there are splits, push the splitted page.
293 Note that we have SX lock on index->lock, there
294 should not be any split/shrink happening here */
295 if (page_ssn > path_ssn) {
296 ulint next_page_no = btr_page_get_next(page, mtr);
297 rtr_non_leaf_stack_push(
298 rtr_info->path, next_page_no, path_ssn,
299 level, 0, NULL, 0);
300
301 if (!srv_read_only_mode
302 && mode != PAGE_CUR_RTREE_INSERT
303 && mode != PAGE_CUR_RTREE_LOCATE) {
304 ut_ad(rtr_info->thr);
305 lock_place_prdt_page_lock(
306 space, next_page_no, index,
307 rtr_info->thr);
308 }
309 new_split = true;
310 #if UNIV_GIS_DEBUG
311 fprintf(stderr,
312 "GIS_DIAG: Splitted page found: %d, %ld\n",
313 static_cast<int>(need_parent), next_page_no);
314 #endif
315 }
316
317 page_cursor = btr_cur_get_page_cur(btr_cur);
318 page_cursor->rec = NULL;
319
320 if (mode == PAGE_CUR_RTREE_LOCATE) {
321 if (level == target_level && level == 0) {
322 ulint low_match;
323
324 found = false;
325
326 low_match = page_cur_search(
327 block, index, tuple,
328 PAGE_CUR_LE,
329 btr_cur_get_page_cur(btr_cur));
330
331 if (low_match == dtuple_get_n_fields_cmp(
332 tuple)) {
333 rec_t* rec = btr_cur_get_rec(btr_cur);
334
335 if (!rec_get_deleted_flag(rec,
336 dict_table_is_comp(index->table))
337 || (!for_delete && !for_undo_ins)) {
338 found = true;
339 btr_cur->low_match = low_match;
340 } else {
341 /* mark we found deleted row */
342 btr_cur->rtr_info->fd_del
343 = true;
344 }
345 }
346 } else {
347 page_cur_mode_t page_mode = mode;
348
349 if (level == target_level
350 && target_level != 0) {
351 page_mode = PAGE_CUR_RTREE_GET_FATHER;
352 }
353 found = rtr_cur_search_with_match(
354 block, index, tuple, page_mode,
355 page_cursor, btr_cur->rtr_info);
356
357 /* Save the position of parent if needed */
358 if (found && need_parent) {
359 btr_pcur_t* r_cursor =
360 rtr_get_parent_cursor(
361 btr_cur, level, false);
362
363 rec_t* rec = page_cur_get_rec(
364 page_cursor);
365 page_cur_position(
366 rec, block,
367 btr_pcur_get_page_cur(r_cursor));
368 r_cursor->pos_state =
369 BTR_PCUR_IS_POSITIONED;
370 r_cursor->latch_mode = my_latch_mode;
371 btr_pcur_store_position(r_cursor, mtr);
372 #ifdef UNIV_DEBUG
373 ulint num_stored =
374 rtr_store_parent_path(
375 block, btr_cur,
376 rw_latch, level, mtr);
377 ut_ad(num_stored > 0);
378 #else
379 rtr_store_parent_path(
380 block, btr_cur, rw_latch,
381 level, mtr);
382 #endif /* UNIV_DEBUG */
383 }
384 }
385 } else {
386 found = rtr_cur_search_with_match(
387 block, index, tuple, mode, page_cursor,
388 btr_cur->rtr_info);
389 }
390
391 /* Attach predicate lock if needed, no matter whether
392 there are matched records */
393 if (mode != PAGE_CUR_RTREE_INSERT
394 && mode != PAGE_CUR_RTREE_LOCATE
395 && mode >= PAGE_CUR_CONTAIN
396 && btr_cur->rtr_info->need_prdt_lock) {
397 lock_prdt_t prdt;
398
399 trx_t* trx = thr_get_trx(
400 btr_cur->rtr_info->thr);
401 lock_mutex_enter();
402 lock_init_prdt_from_mbr(
403 &prdt, &btr_cur->rtr_info->mbr,
404 mode, trx->lock.lock_heap);
405 lock_mutex_exit();
406
407 if (rw_latch == RW_NO_LATCH) {
408 rw_lock_s_lock(&(block->lock));
409 }
410
411 lock_prdt_lock(block, &prdt, index, LOCK_S,
412 LOCK_PREDICATE, btr_cur->rtr_info->thr,
413 mtr);
414
415 if (rw_latch == RW_NO_LATCH) {
416 rw_lock_s_unlock(&(block->lock));
417 }
418 }
419
420 if (found) {
421 if (level == target_level) {
422 page_cur_t* r_cur;;
423
424 if (my_latch_mode == BTR_MODIFY_TREE
425 && level == 0) {
426 ut_ad(rw_latch == RW_NO_LATCH);
427 page_id_t my_page_id(
428 space, block->page.id.page_no());
429
430 btr_cur_latch_leaves(
431 block, my_page_id,
432 page_size, BTR_MODIFY_TREE,
433 btr_cur, mtr);
434 }
435
436 r_cur = btr_cur_get_page_cur(btr_cur);
437
438 page_cur_position(
439 page_cur_get_rec(page_cursor),
440 page_cur_get_block(page_cursor),
441 r_cur);
442
443 btr_cur->low_match = level != 0 ?
444 DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
445 : btr_cur->low_match;
446 break;
447 }
448
449 /* Keep the parent path node, which points to
450 last node just located */
451 skip_parent = true;
452 } else {
453 /* Release latch on the current page */
454 ut_ad(rtr_info->tree_blocks[tree_idx]);
455
456 mtr_release_block_at_savepoint(
457 mtr, rtr_info->tree_savepoints[tree_idx],
458 rtr_info->tree_blocks[tree_idx]);
459 rtr_info->tree_blocks[tree_idx] = NULL;
460 }
461
462 } while (!rtr_info->path->empty());
463
464 const rec_t* rec = btr_cur_get_rec(btr_cur);
465
466 if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
467 mtr_commit(mtr);
468 mtr_start(mtr);
469 } else if (!index_locked) {
470 mtr_memo_release(mtr, dict_index_get_lock(index),
471 MTR_MEMO_X_LOCK);
472 }
473
474 return(found);
475 }
476
477 /*************************************************************//**
478 Find the next matching record. This function will first exhaust
479 the copied record listed in the rtr_info->matches vector before
480 moving to the next page
481 @return true if there is suitable record found, otherwise false */
482 bool
rtr_pcur_move_to_next(const dtuple_t * tuple,page_cur_mode_t mode,btr_pcur_t * cursor,ulint level,mtr_t * mtr)483 rtr_pcur_move_to_next(
484 /*==================*/
485 const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
486 tuple must be set so that it cannot get
487 compared to the node ptr page number field! */
488 page_cur_mode_t mode, /*!< in: cursor search mode */
489 btr_pcur_t* cursor, /*!< in: persistent cursor; NOTE that the
490 function may release the page latch */
491 ulint level, /*!< in: target level */
492 mtr_t* mtr) /*!< in: mtr */
493 {
494 rtr_info_t* rtr_info = cursor->btr_cur.rtr_info;
495
496 ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
497
498 mutex_enter(&rtr_info->matches->rtr_match_mutex);
499 /* First retrieve the next record on the current page */
500 if (!rtr_info->matches->matched_recs->empty()) {
501 rtr_rec_t rec;
502 rec = rtr_info->matches->matched_recs->back();
503 rtr_info->matches->matched_recs->pop_back();
504 mutex_exit(&rtr_info->matches->rtr_match_mutex);
505
506 cursor->btr_cur.page_cur.rec = rec.r_rec;
507 cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
508
509 DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
510 return(true);
511 }
512
513 mutex_exit(&rtr_info->matches->rtr_match_mutex);
514
515 /* Fetch the next page */
516 return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
517 level, cursor->latch_mode,
518 false, mtr));
519 }
520
521 /*************************************************************//**
522 Check if the cursor holds record pointing to the specified child page
523 @return true if it is (pointing to the child page) false otherwise */
524 static
525 bool
rtr_compare_cursor_rec(dict_index_t * index,btr_cur_t * cursor,ulint page_no,mem_heap_t ** heap)526 rtr_compare_cursor_rec(
527 /*===================*/
528 dict_index_t* index, /*!< in: index */
529 btr_cur_t* cursor, /*!< in: Cursor to check */
530 ulint page_no, /*!< in: desired child page number */
531 mem_heap_t** heap) /*!< in: memory heap */
532 {
533 const rec_t* rec;
534 ulint* offsets;
535
536 rec = btr_cur_get_rec(cursor);
537
538 offsets = rec_get_offsets(
539 rec, index, NULL, ULINT_UNDEFINED, heap);
540
541 return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
542 }
543
544 /**************************************************************//**
545 Initializes and opens a persistent cursor to an index tree. It should be
546 closed with btr_pcur_close. Mainly called by row_search_index_entry() */
547 void
rtr_pcur_open_low(dict_index_t * index,ulint level,const dtuple_t * tuple,page_cur_mode_t mode,ulint latch_mode,btr_pcur_t * cursor,const char * file,ulint line,mtr_t * mtr)548 rtr_pcur_open_low(
549 /*==============*/
550 dict_index_t* index, /*!< in: index */
551 ulint level, /*!< in: level in the rtree */
552 const dtuple_t* tuple, /*!< in: tuple on which search done */
553 page_cur_mode_t mode, /*!< in: PAGE_CUR_RTREE_LOCATE, ... */
554 ulint latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
555 btr_pcur_t* cursor, /*!< in: memory buffer for persistent cursor */
556 const char* file, /*!< in: file name */
557 ulint line, /*!< in: line where called */
558 mtr_t* mtr) /*!< in: mtr */
559 {
560 btr_cur_t* btr_cursor;
561 ulint n_fields;
562 ulint low_match;
563 rec_t* rec;
564 bool tree_latched = false;
565 bool for_delete = false;
566 bool for_undo_ins = false;
567
568 ut_ad(level == 0);
569
570 ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
571 ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
572
573 /* Initialize the cursor */
574
575 btr_pcur_init(cursor);
576
577 for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
578 for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
579
580 cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
581 cursor->search_mode = mode;
582
583 /* Search with the tree cursor */
584
585 btr_cursor = btr_pcur_get_btr_cur(cursor);
586
587 btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
588 btr_cursor, index);
589
590 /* Purge will SX lock the tree instead of take Page Locks */
591 if (btr_cursor->thr) {
592 btr_cursor->rtr_info->need_page_lock = true;
593 btr_cursor->rtr_info->thr = btr_cursor->thr;
594 }
595
596 btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
597 btr_cursor, 0, file, line, mtr);
598 cursor->pos_state = BTR_PCUR_IS_POSITIONED;
599
600 cursor->trx_if_known = NULL;
601
602 low_match = btr_pcur_get_low_match(cursor);
603
604 rec = btr_pcur_get_rec(cursor);
605
606 n_fields = dtuple_get_n_fields(tuple);
607
608 if (latch_mode & BTR_ALREADY_S_LATCHED) {
609 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
610 MTR_MEMO_S_LOCK));
611 tree_latched = true;
612 }
613
614 if (latch_mode & BTR_MODIFY_TREE) {
615 ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
616 MTR_MEMO_X_LOCK)
617 || mtr_memo_contains(mtr, dict_index_get_lock(index),
618 MTR_MEMO_SX_LOCK));
619 tree_latched = true;
620 }
621
622 if (page_rec_is_infimum(rec) || low_match != n_fields
623 || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
624 && (for_delete || for_undo_ins))) {
625
626 if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
627 && for_delete) {
628 btr_cursor->rtr_info->fd_del = true;
629 btr_cursor->low_match = 0;
630 }
631 /* Did not find matched row in first dive. Release
632 latched block if any before search more pages */
633 if (latch_mode & BTR_MODIFY_LEAF) {
634 ulint tree_idx = btr_cursor->tree_height - 1;
635 rtr_info_t* rtr_info = btr_cursor->rtr_info;
636
637 ut_ad(level == 0);
638
639 if (rtr_info->tree_blocks[tree_idx]) {
640 mtr_release_block_at_savepoint(
641 mtr,
642 rtr_info->tree_savepoints[tree_idx],
643 rtr_info->tree_blocks[tree_idx]);
644 rtr_info->tree_blocks[tree_idx] = NULL;
645 }
646 }
647
648 bool ret = rtr_pcur_getnext_from_path(
649 tuple, mode, btr_cursor, level, latch_mode,
650 tree_latched, mtr);
651
652 if (ret) {
653 low_match = btr_pcur_get_low_match(cursor);
654 ut_ad(low_match == n_fields);
655 }
656 }
657 }
658
659 /* Get the rtree page father.
660 @param[in] index rtree index
661 @param[in] block child page in the index
662 @param[in] mtr mtr
663 @param[in] sea_cur search cursor, contains information
664 about parent nodes in search
665 @param[in] cursor cursor on node pointer record,
666 its page x-latched */
667 void
rtr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)668 rtr_page_get_father(
669 dict_index_t* index,
670 buf_block_t* block,
671 mtr_t* mtr,
672 btr_cur_t* sea_cur,
673 btr_cur_t* cursor)
674 {
675 mem_heap_t* heap = mem_heap_create(100);
676 #ifdef UNIV_DEBUG
677 ulint* offsets;
678
679 offsets = rtr_page_get_father_block(
680 NULL, heap, index, block, mtr, sea_cur, cursor);
681
682 ulint page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
683 offsets);
684
685 ut_ad(page_no == block->page.id.page_no());
686 #else
687 rtr_page_get_father_block(
688 NULL, heap, index, block, mtr, sea_cur, cursor);
689 #endif
690
691 mem_heap_free(heap);
692 }
693
694 /************************************************************//**
695 Returns the father block to a page. It is assumed that mtr holds
696 an X or SX latch on the tree.
697 @return rec_get_offsets() of the node pointer record */
698 ulint*
rtr_page_get_father_block(ulint * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)699 rtr_page_get_father_block(
700 /*======================*/
701 ulint* offsets,/*!< in: work area for the return value */
702 mem_heap_t* heap, /*!< in: memory heap to use */
703 dict_index_t* index, /*!< in: b-tree index */
704 buf_block_t* block, /*!< in: child page in the index */
705 mtr_t* mtr, /*!< in: mtr */
706 btr_cur_t* sea_cur,/*!< in: search cursor, contains information
707 about parent nodes in search */
708 btr_cur_t* cursor) /*!< out: cursor on node pointer record,
709 its page x-latched */
710 {
711
712 rec_t* rec = page_rec_get_next(
713 page_get_infimum_rec(buf_block_get_frame(block)));
714 btr_cur_position(index, rec, block, cursor);
715
716 return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
717 cursor, mtr));
718 }
719
720 /************************************************************//**
721 Returns the upper level node pointer to a R-Tree page. It is assumed
722 that mtr holds an x-latch on the tree.
723 @return rec_get_offsets() of the node pointer record */
724 ulint*
rtr_page_get_father_node_ptr_func(ulint * offsets,mem_heap_t * heap,btr_cur_t * sea_cur,btr_cur_t * cursor,const char * file,ulint line,mtr_t * mtr)725 rtr_page_get_father_node_ptr_func(
726 /*==============================*/
727 ulint* offsets,/*!< in: work area for the return value */
728 mem_heap_t* heap, /*!< in: memory heap to use */
729 btr_cur_t* sea_cur,/*!< in: search cursor */
730 btr_cur_t* cursor, /*!< in: cursor pointing to user record,
731 out: cursor on node pointer record,
732 its page x-latched */
733 const char* file, /*!< in: file name */
734 ulint line, /*!< in: line where called */
735 mtr_t* mtr) /*!< in: mtr */
736 {
737 dtuple_t* tuple;
738 rec_t* user_rec;
739 rec_t* node_ptr;
740 ulint level;
741 ulint page_no;
742 dict_index_t* index;
743 rtr_mbr_t mbr;
744
745 page_no = btr_cur_get_block(cursor)->page.id.page_no();
746 index = btr_cur_get_index(cursor);
747
748 ut_ad(srv_read_only_mode
749 || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
750 MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
751
752 ut_ad(dict_index_get_page(index) != page_no);
753
754 level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
755
756 user_rec = btr_cur_get_rec(cursor);
757 ut_a(page_rec_is_user_rec(user_rec));
758
759 offsets = rec_get_offsets(user_rec, index, offsets,
760 ULINT_UNDEFINED, &heap);
761 rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
762
763 tuple = rtr_index_build_node_ptr(
764 index, &mbr, user_rec, page_no, heap, level);
765
766 if (sea_cur && !sea_cur->rtr_info) {
767 sea_cur = NULL;
768 }
769
770 rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
771 page_no, mtr);
772
773 node_ptr = btr_cur_get_rec(cursor);
774 ut_ad(!page_rec_is_comp(node_ptr)
775 || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
776 offsets = rec_get_offsets(node_ptr, index, offsets,
777 ULINT_UNDEFINED, &heap);
778
779 ulint child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
780
781 if (child_page != page_no) {
782 const rec_t* print_rec;
783
784 ib::fatal error;
785
786 error << "Corruption of index " << index->name
787 << " of table " << index->table->name
788 << " parent page " << page_no
789 << " child page " << child_page;
790
791 print_rec = page_rec_get_next(
792 page_get_infimum_rec(page_align(user_rec)));
793 offsets = rec_get_offsets(print_rec, index,
794 offsets, ULINT_UNDEFINED, &heap);
795 error << "; child ";
796 rec_print(error.m_oss, print_rec,
797 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
798 offsets);
799 offsets = rec_get_offsets(node_ptr, index, offsets,
800 ULINT_UNDEFINED, &heap);
801 error << "; parent ";
802 rec_print(error.m_oss, print_rec,
803 rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
804 offsets);
805
806 error << ". You should dump + drop + reimport the table to"
807 " fix the corruption. If the crash happens at"
808 " database startup, see " REFMAN
809 "forcing-innodb-recovery.html about forcing"
810 " recovery. Then dump + drop + reimport.";
811 }
812
813 return(offsets);
814 }
815
816 /********************************************************************//**
817 Returns the upper level node pointer to a R-Tree page. It is assumed
818 that mtr holds an x-latch on the tree. */
819 void
rtr_get_father_node(dict_index_t * index,ulint level,const dtuple_t * tuple,btr_cur_t * sea_cur,btr_cur_t * btr_cur,ulint page_no,mtr_t * mtr)820 rtr_get_father_node(
821 /*================*/
822 dict_index_t* index, /*!< in: index */
823 ulint level, /*!< in: the tree level of search */
824 const dtuple_t* tuple, /*!< in: data tuple; NOTE: n_fields_cmp in
825 tuple must be set so that it cannot get
826 compared to the node ptr page number field! */
827 btr_cur_t* sea_cur,/*!< in: search cursor */
828 btr_cur_t* btr_cur,/*!< in/out: tree cursor; the cursor page is
829 s- or x-latched, but see also above! */
830 ulint page_no,/*!< Current page no */
831 mtr_t* mtr) /*!< in: mtr */
832 {
833 mem_heap_t* heap = NULL;
834 bool ret = false;
835 const rec_t* rec;
836 ulint n_fields;
837 bool new_rtr = false;
838
839 /* Try to optimally locate the parent node. Level should always
840 less than sea_cur->tree_height unless the root is splitting */
841 if (sea_cur && sea_cur->tree_height > level) {
842
843 ut_ad(mtr_memo_contains_flagged(mtr,
844 dict_index_get_lock(index),
845 MTR_MEMO_X_LOCK
846 | MTR_MEMO_SX_LOCK));
847 ret = rtr_cur_restore_position(
848 BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
849
850 /* Once we block shrink tree nodes while there are
851 active search on it, this optimal locating should always
852 succeeds */
853 ut_ad(ret);
854
855 if (ret) {
856 btr_pcur_t* r_cursor = rtr_get_parent_cursor(
857 sea_cur, level, false);
858
859 rec = btr_pcur_get_rec(r_cursor);
860
861 ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
862 page_cur_position(rec,
863 btr_pcur_get_block(r_cursor),
864 btr_cur_get_page_cur(btr_cur));
865 btr_cur->rtr_info = sea_cur->rtr_info;
866 btr_cur->tree_height = sea_cur->tree_height;
867 ut_ad(rtr_compare_cursor_rec(
868 index, btr_cur, page_no, &heap));
869 goto func_exit;
870 }
871 }
872
873 /* We arrive here in one of two scenario
874 1) check table and btr_valide
875 2) index root page being raised */
876 ut_ad(!sea_cur || sea_cur->tree_height == level);
877
878 if (btr_cur->rtr_info) {
879 rtr_clean_rtr_info(btr_cur->rtr_info, true);
880 } else {
881 new_rtr = true;
882 }
883
884 btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
885
886 if (sea_cur && sea_cur->tree_height == level) {
887 /* root split, and search the new root */
888 btr_cur_search_to_nth_level(
889 index, level, tuple, PAGE_CUR_RTREE_LOCATE,
890 BTR_CONT_MODIFY_TREE, btr_cur, 0,
891 __FILE__, __LINE__, mtr);
892
893 } else {
894 /* btr_validate */
895 ut_ad(level >= 1);
896 ut_ad(!sea_cur);
897
898 btr_cur_search_to_nth_level(
899 index, level, tuple, PAGE_CUR_RTREE_LOCATE,
900 BTR_CONT_MODIFY_TREE, btr_cur, 0,
901 __FILE__, __LINE__, mtr);
902
903 rec = btr_cur_get_rec(btr_cur);
904 n_fields = dtuple_get_n_fields_cmp(tuple);
905
906 if (page_rec_is_infimum(rec)
907 || (btr_cur->low_match != n_fields)) {
908 ret = rtr_pcur_getnext_from_path(
909 tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
910 level, BTR_CONT_MODIFY_TREE,
911 true, mtr);
912
913 ut_ad(ret && btr_cur->low_match == n_fields);
914 }
915 }
916
917 ret = rtr_compare_cursor_rec(
918 index, btr_cur, page_no, &heap);
919
920 ut_ad(ret);
921
922 func_exit:
923 if (heap) {
924 mem_heap_free(heap);
925 }
926
927 if (new_rtr && btr_cur->rtr_info) {
928 rtr_clean_rtr_info(btr_cur->rtr_info, true);
929 btr_cur->rtr_info = NULL;
930 }
931 }
932
933 /*******************************************************************//**
934 Create a RTree search info structure */
935 rtr_info_t*
rtr_create_rtr_info(bool need_prdt,bool init_matches,btr_cur_t * cursor,dict_index_t * index)936 rtr_create_rtr_info(
937 /******************/
938 bool need_prdt, /*!< in: Whether predicate lock
939 is needed */
940 bool init_matches, /*!< in: Whether to initiate the
941 "matches" structure for collecting
942 matched leaf records */
943 btr_cur_t* cursor, /*!< in: tree search cursor */
944 dict_index_t* index) /*!< in: index struct */
945 {
946 rtr_info_t* rtr_info;
947
948 index = index ? index : cursor->index;
949 ut_ad(index);
950
951 rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
952
953 rtr_info->allocated = true;
954 rtr_info->cursor = cursor;
955 rtr_info->index = index;
956
957 if (init_matches) {
958 rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
959 rtr_info->matches = static_cast<matched_rec_t*>(
960 mem_heap_zalloc(
961 rtr_info->heap,
962 sizeof(*rtr_info->matches)));
963
964 rtr_info->matches->matched_recs
965 = UT_NEW_NOKEY(rtr_rec_vector());
966
967 rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
968 + UNIV_PAGE_SIZE_MAX + 1);
969 mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
970 &rtr_info->matches->rtr_match_mutex);
971 rw_lock_create(PFS_NOT_INSTRUMENTED,
972 &(rtr_info->matches->block.lock),
973 SYNC_LEVEL_VARYING);
974 }
975
976 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
977 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
978 rtr_info->need_prdt_lock = need_prdt;
979 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
980 &rtr_info->rtr_path_mutex);
981
982 mutex_enter(&index->rtr_track->rtr_active_mutex);
983 index->rtr_track->rtr_active->push_back(rtr_info);
984 mutex_exit(&index->rtr_track->rtr_active_mutex);
985 return(rtr_info);
986 }
987
988 /*******************************************************************//**
989 Update a btr_cur_t with rtr_info */
990 void
rtr_info_update_btr(btr_cur_t * cursor,rtr_info_t * rtr_info)991 rtr_info_update_btr(
992 /******************/
993 btr_cur_t* cursor, /*!< in/out: tree cursor */
994 rtr_info_t* rtr_info) /*!< in: rtr_info to set to the
995 cursor */
996 {
997 ut_ad(rtr_info);
998
999 cursor->rtr_info = rtr_info;
1000 }
1001
1002 /*******************************************************************//**
1003 Initialize a R-Tree Search structure */
1004 void
rtr_init_rtr_info(rtr_info_t * rtr_info,bool need_prdt,btr_cur_t * cursor,dict_index_t * index,bool reinit)1005 rtr_init_rtr_info(
1006 /****************/
1007 rtr_info_t* rtr_info, /*!< in: rtr_info to set to the
1008 cursor */
1009 bool need_prdt, /*!< in: Whether predicate lock is
1010 needed */
1011 btr_cur_t* cursor, /*!< in: tree search cursor */
1012 dict_index_t* index, /*!< in: index structure */
1013 bool reinit) /*!< in: Whether this is a reinit */
1014 {
1015 ut_ad(rtr_info);
1016
1017 if (!reinit) {
1018 /* Reset all members. */
1019 rtr_info->path = NULL;
1020 rtr_info->parent_path = NULL;
1021 rtr_info->matches = NULL;
1022
1023 mutex_create(LATCH_ID_RTR_PATH_MUTEX,
1024 &rtr_info->rtr_path_mutex);
1025
1026 memset(rtr_info->tree_blocks, 0x0,
1027 sizeof(rtr_info->tree_blocks));
1028 memset(rtr_info->tree_savepoints, 0x0,
1029 sizeof(rtr_info->tree_savepoints));
1030 rtr_info->mbr.xmin = 0.0;
1031 rtr_info->mbr.xmax = 0.0;
1032 rtr_info->mbr.ymin = 0.0;
1033 rtr_info->mbr.ymax = 0.0;
1034 rtr_info->thr = NULL;
1035 rtr_info->heap = NULL;
1036 rtr_info->cursor = NULL;
1037 rtr_info->index = NULL;
1038 rtr_info->need_prdt_lock = false;
1039 rtr_info->need_page_lock = false;
1040 rtr_info->allocated = false;
1041 rtr_info->mbr_adj = false;
1042 rtr_info->fd_del = false;
1043 rtr_info->search_tuple = NULL;
1044 rtr_info->search_mode = PAGE_CUR_UNSUPP;
1045 }
1046
1047 ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
1048
1049 rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
1050 rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1051 rtr_info->need_prdt_lock = need_prdt;
1052 rtr_info->cursor = cursor;
1053 rtr_info->index = index;
1054
1055 mutex_enter(&index->rtr_track->rtr_active_mutex);
1056 index->rtr_track->rtr_active->push_back(rtr_info);
1057 mutex_exit(&index->rtr_track->rtr_active_mutex);
1058 }
1059
/**************************************************************//**
Clean up R-Tree search structure */
void
rtr_clean_rtr_info(
/*===============*/
	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
	bool		free_all)	/*!< in: need to free rtr_info itself */
{
	dict_index_t*	index;
	bool		initialized = false;

	if (!rtr_info) {
		return;
	}

	index = rtr_info->index;

	/* Hold rtr_active_mutex for the whole teardown, so a concurrent
	rtr_check_discard_page() cannot walk this search's path while it
	is being freed. */
	if (index) {
		mutex_enter(&index->rtr_track->rtr_active_mutex);
	}

	/* Close and free the persistent cursors stored in the parent
	path; they were allocated by rtr_non_leaf_insert_stack_push(). */
	while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
		btr_pcur_t*	cur = rtr_info->parent_path->back().cursor;
		rtr_info->parent_path->pop_back();

		if (cur) {
			btr_pcur_close(cur);
			ut_free(cur);
		}
	}

	UT_DELETE(rtr_info->parent_path);
	rtr_info->parent_path = NULL;

	if (rtr_info->path != NULL) {
		UT_DELETE(rtr_info->path);
		rtr_info->path = NULL;
		/* A non-NULL path implies rtr_init_rtr_info() ran and
		created rtr_path_mutex, so it may be destroyed below. */
		initialized = true;
	}

	/* Reset (but do not free) the matches structure; its memory
	lives in rtr_info->heap and is released only when free_all. */
	if (rtr_info->matches) {
		rtr_info->matches->used = false;
		rtr_info->matches->locked = false;
		rtr_info->matches->valid = false;
		rtr_info->matches->matched_recs->clear();
	}

	if (index) {
		index->rtr_track->rtr_active->remove(rtr_info);
		mutex_exit(&index->rtr_track->rtr_active_mutex);
	}

	if (free_all) {
		if (rtr_info->matches) {
			if (rtr_info->matches->matched_recs != NULL) {
				UT_DELETE(rtr_info->matches->matched_recs);
			}

			rw_lock_free(&(rtr_info->matches->block.lock));

			mutex_destroy(&rtr_info->matches->rtr_match_mutex);
		}

		if (rtr_info->heap) {
			mem_heap_free(rtr_info->heap);
		}

		if (initialized) {
			mutex_destroy(&rtr_info->rtr_path_mutex);
		}

		/* Free rtr_info itself only if it came from
		rtr_create_rtr_info() (allocated == true); otherwise it
		is embedded in some other object. */
		if (rtr_info->allocated) {
			ut_free(rtr_info);
		}
	}
}
1136
/**************************************************************//**
Rebuilt the "path" to exclude the removing page no */
static
void
rtr_rebuild_path(
/*=============*/
	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
	ulint		page_no)	/*!< in: page number of the page
					being discarded; all path and
					parent path entries referring to
					it are dropped */
{
	/* Build a fresh vector without the discarded page instead of
	erasing in place. */
	rtr_node_path_t*	new_path
		= UT_NEW_NOKEY(rtr_node_path_t());

	rtr_node_path_t::iterator	rit;
#ifdef UNIV_DEBUG
	ulint	before_size = rtr_info->path->size();
#endif /* UNIV_DEBUG */

	for (rit = rtr_info->path->begin();
	     rit != rtr_info->path->end(); ++rit) {
		node_visit_t	next_rec = *rit;

		if (next_rec.page_no == page_no) {
			continue;
		}

		new_path->push_back(next_rec);
#ifdef UNIV_DEBUG
		node_visit_t	rec = new_path->back();
		ut_ad(rec.level < rtr_info->cursor->tree_height
		      && rec.page_no > 0);
#endif /* UNIV_DEBUG */
	}

	UT_DELETE(rtr_info->path);

	/* NOTE(review): this assumes page_no occurs exactly once in the
	path -- TODO confirm; duplicates would trip this assertion. */
	ut_ad(new_path->size() == before_size - 1);

	rtr_info->path = new_path;

	/* Also drop parent-path entries whose child pointer is the
	discarded page, closing and freeing their stored cursors. */
	if (!rtr_info->parent_path->empty()) {
		rtr_node_path_t*	new_parent_path = UT_NEW_NOKEY(
			rtr_node_path_t());

		for (rit = rtr_info->parent_path->begin();
		     rit != rtr_info->parent_path->end(); ++rit) {
			node_visit_t	next_rec = *rit;

			if (next_rec.child_no == page_no) {
				btr_pcur_t*	cur = next_rec.cursor;

				if (cur) {
					btr_pcur_close(cur);
					ut_free(cur);
				}

				continue;
			}

			new_parent_path->push_back(next_rec);
		}
		UT_DELETE(rtr_info->parent_path);
		rtr_info->parent_path = new_parent_path;
	}

}
1202
/**************************************************************//**
Check whether a discarding page is in anyone's search path */
void
rtr_check_discard_page(
/*===================*/
	dict_index_t*	index,	/*!< in: index */
	btr_cur_t*	cursor, /*!< in: cursor on the page to discard: not on
				the root page */
	buf_block_t*	block)	/*!< in: block of page to be discarded */
{
	ulint		pageno = block->page.id.page_no();
	rtr_info_t*	rtr_info;
	rtr_info_active::iterator	it;

	mutex_enter(&index->rtr_track->rtr_active_mutex);

	/* Visit every active R-tree search on this index and scrub the
	discarded page out of its search path and match buffer, so no
	search will later follow a pointer to a freed page. */
	for (it = index->rtr_track->rtr_active->begin();
	     it != index->rtr_track->rtr_active->end(); ++it) {
		rtr_info = *it;
		rtr_node_path_t::iterator	rit;
		bool	found = false;

		/* The discarding cursor's own search state needs no
		fix-up. */
		if (cursor && rtr_info == cursor->rtr_info) {
			continue;
		}

		mutex_enter(&rtr_info->rtr_path_mutex);
		for (rit = rtr_info->path->begin();
		     rit != rtr_info->path->end(); ++rit) {
			node_visit_t	node = *rit;

			if (node.page_no == pageno) {
				found = true;
				break;
			}
		}

		if (found) {
			rtr_rebuild_path(rtr_info, pageno);
		}
		mutex_exit(&rtr_info->rtr_path_mutex);

		/* Invalidate any matched leaf records that were copied
		from the discarded page. */
		if (rtr_info->matches) {
			mutex_enter(&rtr_info->matches->rtr_match_mutex);

			if ((&rtr_info->matches->block)->page.id.page_no()
			     == pageno) {
				if (!rtr_info->matches->matched_recs->empty()) {
					rtr_info->matches->matched_recs->clear();
				}
				ut_ad(rtr_info->matches->matched_recs->empty());
				rtr_info->matches->valid = false;
			}

			mutex_exit(&rtr_info->matches->rtr_match_mutex);
		}
	}

	mutex_exit(&index->rtr_track->rtr_active_mutex);

	/* Release any predicate locks attached to the discarded page. */
	lock_mutex_enter();
	lock_prdt_page_free_from_discard(block, lock_sys->prdt_hash);
	lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
	lock_mutex_exit();
}
/** This is a backported version of a lambda expression:
  [&](buf_block_t *hint) {
    return hint != nullptr && buf_page_optimistic_get(...hint...);
  }
for compilers which do not support lambda expressions, nor passing local types
as template arguments. */
struct Buf_page_optimistic_get_functor_t{
	/* All members are references to locals of the caller
	(rtr_cur_restore_position_func); the functor must not outlive
	that stack frame. */
	btr_pcur_t * &r_cursor;	/*!< cursor whose modify_clock is checked
				by the optimistic get */
	const char * &file;	/*!< caller's file name, for diagnostics */
	ulint &line;		/*!< caller's line number */
	mtr_t * &mtr;		/*!< mini-transaction to latch the page in */
	/** @return true iff hint is non-NULL and the page could be
	re-latched (RW_X_LATCH) optimistically, i.e. its modify clock
	still matches the cursor's stored value */
	bool operator()(buf_block_t *hint) const {
		return hint != NULL && buf_page_optimistic_get(
			RW_X_LATCH, hint, r_cursor->modify_clock, file, line, mtr
		);
	}
};
/**************************************************************//**
Restores the stored position of a persistent cursor buffer-fixing the page
@return true if the cursor was restored onto the stored record */
bool
rtr_cur_restore_position_func(
/*==========================*/
	ulint		latch_mode,	/*!< in: BTR_CONT_MODIFY_TREE, ... */
	btr_cur_t*	btr_cur,	/*!< in: detached persistent cursor */
	ulint		level,		/*!< in: index level */
	const char*	file,		/*!< in: file name */
	ulint		line,		/*!< in: line where called */
	mtr_t*		mtr)		/*!< in: mtr */
{
	dict_index_t*	index;
	mem_heap_t*	heap;
	btr_pcur_t*	r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
	dtuple_t*	tuple;
	bool		ret = false;

	ut_ad(mtr);
	ut_ad(r_cursor);
	ut_ad(mtr->is_active());

	index = btr_cur_get_index(btr_cur);

	/* Nothing to restore if the position was stored before the
	first or after the last record in the whole tree. */
	if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
	    || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
		return(false);
	}

	/* Test hook: force the optimistic restore below to fail. */
	DBUG_EXECUTE_IF(
		"rtr_pessimistic_position",
		r_cursor->modify_clock = 100;
	);

	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);

	/* Optimistic restore: if the buffered page was not modified
	since the position was stored (modify_clock still matches), the
	old cursor position is still valid. */
	Buf_page_optimistic_get_functor_t functor={r_cursor,file,line,mtr};
	if (r_cursor->block_when_stored.run_with_hint(functor)){
		ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);

		ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
#ifdef UNIV_DEBUG
		do {
			/* Debug check: the record under the cursor must
			compare equal to the stored copy of the old
			record (or both carry the min-rec flag). */
			const rec_t*	rec;
			const ulint*	offsets1;
			const ulint*	offsets2;
			ulint		comp;

			rec = btr_pcur_get_rec(r_cursor);

			heap = mem_heap_create(256);
			offsets1 = rec_get_offsets(
				r_cursor->old_rec, index, NULL,
				r_cursor->old_n_fields, &heap);
			offsets2 = rec_get_offsets(
				rec, index, NULL,
				r_cursor->old_n_fields, &heap);

			comp = rec_offs_comp(offsets1);

			if (rec_get_info_bits(r_cursor->old_rec, comp)
			    & REC_INFO_MIN_REC_FLAG) {
				ut_ad(rec_get_info_bits(rec, comp)
				      & REC_INFO_MIN_REC_FLAG);
			} else {

				ut_ad(!cmp_rec_rec(r_cursor->old_rec,
					rec, offsets1, offsets2,
					index,page_is_spatial_non_leaf(rec, index)));
			}

			mem_heap_free(heap);
		} while (0);
#endif /* UNIV_DEBUG */

		return(true);
	}

	/* Page has changed, for R-Tree, the page cannot be shrunk away,
	so we search the page and its right siblings */
	buf_block_t*	block;
	node_seq_t	page_ssn;
	const page_t*	page;
	page_cur_t*	page_cursor;
	node_visit_t*	node = rtr_get_parent_node(btr_cur, level, false);
	ulint		space = dict_index_get_space(index);
	node_seq_t	path_ssn = node->seq_no;
	page_size_t	page_size = dict_table_page_size(index->table);

	ulint		page_no = node->page_no;

	heap = mem_heap_create(256);

	/* Rebuild a search tuple from the stored copy of the old
	record, to re-locate it by content. */
	tuple = dict_index_build_data_tuple(index, r_cursor->old_rec,
					    r_cursor->old_n_fields, heap);

	page_cursor = btr_pcur_get_page_cur(r_cursor);
	ut_ad(r_cursor == node->cursor);

search_again:
	page_id_t	page_id(space, page_no);

	block = buf_page_get_gen(
		page_id, page_size, RW_X_LATCH, NULL,
		BUF_GET, __FILE__, __LINE__, mtr);

	ut_ad(block);

	/* Get the page SSN */
	page = buf_block_get_frame(block);
	page_ssn = page_get_ssn_id(page);

	ulint low_match = page_cur_search(
		block, index, tuple, PAGE_CUR_LE, page_cursor);

	if (low_match == r_cursor->old_n_fields) {
		const rec_t*	rec;
		const ulint*	offsets1;
		const ulint*	offsets2;
		ulint		comp;

		rec = btr_pcur_get_rec(r_cursor);

		offsets1 = rec_get_offsets(
			r_cursor->old_rec, index, NULL,
			r_cursor->old_n_fields, &heap);
		offsets2 = rec_get_offsets(
			rec, index, NULL,
			r_cursor->old_n_fields, &heap);

		comp = rec_offs_comp(offsets1);

		/* Declare success only if the found record equals the
		stored one, or both are the page's minimum record. */
		if ((rec_get_info_bits(r_cursor->old_rec, comp)
		     & REC_INFO_MIN_REC_FLAG)
		    && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
			ret = true;
		} else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
					index,page_is_spatial_non_leaf(rec, index))) {
			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
			ret = true;
		}
	}

	/* Check the page SSN to see if the page has been split since the
	position was stored; if so, chase the right sibling. */
	if (!ret && page_ssn > path_ssn) {
		page_no = btr_page_get_next(page, mtr);
		goto search_again;
	}

	mem_heap_free(heap);

	return(ret);
}
1439
1440 /****************************************************************//**
1441 Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
1442 static
1443 void
rtr_leaf_push_match_rec(const rec_t * rec,rtr_info_t * rtr_info,ulint * offsets,bool is_comp)1444 rtr_leaf_push_match_rec(
1445 /*====================*/
1446 const rec_t* rec, /*!< in: record to copy */
1447 rtr_info_t* rtr_info, /*!< in/out: search stack */
1448 ulint* offsets, /*!< in: offsets */
1449 bool is_comp) /*!< in: is compact format */
1450 {
1451 byte* buf;
1452 matched_rec_t* match_rec = rtr_info->matches;
1453 rec_t* copy;
1454 ulint data_len;
1455 rtr_rec_t rtr_rec;
1456
1457 buf = match_rec->block.frame + match_rec->used;
1458
1459 copy = rec_copy(buf, rec, offsets);
1460
1461 if (is_comp) {
1462 rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1463 } else {
1464 rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1465 }
1466
1467 rtr_rec.r_rec = copy;
1468 rtr_rec.locked = false;
1469
1470 match_rec->matched_recs->push_back(rtr_rec);
1471 match_rec->valid = true;
1472
1473 data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1474 match_rec->used += data_len;
1475
1476 ut_ad(match_rec->used < UNIV_PAGE_SIZE);
1477 }
1478
1479 /**************************************************************//**
1480 Store the parent path cursor
1481 @return number of cursor stored */
1482 ulint
rtr_store_parent_path(const buf_block_t * block,btr_cur_t * btr_cur,ulint latch_mode,ulint level,mtr_t * mtr)1483 rtr_store_parent_path(
1484 /*==================*/
1485 const buf_block_t* block, /*!< in: block of the page */
1486 btr_cur_t* btr_cur,/*!< in/out: persistent cursor */
1487 ulint latch_mode,
1488 /*!< in: latch_mode */
1489 ulint level, /*!< in: index level */
1490 mtr_t* mtr) /*!< in: mtr */
1491 {
1492 ulint num = btr_cur->rtr_info->parent_path->size();
1493 ulint num_stored = 0;
1494
1495 while (num >= 1) {
1496 node_visit_t* node = &(*btr_cur->rtr_info->parent_path)[
1497 num - 1];
1498 btr_pcur_t* r_cursor = node->cursor;
1499 buf_block_t* cur_block;
1500
1501 if (node->level > level) {
1502 break;
1503 }
1504
1505 r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1506 r_cursor->latch_mode = latch_mode;
1507
1508 cur_block = btr_pcur_get_block(r_cursor);
1509
1510 if (cur_block == block) {
1511 btr_pcur_store_position(r_cursor, mtr);
1512 num_stored++;
1513 } else {
1514 break;
1515 }
1516
1517 num--;
1518 }
1519
1520 return(num_stored);
1521 }
1522 /**************************************************************//**
1523 push a nonleaf index node to the search path for insertion */
1524 static
1525 void
rtr_non_leaf_insert_stack_push(dict_index_t * index,rtr_node_path_t * path,ulint level,ulint child_no,const buf_block_t * block,const rec_t * rec,double mbr_inc)1526 rtr_non_leaf_insert_stack_push(
1527 /*===========================*/
1528 dict_index_t* index, /*!< in: index descriptor */
1529 rtr_node_path_t* path, /*!< in/out: search path */
1530 ulint level, /*!< in: index page level */
1531 ulint child_no,/*!< in: child page no */
1532 const buf_block_t* block, /*!< in: block of the page */
1533 const rec_t* rec, /*!< in: positioned record */
1534 double mbr_inc)/*!< in: MBR needs to be enlarged */
1535 {
1536 node_seq_t new_seq;
1537 btr_pcur_t* my_cursor;
1538 ulint page_no = block->page.id.page_no();
1539
1540 my_cursor = static_cast<btr_pcur_t*>(
1541 ut_malloc_nokey(sizeof(*my_cursor)));
1542
1543 btr_pcur_init(my_cursor);
1544
1545 page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1546
1547 (btr_pcur_get_btr_cur(my_cursor))->index = index;
1548
1549 new_seq = rtr_get_current_ssn_id(index);
1550 rtr_non_leaf_stack_push(path, page_no, new_seq, level, child_no,
1551 my_cursor, mbr_inc);
1552 }
1553
/** Copy a buf_block_t strcuture, except "block->lock" and "block->mutex".
@param[in,out]	matches	copy to match->block
@param[in]	block	block to copy */
static
void
rtr_copy_buf(
	matched_rec_t*		matches,
	const buf_block_t*	block)
{
	/* Copy all members of "block" to "matches->block" except "mutex"
	and "lock". We skip "mutex" and "lock" because they are not used
	from the dummy buf_block_t we create here and because memcpy()ing
	them generates (valid) compiler warnings that the vtable pointer
	will be copied. It is also undefined what will happen with the
	newly memcpy()ed mutex if the source mutex was acquired by
	(another) thread while it was copied. */
	/* Placement-new copy-constructs the page descriptor in place,
	avoiding a raw memcpy of a non-trivial object. */
	new (&matches->block.page) buf_page_t(block->page);
	matches->block.frame = block->frame;
#ifndef UNIV_HOTBACKUP
	matches->block.unzip_LRU = block->unzip_LRU;

	ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
	ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);

	/* Skip buf_block_t::mutex */
	/* Skip buf_block_t::lock */
	matches->block.lock_hash_val = block->lock_hash_val;
	matches->block.modify_clock = block->modify_clock;
	matches->block.n_hash_helps = block->n_hash_helps;
	matches->block.n_fields = block->n_fields;
	matches->block.left_side = block->left_side;
#if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
	matches->block.n_pointers = block->n_pointers;
#endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
	matches->block.curr_n_fields = block->curr_n_fields;
	matches->block.curr_left_side = block->curr_left_side;
	matches->block.index = block->index;
	matches->block.made_dirty_with_no_latch
		= block->made_dirty_with_no_latch;

	ut_d(matches->block.debug_latch = block->debug_latch);

#endif /* !UNIV_HOTBACKUP */
}
1598
/****************************************************************//**
Generate a shadow copy of the page block header to save the
matched records */
static
void
rtr_init_match(
/*===========*/
	matched_rec_t*		matches,/*!< in/out: match to initialize */
	const buf_block_t*	block,	/*!< in: buffer block */
	const page_t*		page)	/*!< in: buffer page */
{
	ut_ad(matches->matched_recs->empty());
	matches->locked = false;
	/* Clone the block descriptor, then point its frame at our own
	private buffer so copied records never touch the real page. */
	rtr_copy_buf(matches, block);
	matches->block.frame = matches->bufp;
	matches->valid = false;
	/* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
	use infimum/supremum of this page as normal btr page for search. */
	memcpy(matches->block.frame, page, page_is_comp(page)
						? PAGE_NEW_SUPREMUM_END
						: PAGE_OLD_SUPREMUM_END);
	/* "used" is the write offset for the next copied record; the
	header + infimum/supremum copied above already occupy the start
	of the buffer. */
	matches->used = page_is_comp(page)
				? PAGE_NEW_SUPREMUM_END
				: PAGE_OLD_SUPREMUM_END;
#ifdef RTR_SEARCH_DIAGNOSTIC
	ulint pageno = page_get_page_no(page);
	fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
		static_cast<int>(pageno));
#endif /* RTR_SEARCH_DIAGNOSTIC */
}
1629
1630 /****************************************************************//**
1631 Get the bounding box content from an index record */
1632 void
rtr_get_mbr_from_rec(const rec_t * rec,const ulint * offsets,rtr_mbr_t * mbr)1633 rtr_get_mbr_from_rec(
1634 /*=================*/
1635 const rec_t* rec, /*!< in: data tuple */
1636 const ulint* offsets,/*!< in: offsets array */
1637 rtr_mbr_t* mbr) /*!< out MBR */
1638 {
1639 ulint rec_f_len;
1640 const byte* data;
1641
1642 data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1643
1644 rtr_read_mbr(data, mbr);
1645 }
1646
1647 /****************************************************************//**
1648 Get the bounding box content from a MBR data record */
1649 void
rtr_get_mbr_from_tuple(const dtuple_t * dtuple,rtr_mbr * mbr)1650 rtr_get_mbr_from_tuple(
1651 /*===================*/
1652 const dtuple_t* dtuple, /*!< in: data tuple */
1653 rtr_mbr* mbr) /*!< out: mbr to fill */
1654 {
1655 const dfield_t* dtuple_field;
1656 ulint dtuple_f_len;
1657 byte* data;
1658
1659 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1660 dtuple_f_len = dfield_get_len(dtuple_field);
1661 ut_a(dtuple_f_len >= 4 * sizeof(double));
1662
1663 data = static_cast<byte*>(dfield_get_data(dtuple_field));
1664
1665 rtr_read_mbr(data, mbr);
1666 }
1667
/****************************************************************//**
Searches the right position in rtree for a page cursor.
@return true if a matching record was found on the page */
bool
rtr_cur_search_with_match(
/*======================*/
	const buf_block_t*	block,	/*!< in: buffer block */
	dict_index_t*		index,	/*!< in: index descriptor */
	const dtuple_t*		tuple,	/*!< in: data tuple */
	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_RTREE_INSERT,
					PAGE_CUR_RTREE_LOCATE etc. */
	page_cur_t*		cursor,	/*!< in/out: page cursor */
	rtr_info_t*		rtr_info)/*!< in/out: search stack */
{
	bool		found = false;
	const page_t*	page;
	const rec_t*	rec;
	const rec_t*	last_rec;
	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets		= offsets_;
	mem_heap_t*	heap = NULL;
	int		cmp = 1;
	bool		is_leaf;
	/* Smallest MBR-area increase seen so far (insert mode). */
	double		least_inc = DBL_MAX;
	const rec_t*	best_rec;
	const rec_t*	last_match_rec = NULL;
	ulint		level;
	bool		match_init = false;
	ulint		space = block->page.id.space();
	page_cur_mode_t	orig_mode = mode;
	const rec_t*	first_rec = NULL;

	rec_offs_init(offsets_);

	ut_ad(RTREE_SEARCH_MODE(mode));

	ut_ad(dict_index_is_spatial(index));

	page = buf_block_get_frame(block);

	is_leaf = page_is_leaf(page);
	/* NOTE(review): no mtr_t local is declared in this function;
	"mtr" here presumably vanishes inside the btr_page_get_level()
	macro -- TODO confirm against btr0btr.h. */
	level = btr_page_get_level(page, mtr);

	/* LOCATE is only used above the leaf level; within the page it
	degenerates to a WITHIN comparison. */
	if (mode == PAGE_CUR_RTREE_LOCATE) {
		ut_ad(level != 0);
		mode = PAGE_CUR_WITHIN;
	}

	rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));

	last_rec = rec;
	best_rec = rec;

	if (page_rec_is_infimum(rec)) {
		rec = page_rec_get_next_const(rec);
	}

	/* Check insert tuple size is larger than first rec, and try to
	avoid it if possible */
	if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {

		ulint	new_rec_size = rec_get_converted_size(index, tuple, 0);

		offsets = rec_get_offsets(rec, index, offsets,
					  dtuple_get_n_fields_cmp(tuple),
					  &heap);

		if (rec_offs_size(offsets) < new_rec_size) {
			first_rec = rec;
		}

		/* If this is the left-most page of this index level
		and the table is a compressed table, try to avoid
		first page as much as possible, as there will be problem
		when update MIN_REC rec in compress table */
		if (buf_block_get_page_zip(block)
		    && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL
		    && page_get_n_recs(page) >= 2) {

			rec = page_rec_get_next_const(rec);
		}
	}

	/* Scan every user record on the page, comparing its MBR to the
	search tuple according to the (possibly translated) mode. */
	while (!page_rec_is_supremum(rec)) {
		offsets = rec_get_offsets(rec, index, offsets,
					  dtuple_get_n_fields_cmp(tuple),
					  &heap);
		if (!is_leaf) {
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
			case PAGE_CUR_MBR_EQUAL:
				/* At non-leaf level, we will need to check
				both CONTAIN and INTERSECT for either of
				the search mode */
				cmp = cmp_dtuple_rec_with_gis(
					tuple, rec, offsets, PAGE_CUR_CONTAIN);

				if (cmp != 0) {
					cmp = cmp_dtuple_rec_with_gis(
						tuple, rec, offsets,
						PAGE_CUR_INTERSECT);
				}
				break;
			case PAGE_CUR_DISJOINT:
				cmp = cmp_dtuple_rec_with_gis(
					tuple, rec, offsets, mode);

				if (cmp != 0) {
					cmp = cmp_dtuple_rec_with_gis(
						tuple, rec, offsets,
						PAGE_CUR_INTERSECT);
				}
				break;
			case PAGE_CUR_RTREE_INSERT:
				double	increase;
				double	area;

				/* Prefer a subtree whose MBR already
				covers the tuple; otherwise track the
				entry whose MBR grows the least. */
				cmp = cmp_dtuple_rec_with_gis(
					tuple, rec, offsets, PAGE_CUR_WITHIN);

				if (cmp != 0) {
					increase = rtr_rec_cal_increase(
						tuple, rec, offsets, &area);
					/* Once it goes beyond DBL_MAX,
					it would not make sense to record
					such value, just make it
					DBL_MAX / 2 */
					if (increase >= DBL_MAX) {
						increase = DBL_MAX / 2;
					}

					if (increase < least_inc) {
						least_inc = increase;
						best_rec = rec;
					} else if (best_rec
						   && best_rec == first_rec) {
						/* if first_rec is set,
						we will try to avoid it */
						least_inc = increase;
						best_rec = rec;
					}
				}
				break;
			case PAGE_CUR_RTREE_GET_FATHER:
				cmp = cmp_dtuple_rec_with_gis_internal(
					tuple, rec, offsets);
				break;
			default:
				/* WITHIN etc. */
				cmp = cmp_dtuple_rec_with_gis(
					tuple, rec, offsets, mode);
			}
		} else {
			/* At leaf level, INSERT should translate to LE */
			ut_ad(mode != PAGE_CUR_RTREE_INSERT);

			cmp = cmp_dtuple_rec_with_gis(
				tuple, rec, offsets, mode);
		}

		if (cmp == 0) {
			found = true;

			/* If located, the matching node/rec will be pushed
			to rtr_info->path for non-leaf nodes, or
			rtr_info->matches for leaf nodes */
			if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
				if (!is_leaf) {
					ulint		page_no;
					node_seq_t	new_seq;
					bool		is_loc;

					is_loc = (orig_mode
						  == PAGE_CUR_RTREE_LOCATE
						  || orig_mode
						  == PAGE_CUR_RTREE_GET_FATHER);

					offsets = rec_get_offsets(
						rec, index, offsets,
						ULINT_UNDEFINED, &heap);

					page_no = btr_node_ptr_get_child_page_no(
						rec, offsets);

					ut_ad(level >= 1);

					/* Get current SSN, before we insert
					it into the path stack */
					new_seq = rtr_get_current_ssn_id(index);

					rtr_non_leaf_stack_push(
						rtr_info->path,
						page_no,
						new_seq, level - 1, 0,
						NULL, 0);

					if (is_loc) {
						rtr_non_leaf_insert_stack_push(
							index,
							rtr_info->parent_path,
							level, page_no, block,
							rec, 0);
					}

					if (!srv_read_only_mode
					    && (rtr_info->need_page_lock
						|| !is_loc)) {

						/* Lock the page, preventing it
						from being shrunk */
						lock_place_prdt_page_lock(
							space, page_no, index,
							rtr_info->thr);
					}
				} else {
					ut_ad(orig_mode
					      != PAGE_CUR_RTREE_LOCATE);

					/* Lazily set up the shadow page
					buffer on the first leaf match. */
					if (!match_init) {
						rtr_init_match(
							rtr_info->matches,
							block, page);
						match_init = true;
					}

					/* Collect matched records on page */
					offsets = rec_get_offsets(
						rec, index, offsets,
						ULINT_UNDEFINED, &heap);
					rtr_leaf_push_match_rec(
						rec, rtr_info, offsets,
						page_is_comp(page));
				}

				last_match_rec = rec;
			} else {
				/* This is the insertion case, it will break
				once it finds the first MBR that can accomodate
				the inserting rec */
				break;
			}
		}

		last_rec = rec;

		rec = page_rec_get_next_const(rec);
	}

	/* All records on page are searched */
	if (page_rec_is_supremum(rec)) {
		if (!is_leaf) {
			if (!found) {
				/* No match case, if it is for insertion,
				then we select the record that result in
				least increased area */
				if (mode == PAGE_CUR_RTREE_INSERT) {
					ulint	child_no;
					ut_ad(least_inc < DBL_MAX);
					offsets = rec_get_offsets(
						best_rec, index,
						offsets, ULINT_UNDEFINED,
						&heap);
					child_no =
					btr_node_ptr_get_child_page_no(
						best_rec, offsets);

					rtr_non_leaf_insert_stack_push(
						index, rtr_info->parent_path,
						level, child_no, block,
						best_rec, least_inc);

					page_cur_position(best_rec, block,
							  cursor);
					rtr_info->mbr_adj = true;
				} else {
					/* Position at the last rec of the
					page, if it is not the leaf page */
					page_cur_position(last_rec, block,
							  cursor);
				}
			} else {
				/* There are matching records, position
				in the last matching records */
				if (rtr_info) {
					rec = last_match_rec;
					page_cur_position(
						rec, block, cursor);
				}
			}
		} else if (rtr_info) {
			/* Leaf level, no match, position at the
			last (supremum) rec */
			if (!last_match_rec) {
				page_cur_position(rec, block, cursor);
				goto func_exit;
			}

			/* There are matched records */
			matched_rec_t*	match_rec = rtr_info->matches;

			rtr_rec_t	test_rec;

			test_rec = match_rec->matched_recs->back();
#ifdef UNIV_DEBUG
			ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
			ulint*		offsets2	= offsets_2;
			rec_offs_init(offsets_2);

			ut_ad(found);

			/* Verify the record to be positioned is the same
			as the last record in matched_rec vector */
			offsets2 = rec_get_offsets(test_rec.r_rec, index,
						   offsets2, ULINT_UNDEFINED,
						   &heap);

			offsets = rec_get_offsets(last_match_rec, index,
						  offsets, ULINT_UNDEFINED,
						  &heap);

			ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
					  offsets2, offsets, index, false) == 0);
#endif /* UNIV_DEBUG */
			/* Pop the last match record and position on it;
			note the cursor points into the shadow block, not
			the real page. */
			match_rec->matched_recs->pop_back();
			page_cur_position(test_rec.r_rec, &match_rec->block,
					  cursor);
		}
	} else {

		/* The scan broke out early: insert mode found the first
		MBR that can accommodate the tuple. */
		if (mode == PAGE_CUR_RTREE_INSERT) {
			ulint	child_no;
			ut_ad(!last_match_rec && rec);

			offsets = rec_get_offsets(
				rec, index, offsets, ULINT_UNDEFINED, &heap);

			child_no = btr_node_ptr_get_child_page_no(rec, offsets);

			rtr_non_leaf_insert_stack_push(
				index, rtr_info->parent_path, level, child_no,
				block, rec, 0);

		} else if (rtr_info && found && !is_leaf) {
			rec = last_match_rec;
		}

		page_cur_position(rec, block, cursor);
	}

#ifdef UNIV_DEBUG
	/* Verify that we are positioned at the same child page as pushed in
	the path stack */
	if (!is_leaf && (!page_rec_is_supremum(rec) || found)
	    && mode != PAGE_CUR_RTREE_INSERT) {
		ulint		page_no;

		offsets = rec_get_offsets(rec, index, offsets,
					  ULINT_UNDEFINED, &heap);
		page_no = btr_node_ptr_get_child_page_no(rec, offsets);

		if (rtr_info && found) {
			rtr_node_path_t*	path = rtr_info->path;
			node_visit_t		last_visit = path->back();

			ut_ad(last_visit.page_no == page_no);
		}
	}
#endif /* UNIV_DEBUG */

func_exit:
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	return(found);
}
2045