1 /*****************************************************************************
2 
3 Copyright (c) 2017, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file gis/gis0sea.cc
29 InnoDB R-tree search interfaces
30 
31 Created 2014/01/16 Jimmy Yang
32 ***********************************************************************/
33 
34 #include "fsp0fsp.h"
35 #include "page0page.h"
36 #include "page0cur.h"
37 #include "page0zip.h"
38 #include "gis0rtree.h"
39 
40 #ifndef UNIV_HOTBACKUP
41 #include "btr0cur.h"
42 #include "btr0sea.h"
43 #include "btr0pcur.h"
44 #include "rem0cmp.h"
45 #include "lock0lock.h"
46 #include "ibuf0ibuf.h"
47 #include "trx0trx.h"
48 #include "srv0mon.h"
49 #include "gis0geo.h"
50 #include "sync0sync.h"
51 
52 #endif /* UNIV_HOTBACKUP */
53 
54 /*************************************************************//**
55 Pop out used parent path entry, until we find the parent with matching
56 page number */
57 static
58 void
rtr_adjust_parent_path(rtr_info_t * rtr_info,ulint page_no)59 rtr_adjust_parent_path(
60 /*===================*/
61 	rtr_info_t*	rtr_info,	/* R-Tree info struct */
62 	ulint		page_no)	/* page number to look for */
63 {
64 	while (!rtr_info->parent_path->empty()) {
65 		if (rtr_info->parent_path->back().child_no == page_no) {
66 			break;
67 		} else {
68 			if (rtr_info->parent_path->back().cursor) {
69 				btr_pcur_close(
70 					rtr_info->parent_path->back().cursor);
71 				ut_free(rtr_info->parent_path->back().cursor);
72 			}
73 
74 			rtr_info->parent_path->pop_back();
75 		}
76 	}
77 }
78 
79 /*************************************************************//**
80 Find the next matching record. This function is used by search
81 or record locating during index delete/update.
82 @return true if there is suitable record found, otherwise false */
83 static
84 bool
rtr_pcur_getnext_from_path(const dtuple_t * tuple,page_cur_mode_t mode,btr_cur_t * btr_cur,ulint target_level,ulint latch_mode,bool index_locked,mtr_t * mtr)85 rtr_pcur_getnext_from_path(
86 /*=======================*/
87 	const dtuple_t* tuple,	/*!< in: data tuple */
88 	page_cur_mode_t	mode,	/*!< in: cursor search mode */
89 	btr_cur_t*	btr_cur,/*!< in: persistent cursor; NOTE that the
90 				function may release the page latch */
91 	ulint		target_level,
92 				/*!< in: target level */
93 	ulint		latch_mode,
94 				/*!< in: latch_mode */
95 	bool		index_locked,
96 				/*!< in: index tree locked */
97 	mtr_t*		mtr)	/*!< in: mtr */
98 {
99 	dict_index_t*	index = btr_cur->index;
100 	bool		found = false;
101 	ulint		space = dict_index_get_space(index);
102 	page_cur_t*	page_cursor;
103 	ulint		level = 0;
104 	node_visit_t	next_rec;
105 	rtr_info_t*	rtr_info = btr_cur->rtr_info;
106 	node_seq_t	page_ssn;
107 	ulint		my_latch_mode;
108 	ulint		skip_parent = false;
109 	bool		new_split = false;
110 	bool		need_parent;
111 	bool		for_delete = false;
112 	bool		for_undo_ins = false;
113 
114 	/* exhausted all the pages to be searched */
115 	if (rtr_info->path->empty()) {
116 		return(false);
117 	}
118 
119 	ut_ad(dtuple_get_n_fields_cmp(tuple));
120 
121 	my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
122 
123 	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
124 	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
125 
126 	/* There should be no insert coming to this function. Only
127 	mode with BTR_MODIFY_* should be delete */
128 	ut_ad(mode != PAGE_CUR_RTREE_INSERT);
129 	ut_ad(my_latch_mode == BTR_SEARCH_LEAF
130 	      || my_latch_mode == BTR_MODIFY_LEAF
131 	      || my_latch_mode == BTR_MODIFY_TREE
132 	      || my_latch_mode == BTR_CONT_MODIFY_TREE);
133 
134 	/* Whether need to track parent information. Only need so
135 	when we do tree altering operations (such as index page merge) */
136 	need_parent = ((my_latch_mode == BTR_MODIFY_TREE
137 		        || my_latch_mode == BTR_CONT_MODIFY_TREE)
138 		       && mode == PAGE_CUR_RTREE_LOCATE);
139 
140 	if (!index_locked) {
141 		ut_ad(latch_mode & BTR_SEARCH_LEAF
142 		      || latch_mode & BTR_MODIFY_LEAF);
143 		mtr_s_lock(dict_index_get_lock(index), mtr);
144 	} else {
145 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
146 					MTR_MEMO_SX_LOCK)
147 		      || mtr_memo_contains(mtr, dict_index_get_lock(index),
148 					MTR_MEMO_S_LOCK)
149 		      || mtr_memo_contains(mtr, dict_index_get_lock(index),
150 					   MTR_MEMO_X_LOCK));
151 	}
152 
153 	const page_size_t&	page_size = dict_table_page_size(index->table);
154 
155 	/* Pop each node/page to be searched from "path" structure
156 	and do a search on it. Please note, any pages that are in
157 	the "path" structure are protected by "page" lock, so tey
158 	cannot be shrunk away */
159 	do {
160 		buf_block_t*	block;
161 		node_seq_t	path_ssn;
162 		const page_t*	page;
163 		ulint		rw_latch = RW_X_LATCH;
164 		ulint		tree_idx;
165 
166 		mutex_enter(&rtr_info->rtr_path_mutex);
167 		next_rec = rtr_info->path->back();
168 		rtr_info->path->pop_back();
169 		level = next_rec.level;
170 		path_ssn = next_rec.seq_no;
171 		tree_idx = btr_cur->tree_height - level - 1;
172 
173 		/* Maintain the parent path info as well, if needed */
174 		if (need_parent && !skip_parent && !new_split) {
175 			ulint		old_level;
176 			ulint		new_level;
177 
178 			ut_ad(!rtr_info->parent_path->empty());
179 
180 			/* Cleanup unused parent info */
181 			if (rtr_info->parent_path->back().cursor) {
182 				btr_pcur_close(
183 					rtr_info->parent_path->back().cursor);
184 				ut_free(rtr_info->parent_path->back().cursor);
185 			}
186 
187 			old_level = rtr_info->parent_path->back().level;
188 
189 			rtr_info->parent_path->pop_back();
190 
191 			ut_ad(!rtr_info->parent_path->empty());
192 
193 			/* check whether there is a level change. If so,
194 			the current parent path needs to pop enough
195 			nodes to adjust to the new search page */
196 			new_level = rtr_info->parent_path->back().level;
197 
198 			if (old_level < new_level) {
199 				rtr_adjust_parent_path(
200 					rtr_info, next_rec.page_no);
201 			}
202 
203 			ut_ad(!rtr_info->parent_path->empty());
204 
205 			ut_ad(next_rec.page_no
206 			      == rtr_info->parent_path->back().child_no);
207 		}
208 
209 		mutex_exit(&rtr_info->rtr_path_mutex);
210 
211 		skip_parent = false;
212 		new_split = false;
213 
214 		/* Once we have pages in "path", these pages are
215 		predicate page locked, so they can't be shrunk away.
216 		They also have SSN (split sequence number) to detect
217 		splits, so we can directly latch single page while
218 		getting them. They can be unlatched if not qualified.
219 		One reason for pre-latch is that we might need to position
220 		some parent position (requires latch) during search */
221 		if (level == 0) {
222 			/* S latched for SEARCH_LEAF, and X latched
223 			for MODIFY_LEAF */
224 			if (my_latch_mode <= BTR_MODIFY_LEAF) {
225 				rw_latch = my_latch_mode;
226 			}
227 
228 			if (my_latch_mode == BTR_CONT_MODIFY_TREE
229 			    || my_latch_mode == BTR_MODIFY_TREE) {
230 				rw_latch = RW_NO_LATCH;
231 			}
232 
233 		} else if (level == target_level) {
234 			rw_latch = RW_X_LATCH;
235 		}
236 
237 		/* Release previous locked blocks */
238 		if (my_latch_mode != BTR_SEARCH_LEAF) {
239 			for (ulint idx = 0; idx < btr_cur->tree_height;
240 			     idx++) {
241 				if (rtr_info->tree_blocks[idx]) {
242 					mtr_release_block_at_savepoint(
243 						mtr,
244 						rtr_info->tree_savepoints[idx],
245 						rtr_info->tree_blocks[idx]);
246 					rtr_info->tree_blocks[idx] = NULL;
247 				}
248 			}
249 			for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
250 			     idx++) {
251 				if (rtr_info->tree_blocks[idx]) {
252 					mtr_release_block_at_savepoint(
253 						mtr,
254 						rtr_info->tree_savepoints[idx],
255 						rtr_info->tree_blocks[idx]);
256 					rtr_info->tree_blocks[idx] = NULL;
257 				}
258 			}
259 		}
260 
261 		/* set up savepoint to record any locks to be taken */
262 		rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
263 
264 #ifdef UNIV_RTR_DEBUG
265 		ut_ad(!(rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_X)
266 			||
267 			rw_lock_own(&btr_cur->page_cur.block->lock, RW_LOCK_S))
268 			|| my_latch_mode == BTR_MODIFY_TREE
269 			|| my_latch_mode == BTR_CONT_MODIFY_TREE
270 			|| !page_is_leaf(buf_block_get_frame(
271 					btr_cur->page_cur.block)));
272 #endif /* UNIV_RTR_DEBUG */
273 
274 		page_id_t	page_id(space, next_rec.page_no);
275 
276 		block = buf_page_get_gen(
277 			page_id, page_size,
278 			rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr);
279 
280 		if (block == NULL) {
281 			continue;
282 		} else if (rw_latch != RW_NO_LATCH) {
283 			ut_ad(!dict_index_is_ibuf(index));
284 			buf_block_dbg_add_level(block, SYNC_TREE_NODE);
285 		}
286 
287 		rtr_info->tree_blocks[tree_idx] = block;
288 
289 		page = buf_block_get_frame(block);
290 		page_ssn = page_get_ssn_id(page);
291 
292 		/* If there are splits, push the splitted page.
293 		Note that we have SX lock on index->lock, there
294 		should not be any split/shrink happening here */
295 		if (page_ssn > path_ssn) {
296 			ulint next_page_no = btr_page_get_next(page, mtr);
297 			rtr_non_leaf_stack_push(
298 				rtr_info->path, next_page_no, path_ssn,
299 				level, 0, NULL, 0);
300 
301 			if (!srv_read_only_mode
302 			    && mode != PAGE_CUR_RTREE_INSERT
303 			    && mode != PAGE_CUR_RTREE_LOCATE) {
304 				ut_ad(rtr_info->thr);
305 				lock_place_prdt_page_lock(
306 					space, next_page_no, index,
307 					rtr_info->thr);
308 			}
309 			new_split = true;
310 #if UNIV_GIS_DEBUG
311 			fprintf(stderr,
312 				"GIS_DIAG: Splitted page found: %d, %ld\n",
313 				static_cast<int>(need_parent), next_page_no);
314 #endif
315 		}
316 
317 		page_cursor = btr_cur_get_page_cur(btr_cur);
318 		page_cursor->rec = NULL;
319 
320 		if (mode == PAGE_CUR_RTREE_LOCATE) {
321 			if (level == target_level && level == 0) {
322 				ulint	low_match;
323 
324 				found = false;
325 
326 				low_match = page_cur_search(
327 					block, index, tuple,
328 					PAGE_CUR_LE,
329 					btr_cur_get_page_cur(btr_cur));
330 
331 				if (low_match == dtuple_get_n_fields_cmp(
332 							tuple)) {
333 					rec_t*	rec = btr_cur_get_rec(btr_cur);
334 
335 					if (!rec_get_deleted_flag(rec,
336 					    dict_table_is_comp(index->table))
337 					    || (!for_delete && !for_undo_ins)) {
338 						found = true;
339 						btr_cur->low_match = low_match;
340 					} else {
341 						/* mark we found deleted row */
342 						btr_cur->rtr_info->fd_del
343 							= true;
344 					}
345 				}
346 			} else {
347 				page_cur_mode_t	page_mode = mode;
348 
349 				if (level == target_level
350 				    && target_level != 0) {
351 					page_mode = PAGE_CUR_RTREE_GET_FATHER;
352 				}
353 				found = rtr_cur_search_with_match(
354 					block, index, tuple, page_mode,
355 					page_cursor, btr_cur->rtr_info);
356 
357 				/* Save the position of parent if needed */
358 				if (found && need_parent) {
359 					btr_pcur_t*     r_cursor =
360 						rtr_get_parent_cursor(
361 							btr_cur, level, false);
362 
363 					rec_t*	rec = page_cur_get_rec(
364 						page_cursor);
365 					page_cur_position(
366 						rec, block,
367 						btr_pcur_get_page_cur(r_cursor));
368 					r_cursor->pos_state =
369 						 BTR_PCUR_IS_POSITIONED;
370 					r_cursor->latch_mode = my_latch_mode;
371 					btr_pcur_store_position(r_cursor, mtr);
372 #ifdef UNIV_DEBUG
373 					ulint num_stored =
374 						rtr_store_parent_path(
375 							block, btr_cur,
376 							rw_latch, level, mtr);
377 					ut_ad(num_stored > 0);
378 #else
379 					rtr_store_parent_path(
380 						block, btr_cur, rw_latch,
381 						level, mtr);
382 #endif /* UNIV_DEBUG */
383 				}
384 			}
385 		} else {
386 			found = rtr_cur_search_with_match(
387 				block, index, tuple, mode, page_cursor,
388 				btr_cur->rtr_info);
389 		}
390 
391 		/* Attach predicate lock if needed, no matter whether
392 		there are matched records */
393 		if (mode != PAGE_CUR_RTREE_INSERT
394 		    && mode != PAGE_CUR_RTREE_LOCATE
395 		    && mode >= PAGE_CUR_CONTAIN
396 		    && btr_cur->rtr_info->need_prdt_lock) {
397 			lock_prdt_t	prdt;
398 
399 			trx_t*		trx = thr_get_trx(
400 						btr_cur->rtr_info->thr);
401 			lock_mutex_enter();
402 			lock_init_prdt_from_mbr(
403 				&prdt, &btr_cur->rtr_info->mbr,
404 				mode, trx->lock.lock_heap);
405 			lock_mutex_exit();
406 
407 			if (rw_latch == RW_NO_LATCH) {
408 				rw_lock_s_lock(&(block->lock));
409 			}
410 
411 			lock_prdt_lock(block, &prdt, index, LOCK_S,
412 				       LOCK_PREDICATE, btr_cur->rtr_info->thr,
413 				       mtr);
414 
415 			if (rw_latch == RW_NO_LATCH) {
416 				rw_lock_s_unlock(&(block->lock));
417 			}
418 		}
419 
420 		if (found) {
421 			if (level == target_level) {
422 				page_cur_t*	r_cur;;
423 
424 				if (my_latch_mode == BTR_MODIFY_TREE
425 				    && level == 0) {
426 					ut_ad(rw_latch == RW_NO_LATCH);
427 					page_id_t	my_page_id(
428 						space, block->page.id.page_no());
429 
430 					btr_cur_latch_leaves(
431 						block, my_page_id,
432 						page_size, BTR_MODIFY_TREE,
433 						btr_cur, mtr);
434 				}
435 
436 				r_cur = btr_cur_get_page_cur(btr_cur);
437 
438 				page_cur_position(
439 					page_cur_get_rec(page_cursor),
440 					page_cur_get_block(page_cursor),
441 					r_cur);
442 
443 				btr_cur->low_match = level != 0 ?
444 					DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
445 					: btr_cur->low_match;
446 				break;
447 			}
448 
449 			/* Keep the parent path node, which points to
450 			last node just located */
451 			skip_parent = true;
452 		} else {
453 			/* Release latch on the current page */
454 			ut_ad(rtr_info->tree_blocks[tree_idx]);
455 
456 			mtr_release_block_at_savepoint(
457 				mtr, rtr_info->tree_savepoints[tree_idx],
458 				rtr_info->tree_blocks[tree_idx]);
459 			rtr_info->tree_blocks[tree_idx] = NULL;
460 		}
461 
462 	} while (!rtr_info->path->empty());
463 
464 	const rec_t* rec = btr_cur_get_rec(btr_cur);
465 
466 	if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
467 		mtr_commit(mtr);
468 		mtr_start(mtr);
469 	} else if (!index_locked) {
470 		mtr_memo_release(mtr, dict_index_get_lock(index),
471 				 MTR_MEMO_X_LOCK);
472 	}
473 
474 	return(found);
475 }
476 
477 /*************************************************************//**
478 Find the next matching record. This function will first exhaust
479 the copied record listed in the rtr_info->matches vector before
480 moving to the next page
481 @return true if there is suitable record found, otherwise false */
482 bool
rtr_pcur_move_to_next(const dtuple_t * tuple,page_cur_mode_t mode,btr_pcur_t * cursor,ulint level,mtr_t * mtr)483 rtr_pcur_move_to_next(
484 /*==================*/
485 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
486 				tuple must be set so that it cannot get
487 				compared to the node ptr page number field! */
488 	page_cur_mode_t	mode,	/*!< in: cursor search mode */
489 	btr_pcur_t*	cursor,	/*!< in: persistent cursor; NOTE that the
490 				function may release the page latch */
491 	ulint		level,	/*!< in: target level */
492 	mtr_t*		mtr)	/*!< in: mtr */
493 {
494 	rtr_info_t*	rtr_info = cursor->btr_cur.rtr_info;
495 
496 	ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
497 
498 	mutex_enter(&rtr_info->matches->rtr_match_mutex);
499 	/* First retrieve the next record on the current page */
500 	if (!rtr_info->matches->matched_recs->empty()) {
501 		rtr_rec_t	rec;
502 		rec = rtr_info->matches->matched_recs->back();
503 		rtr_info->matches->matched_recs->pop_back();
504 		mutex_exit(&rtr_info->matches->rtr_match_mutex);
505 
506 		cursor->btr_cur.page_cur.rec = rec.r_rec;
507 		cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
508 
509 		DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
510 		return(true);
511 	}
512 
513 	mutex_exit(&rtr_info->matches->rtr_match_mutex);
514 
515 	/* Fetch the next page */
516 	return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
517 					 level, cursor->latch_mode,
518 					 false, mtr));
519 }
520 
521 /*************************************************************//**
522 Check if the cursor holds record pointing to the specified child page
523 @return	true if it is (pointing to the child page) false otherwise */
524 static
525 bool
rtr_compare_cursor_rec(dict_index_t * index,btr_cur_t * cursor,ulint page_no,mem_heap_t ** heap)526 rtr_compare_cursor_rec(
527 /*===================*/
528 	dict_index_t*	index,		/*!< in: index */
529 	btr_cur_t*	cursor,		/*!< in: Cursor to check */
530 	ulint		page_no,	/*!< in: desired child page number */
531 	mem_heap_t**	heap)		/*!< in: memory heap */
532 {
533 	const rec_t*	rec;
534 	ulint*		offsets;
535 
536 	rec = btr_cur_get_rec(cursor);
537 
538 	offsets = rec_get_offsets(
539 		rec, index, NULL, ULINT_UNDEFINED, heap);
540 
541 	return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
542 }
543 
544 /**************************************************************//**
545 Initializes and opens a persistent cursor to an index tree. It should be
546 closed with btr_pcur_close. Mainly called by row_search_index_entry() */
547 void
rtr_pcur_open_low(dict_index_t * index,ulint level,const dtuple_t * tuple,page_cur_mode_t mode,ulint latch_mode,btr_pcur_t * cursor,const char * file,ulint line,mtr_t * mtr)548 rtr_pcur_open_low(
549 /*==============*/
550 	dict_index_t*	index,	/*!< in: index */
551 	ulint		level,	/*!< in: level in the rtree */
552 	const dtuple_t*	tuple,	/*!< in: tuple on which search done */
553 	page_cur_mode_t	mode,	/*!< in: PAGE_CUR_RTREE_LOCATE, ... */
554 	ulint		latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
555 	btr_pcur_t*	cursor, /*!< in: memory buffer for persistent cursor */
556 	const char*	file,	/*!< in: file name */
557 	ulint		line,	/*!< in: line where called */
558 	mtr_t*		mtr)	/*!< in: mtr */
559 {
560 	btr_cur_t*	btr_cursor;
561 	ulint		n_fields;
562 	ulint		low_match;
563 	rec_t*		rec;
564 	bool		tree_latched = false;
565 	bool		for_delete = false;
566 	bool		for_undo_ins = false;
567 
568 	ut_ad(level == 0);
569 
570 	ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
571 	ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
572 
573 	/* Initialize the cursor */
574 
575 	btr_pcur_init(cursor);
576 
577 	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
578 	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
579 
580 	cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
581 	cursor->search_mode = mode;
582 
583 	/* Search with the tree cursor */
584 
585 	btr_cursor = btr_pcur_get_btr_cur(cursor);
586 
587 	btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
588 						   btr_cursor, index);
589 
590 	/* Purge will SX lock the tree instead of take Page Locks */
591 	if (btr_cursor->thr) {
592 		btr_cursor->rtr_info->need_page_lock = true;
593 		btr_cursor->rtr_info->thr = btr_cursor->thr;
594 	}
595 
596 	btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
597 				    btr_cursor, 0, file, line, mtr);
598 	cursor->pos_state = BTR_PCUR_IS_POSITIONED;
599 
600 	cursor->trx_if_known = NULL;
601 
602 	low_match = btr_pcur_get_low_match(cursor);
603 
604 	rec = btr_pcur_get_rec(cursor);
605 
606 	n_fields = dtuple_get_n_fields(tuple);
607 
608 	if (latch_mode & BTR_ALREADY_S_LATCHED) {
609 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
610 				  MTR_MEMO_S_LOCK));
611 		tree_latched = true;
612 	}
613 
614 	if (latch_mode & BTR_MODIFY_TREE) {
615 		ut_ad(mtr_memo_contains(mtr, dict_index_get_lock(index),
616 					MTR_MEMO_X_LOCK)
617 		      || mtr_memo_contains(mtr, dict_index_get_lock(index),
618 					   MTR_MEMO_SX_LOCK));
619 		tree_latched = true;
620 	}
621 
622 	if (page_rec_is_infimum(rec) || low_match != n_fields
623 	    || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
624 		&& (for_delete || for_undo_ins))) {
625 
626 		if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
627 		    && for_delete) {
628 			btr_cursor->rtr_info->fd_del = true;
629 			btr_cursor->low_match = 0;
630 		}
631 		/* Did not find matched row in first dive. Release
632 		latched block if any before search more pages */
633 		if (latch_mode & BTR_MODIFY_LEAF) {
634 			ulint		tree_idx = btr_cursor->tree_height - 1;
635 			rtr_info_t*	rtr_info = btr_cursor->rtr_info;
636 
637 			ut_ad(level == 0);
638 
639 			if (rtr_info->tree_blocks[tree_idx]) {
640 				mtr_release_block_at_savepoint(
641 					mtr,
642 					rtr_info->tree_savepoints[tree_idx],
643 					rtr_info->tree_blocks[tree_idx]);
644 				rtr_info->tree_blocks[tree_idx] = NULL;
645 			}
646 		}
647 
648 		bool	ret = rtr_pcur_getnext_from_path(
649 			tuple, mode, btr_cursor, level, latch_mode,
650 			tree_latched, mtr);
651 
652 		if (ret) {
653 			low_match = btr_pcur_get_low_match(cursor);
654 			ut_ad(low_match == n_fields);
655 		}
656 	}
657 }
658 
659 /* Get the rtree page father.
660 @param[in]	index		rtree index
661 @param[in]	block		child page in the index
662 @param[in]	mtr		mtr
663 @param[in]	sea_cur		search cursor, contains information
664 				about parent nodes in search
665 @param[in]	cursor		cursor on node pointer record,
666 				its page x-latched */
667 void
rtr_page_get_father(dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)668 rtr_page_get_father(
669 	dict_index_t*	index,
670 	buf_block_t*	block,
671 	mtr_t*		mtr,
672 	btr_cur_t*	sea_cur,
673 	btr_cur_t*	cursor)
674 {
675 	mem_heap_t*	heap = mem_heap_create(100);
676 #ifdef UNIV_DEBUG
677 	ulint*		offsets;
678 
679 	offsets = rtr_page_get_father_block(
680 		NULL, heap, index, block, mtr, sea_cur, cursor);
681 
682 	ulint	page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
683 							 offsets);
684 
685 	ut_ad(page_no == block->page.id.page_no());
686 #else
687 	rtr_page_get_father_block(
688 		NULL, heap, index, block, mtr, sea_cur, cursor);
689 #endif
690 
691 	mem_heap_free(heap);
692 }
693 
694 /************************************************************//**
695 Returns the father block to a page. It is assumed that mtr holds
696 an X or SX latch on the tree.
697 @return rec_get_offsets() of the node pointer record */
698 ulint*
rtr_page_get_father_block(ulint * offsets,mem_heap_t * heap,dict_index_t * index,buf_block_t * block,mtr_t * mtr,btr_cur_t * sea_cur,btr_cur_t * cursor)699 rtr_page_get_father_block(
700 /*======================*/
701 	ulint*		offsets,/*!< in: work area for the return value */
702 	mem_heap_t*	heap,	/*!< in: memory heap to use */
703 	dict_index_t*	index,	/*!< in: b-tree index */
704 	buf_block_t*	block,	/*!< in: child page in the index */
705 	mtr_t*		mtr,	/*!< in: mtr */
706 	btr_cur_t*	sea_cur,/*!< in: search cursor, contains information
707 				about parent nodes in search */
708 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
709 				its page x-latched */
710 {
711 
712 	rec_t*  rec = page_rec_get_next(
713 		page_get_infimum_rec(buf_block_get_frame(block)));
714 	btr_cur_position(index, rec, block, cursor);
715 
716 	return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
717 					    cursor, mtr));
718 }
719 
720 /************************************************************//**
721 Returns the upper level node pointer to a R-Tree page. It is assumed
722 that mtr holds an x-latch on the tree.
723 @return	rec_get_offsets() of the node pointer record */
724 ulint*
rtr_page_get_father_node_ptr_func(ulint * offsets,mem_heap_t * heap,btr_cur_t * sea_cur,btr_cur_t * cursor,const char * file,ulint line,mtr_t * mtr)725 rtr_page_get_father_node_ptr_func(
726 /*==============================*/
727 	ulint*		offsets,/*!< in: work area for the return value */
728 	mem_heap_t*	heap,	/*!< in: memory heap to use */
729 	btr_cur_t*	sea_cur,/*!< in: search cursor */
730 	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
731 				out: cursor on node pointer record,
732 				its page x-latched */
733 	const char*	file,	/*!< in: file name */
734 	ulint		line,	/*!< in: line where called */
735 	mtr_t*		mtr)	/*!< in: mtr */
736 {
737 	dtuple_t*	tuple;
738 	rec_t*		user_rec;
739 	rec_t*		node_ptr;
740 	ulint		level;
741 	ulint		page_no;
742 	dict_index_t*	index;
743 	rtr_mbr_t	mbr;
744 
745 	page_no = btr_cur_get_block(cursor)->page.id.page_no();
746 	index = btr_cur_get_index(cursor);
747 
748 	ut_ad(srv_read_only_mode
749 	      || mtr_memo_contains_flagged(mtr, dict_index_get_lock(index),
750 					   MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
751 
752 	ut_ad(dict_index_get_page(index) != page_no);
753 
754 	level = btr_page_get_level(btr_cur_get_page(cursor), mtr);
755 
756 	user_rec = btr_cur_get_rec(cursor);
757 	ut_a(page_rec_is_user_rec(user_rec));
758 
759 	offsets = rec_get_offsets(user_rec, index, offsets,
760 				  ULINT_UNDEFINED, &heap);
761 	rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
762 
763 	tuple = rtr_index_build_node_ptr(
764 			index, &mbr, user_rec, page_no, heap, level);
765 
766 	if (sea_cur && !sea_cur->rtr_info) {
767 		sea_cur = NULL;
768 	}
769 
770 	rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
771 			    page_no, mtr);
772 
773 	node_ptr = btr_cur_get_rec(cursor);
774 	ut_ad(!page_rec_is_comp(node_ptr)
775 	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
776 	offsets = rec_get_offsets(node_ptr, index, offsets,
777 				  ULINT_UNDEFINED, &heap);
778 
779 	ulint	child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
780 
781 	if (child_page != page_no) {
782 		const rec_t*	print_rec;
783 
784 		ib::fatal	error;
785 
786 		error << "Corruption of index " << index->name
787 			<< " of table " << index->table->name
788 			<< " parent page " << page_no
789 			<< " child page " << child_page;
790 
791 		print_rec = page_rec_get_next(
792 			page_get_infimum_rec(page_align(user_rec)));
793 		offsets = rec_get_offsets(print_rec, index,
794 					  offsets, ULINT_UNDEFINED, &heap);
795 		error << "; child ";
796 		rec_print(error.m_oss, print_rec,
797 			  rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
798 			  offsets);
799 		offsets = rec_get_offsets(node_ptr, index, offsets,
800 					  ULINT_UNDEFINED, &heap);
801 		error << "; parent ";
802 		rec_print(error.m_oss, print_rec,
803 			  rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
804 			  offsets);
805 
806 		error << ". You should dump + drop + reimport the table to"
807 			" fix the corruption. If the crash happens at"
808 			" database startup, see " REFMAN
809 			"forcing-innodb-recovery.html about forcing"
810 			" recovery. Then dump + drop + reimport.";
811 	}
812 
813 	return(offsets);
814 }
815 
816 /********************************************************************//**
817 Returns the upper level node pointer to a R-Tree page. It is assumed
818 that mtr holds an x-latch on the tree. */
819 void
rtr_get_father_node(dict_index_t * index,ulint level,const dtuple_t * tuple,btr_cur_t * sea_cur,btr_cur_t * btr_cur,ulint page_no,mtr_t * mtr)820 rtr_get_father_node(
821 /*================*/
822 	dict_index_t*	index,	/*!< in: index */
823 	ulint		level,	/*!< in: the tree level of search */
824 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
825 				tuple must be set so that it cannot get
826 				compared to the node ptr page number field! */
827 	btr_cur_t*	sea_cur,/*!< in: search cursor */
828 	btr_cur_t*	btr_cur,/*!< in/out: tree cursor; the cursor page is
829 				s- or x-latched, but see also above! */
830 	ulint		page_no,/*!< Current page no */
831 	mtr_t*		mtr)	/*!< in: mtr */
832 {
833 	mem_heap_t*	heap = NULL;
834 	bool		ret = false;
835 	const rec_t*	rec;
836 	ulint		n_fields;
837 	bool		new_rtr = false;
838 
839 	/* Try to optimally locate the parent node. Level should always
840 	less than sea_cur->tree_height unless the root is splitting */
841 	if (sea_cur && sea_cur->tree_height > level) {
842 
843 		ut_ad(mtr_memo_contains_flagged(mtr,
844 						dict_index_get_lock(index),
845 						MTR_MEMO_X_LOCK
846 						| MTR_MEMO_SX_LOCK));
847 		ret = rtr_cur_restore_position(
848 			BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
849 
850 		/* Once we block shrink tree nodes while there are
851 		active search on it, this optimal locating should always
852 		succeeds */
853 		ut_ad(ret);
854 
855 		if (ret) {
856 			btr_pcur_t*	r_cursor = rtr_get_parent_cursor(
857 						sea_cur, level, false);
858 
859 			rec = btr_pcur_get_rec(r_cursor);
860 
861 			ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
862 			page_cur_position(rec,
863 					  btr_pcur_get_block(r_cursor),
864 					  btr_cur_get_page_cur(btr_cur));
865 			btr_cur->rtr_info = sea_cur->rtr_info;
866 			btr_cur->tree_height = sea_cur->tree_height;
867 			ut_ad(rtr_compare_cursor_rec(
868 				index, btr_cur, page_no, &heap));
869 			goto func_exit;
870 		}
871 	}
872 
873 	/* We arrive here in one of two scenario
874 	1) check table and btr_valide
875 	2) index root page being raised */
876 	ut_ad(!sea_cur || sea_cur->tree_height == level);
877 
878 	if (btr_cur->rtr_info) {
879 		rtr_clean_rtr_info(btr_cur->rtr_info, true);
880 	} else {
881 		new_rtr = true;
882 	}
883 
884 	btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
885 
886 	if (sea_cur && sea_cur->tree_height == level) {
887 		/* root split, and search the new root */
888 		btr_cur_search_to_nth_level(
889 			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
890 			BTR_CONT_MODIFY_TREE, btr_cur, 0,
891 			__FILE__, __LINE__, mtr);
892 
893 	} else {
894 		/* btr_validate */
895 		ut_ad(level >= 1);
896 		ut_ad(!sea_cur);
897 
898 		btr_cur_search_to_nth_level(
899 			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
900 			BTR_CONT_MODIFY_TREE, btr_cur, 0,
901 			__FILE__, __LINE__, mtr);
902 
903 		rec = btr_cur_get_rec(btr_cur);
904 		n_fields = dtuple_get_n_fields_cmp(tuple);
905 
906 		if (page_rec_is_infimum(rec)
907 		    || (btr_cur->low_match != n_fields)) {
908 			ret = rtr_pcur_getnext_from_path(
909 				tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
910 				level, BTR_CONT_MODIFY_TREE,
911 				true, mtr);
912 
913 			ut_ad(ret && btr_cur->low_match == n_fields);
914 		}
915 	}
916 
917 	ret = rtr_compare_cursor_rec(
918 		index, btr_cur, page_no, &heap);
919 
920 	ut_ad(ret);
921 
922 func_exit:
923 	if (heap) {
924 		mem_heap_free(heap);
925 	}
926 
927 	if (new_rtr && btr_cur->rtr_info) {
928 		rtr_clean_rtr_info(btr_cur->rtr_info, true);
929 		btr_cur->rtr_info = NULL;
930 	}
931 }
932 
933 /*******************************************************************//**
934 Create a RTree search info structure */
935 rtr_info_t*
rtr_create_rtr_info(bool need_prdt,bool init_matches,btr_cur_t * cursor,dict_index_t * index)936 rtr_create_rtr_info(
937 /******************/
938 	bool		need_prdt,	/*!< in: Whether predicate lock
939 					is needed */
940 	bool		init_matches,	/*!< in: Whether to initiate the
941 					"matches" structure for collecting
942 					matched leaf records */
943 	btr_cur_t*	cursor,		/*!< in: tree search cursor */
944 	dict_index_t*	index)		/*!< in: index struct */
945 {
946 	rtr_info_t*	rtr_info;
947 
948 	index = index ? index : cursor->index;
949 	ut_ad(index);
950 
951 	rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
952 
953 	rtr_info->allocated = true;
954 	rtr_info->cursor = cursor;
955 	rtr_info->index = index;
956 
957 	if (init_matches) {
958 		rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
959 		rtr_info->matches = static_cast<matched_rec_t*>(
960 					mem_heap_zalloc(
961 						rtr_info->heap,
962 						sizeof(*rtr_info->matches)));
963 
964 		rtr_info->matches->matched_recs
965 			= UT_NEW_NOKEY(rtr_rec_vector());
966 
967 		rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
968 						     + UNIV_PAGE_SIZE_MAX + 1);
969 		mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
970 			     &rtr_info->matches->rtr_match_mutex);
971 		rw_lock_create(PFS_NOT_INSTRUMENTED,
972 			       &(rtr_info->matches->block.lock),
973 			      SYNC_LEVEL_VARYING);
974 	}
975 
976 	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
977 	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
978 	rtr_info->need_prdt_lock = need_prdt;
979 	mutex_create(LATCH_ID_RTR_PATH_MUTEX,
980 		     &rtr_info->rtr_path_mutex);
981 
982 	mutex_enter(&index->rtr_track->rtr_active_mutex);
983 	index->rtr_track->rtr_active->push_back(rtr_info);
984 	mutex_exit(&index->rtr_track->rtr_active_mutex);
985 	return(rtr_info);
986 }
987 
988 /*******************************************************************//**
989 Update a btr_cur_t with rtr_info */
990 void
rtr_info_update_btr(btr_cur_t * cursor,rtr_info_t * rtr_info)991 rtr_info_update_btr(
992 /******************/
993 	btr_cur_t*	cursor,		/*!< in/out: tree cursor */
994 	rtr_info_t*	rtr_info)	/*!< in: rtr_info to set to the
995 					cursor */
996 {
997 	ut_ad(rtr_info);
998 
999 	cursor->rtr_info = rtr_info;
1000 }
1001 
1002 /*******************************************************************//**
1003 Initialize a R-Tree Search structure */
1004 void
rtr_init_rtr_info(rtr_info_t * rtr_info,bool need_prdt,btr_cur_t * cursor,dict_index_t * index,bool reinit)1005 rtr_init_rtr_info(
1006 /****************/
1007 	rtr_info_t*	rtr_info,	/*!< in: rtr_info to set to the
1008 					cursor */
1009 	bool		need_prdt,	/*!< in: Whether predicate lock is
1010 					needed */
1011 	btr_cur_t*	cursor,		/*!< in: tree search cursor */
1012 	dict_index_t*	index,		/*!< in: index structure */
1013 	bool		reinit)		/*!< in: Whether this is a reinit */
1014 {
1015 	ut_ad(rtr_info);
1016 
1017 	if (!reinit) {
1018 		/* Reset all members. */
1019 		rtr_info->path = NULL;
1020 		rtr_info->parent_path = NULL;
1021 		rtr_info->matches = NULL;
1022 
1023 		mutex_create(LATCH_ID_RTR_PATH_MUTEX,
1024 			     &rtr_info->rtr_path_mutex);
1025 
1026 		memset(rtr_info->tree_blocks, 0x0,
1027 		       sizeof(rtr_info->tree_blocks));
1028 		memset(rtr_info->tree_savepoints, 0x0,
1029 		       sizeof(rtr_info->tree_savepoints));
1030 		rtr_info->mbr.xmin = 0.0;
1031 		rtr_info->mbr.xmax = 0.0;
1032 		rtr_info->mbr.ymin = 0.0;
1033 		rtr_info->mbr.ymax = 0.0;
1034 		rtr_info->thr = NULL;
1035 		rtr_info->heap = NULL;
1036 		rtr_info->cursor = NULL;
1037 		rtr_info->index = NULL;
1038 		rtr_info->need_prdt_lock = false;
1039 		rtr_info->need_page_lock = false;
1040 		rtr_info->allocated = false;
1041 		rtr_info->mbr_adj = false;
1042 		rtr_info->fd_del = false;
1043 		rtr_info->search_tuple = NULL;
1044 		rtr_info->search_mode = PAGE_CUR_UNSUPP;
1045 	}
1046 
1047 	ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
1048 
1049 	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
1050 	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1051 	rtr_info->need_prdt_lock = need_prdt;
1052 	rtr_info->cursor = cursor;
1053 	rtr_info->index = index;
1054 
1055 	mutex_enter(&index->rtr_track->rtr_active_mutex);
1056 	index->rtr_track->rtr_active->push_back(rtr_info);
1057 	mutex_exit(&index->rtr_track->rtr_active_mutex);
1058 }
1059 
1060 /**************************************************************//**
1061 Clean up R-Tree search structure */
1062 void
rtr_clean_rtr_info(rtr_info_t * rtr_info,bool free_all)1063 rtr_clean_rtr_info(
1064 /*===============*/
1065 	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
1066 	bool		free_all)	/*!< in: need to free rtr_info itself */
1067 {
1068 	dict_index_t*	index;
1069 	bool		initialized = false;
1070 
1071 	if (!rtr_info) {
1072 		return;
1073 	}
1074 
1075 	index = rtr_info->index;
1076 
1077 	if (index) {
1078 		mutex_enter(&index->rtr_track->rtr_active_mutex);
1079 	}
1080 
1081 	while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
1082 		btr_pcur_t*	cur = rtr_info->parent_path->back().cursor;
1083 		rtr_info->parent_path->pop_back();
1084 
1085 		if (cur) {
1086 			btr_pcur_close(cur);
1087 			ut_free(cur);
1088 		}
1089 	}
1090 
1091 	UT_DELETE(rtr_info->parent_path);
1092 	rtr_info->parent_path = NULL;
1093 
1094 	if (rtr_info->path != NULL) {
1095 		UT_DELETE(rtr_info->path);
1096 		rtr_info->path = NULL;
1097 		initialized = true;
1098 	}
1099 
1100 	if (rtr_info->matches) {
1101 		rtr_info->matches->used = false;
1102 		rtr_info->matches->locked = false;
1103 		rtr_info->matches->valid = false;
1104 		rtr_info->matches->matched_recs->clear();
1105 	}
1106 
1107 	if (index) {
1108 		index->rtr_track->rtr_active->remove(rtr_info);
1109 		mutex_exit(&index->rtr_track->rtr_active_mutex);
1110 	}
1111 
1112 	if (free_all) {
1113 		if (rtr_info->matches) {
1114 			if (rtr_info->matches->matched_recs != NULL) {
1115 				UT_DELETE(rtr_info->matches->matched_recs);
1116 			}
1117 
1118 			rw_lock_free(&(rtr_info->matches->block.lock));
1119 
1120 			mutex_destroy(&rtr_info->matches->rtr_match_mutex);
1121 		}
1122 
1123 		if (rtr_info->heap) {
1124 			mem_heap_free(rtr_info->heap);
1125 		}
1126 
1127 		if (initialized) {
1128 			mutex_destroy(&rtr_info->rtr_path_mutex);
1129 		}
1130 
1131 		if (rtr_info->allocated) {
1132 			ut_free(rtr_info);
1133 		}
1134 	}
1135 }
1136 
1137 /**************************************************************//**
1138 Rebuilt the "path" to exclude the removing page no */
1139 static
1140 void
rtr_rebuild_path(rtr_info_t * rtr_info,ulint page_no)1141 rtr_rebuild_path(
1142 /*=============*/
1143 	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
1144 	ulint		page_no)	/*!< in: need to free rtr_info itself */
1145 {
1146 	rtr_node_path_t*		new_path
1147 		= UT_NEW_NOKEY(rtr_node_path_t());
1148 
1149 	rtr_node_path_t::iterator	rit;
1150 #ifdef UNIV_DEBUG
1151 	ulint	before_size = rtr_info->path->size();
1152 #endif /* UNIV_DEBUG */
1153 
1154 	for (rit = rtr_info->path->begin();
1155 	     rit != rtr_info->path->end(); ++rit) {
1156 		node_visit_t	next_rec = *rit;
1157 
1158 		if (next_rec.page_no == page_no) {
1159 			continue;
1160 		}
1161 
1162 		new_path->push_back(next_rec);
1163 #ifdef UNIV_DEBUG
1164 		node_visit_t	rec = new_path->back();
1165 		ut_ad(rec.level < rtr_info->cursor->tree_height
1166 		      && rec.page_no > 0);
1167 #endif /* UNIV_DEBUG */
1168 	}
1169 
1170 	UT_DELETE(rtr_info->path);
1171 
1172 	ut_ad(new_path->size() == before_size - 1);
1173 
1174 	rtr_info->path = new_path;
1175 
1176 	if (!rtr_info->parent_path->empty()) {
1177 		rtr_node_path_t*	new_parent_path = UT_NEW_NOKEY(
1178 			rtr_node_path_t());
1179 
1180 		for (rit = rtr_info->parent_path->begin();
1181 		     rit != rtr_info->parent_path->end(); ++rit) {
1182 			node_visit_t	next_rec = *rit;
1183 
1184 			if (next_rec.child_no == page_no) {
1185 				btr_pcur_t*	cur = next_rec.cursor;
1186 
1187 				if (cur) {
1188 					btr_pcur_close(cur);
1189 					ut_free(cur);
1190 				}
1191 
1192 				continue;
1193 			}
1194 
1195 			new_parent_path->push_back(next_rec);
1196 		}
1197 		UT_DELETE(rtr_info->parent_path);
1198 		rtr_info->parent_path = new_parent_path;
1199 	}
1200 
1201 }
1202 
1203 /**************************************************************//**
1204 Check whether a discarding page is in anyone's search path */
1205 void
rtr_check_discard_page(dict_index_t * index,btr_cur_t * cursor,buf_block_t * block)1206 rtr_check_discard_page(
1207 /*===================*/
1208 	dict_index_t*	index,	/*!< in: index */
1209 	btr_cur_t*	cursor, /*!< in: cursor on the page to discard: not on
1210 				the root page */
1211 	buf_block_t*	block)	/*!< in: block of page to be discarded */
1212 {
1213 	ulint			pageno = block->page.id.page_no();
1214 	rtr_info_t*		rtr_info;
1215 	rtr_info_active::iterator	it;
1216 
1217 	mutex_enter(&index->rtr_track->rtr_active_mutex);
1218 
1219 	for (it = index->rtr_track->rtr_active->begin();
1220 	     it != index->rtr_track->rtr_active->end(); ++it) {
1221 		rtr_info = *it;
1222 		rtr_node_path_t::iterator	rit;
1223 		bool	found = false;
1224 
1225 		if (cursor && rtr_info == cursor->rtr_info) {
1226 			continue;
1227 		}
1228 
1229 		mutex_enter(&rtr_info->rtr_path_mutex);
1230 		for (rit = rtr_info->path->begin();
1231 		     rit != rtr_info->path->end(); ++rit) {
1232 			node_visit_t	node = *rit;
1233 
1234 			if (node.page_no == pageno) {
1235 				found = true;
1236 				break;
1237 			}
1238 		}
1239 
1240 		if (found) {
1241 			rtr_rebuild_path(rtr_info, pageno);
1242 		}
1243 		mutex_exit(&rtr_info->rtr_path_mutex);
1244 
1245 		if (rtr_info->matches) {
1246 			mutex_enter(&rtr_info->matches->rtr_match_mutex);
1247 
1248 			if ((&rtr_info->matches->block)->page.id.page_no()
1249 			     == pageno) {
1250 				if (!rtr_info->matches->matched_recs->empty()) {
1251 					rtr_info->matches->matched_recs->clear();
1252 				}
1253 				ut_ad(rtr_info->matches->matched_recs->empty());
1254 				rtr_info->matches->valid = false;
1255 			}
1256 
1257 			mutex_exit(&rtr_info->matches->rtr_match_mutex);
1258 		}
1259 	}
1260 
1261 	mutex_exit(&index->rtr_track->rtr_active_mutex);
1262 
1263 	lock_mutex_enter();
1264 	lock_prdt_page_free_from_discard(block, lock_sys->prdt_hash);
1265 	lock_prdt_page_free_from_discard(block, lock_sys->prdt_page_hash);
1266 	lock_mutex_exit();
1267 }
1268 /** This is a backported version of a lambda expression:
1269   [&](buf_block_t *hint) {
1270      return hint != nullptr && buf_page_optimistic_get(...hint...);
1271   }
1272 for compilers which do not support lambda expressions, nor passing local types
1273 as template arguments. */
1274 struct Buf_page_optimistic_get_functor_t{
1275 	btr_pcur_t * &r_cursor;
1276 	const char * &file;
1277 	ulint &line;
1278 	mtr_t * &mtr;
operator ()Buf_page_optimistic_get_functor_t1279 	bool operator()(buf_block_t *hint) const {
1280 		return hint != NULL && buf_page_optimistic_get(
1281 			RW_X_LATCH, hint, r_cursor->modify_clock, file, line, mtr
1282 		);
1283 	}
1284 };
1285 /**************************************************************//**
1286 Restores the stored position of a persistent cursor bufferfixing the page */
1287 bool
rtr_cur_restore_position_func(ulint latch_mode,btr_cur_t * btr_cur,ulint level,const char * file,ulint line,mtr_t * mtr)1288 rtr_cur_restore_position_func(
1289 /*==========================*/
1290 	ulint		latch_mode,	/*!< in: BTR_CONT_MODIFY_TREE, ... */
1291 	btr_cur_t*	btr_cur,	/*!< in: detached persistent cursor */
1292 	ulint		level,		/*!< in: index level */
1293 	const char*	file,		/*!< in: file name */
1294 	ulint		line,		/*!< in: line where called */
1295 	mtr_t*		mtr)		/*!< in: mtr */
1296 {
1297 	dict_index_t*	index;
1298 	mem_heap_t*	heap;
1299 	btr_pcur_t*	r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
1300 	dtuple_t*	tuple;
1301 	bool		ret = false;
1302 
1303 	ut_ad(mtr);
1304 	ut_ad(r_cursor);
1305 	ut_ad(mtr->is_active());
1306 
1307 	index = btr_cur_get_index(btr_cur);
1308 
1309 	if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
1310 	    || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
1311 		return(false);
1312 	}
1313 
1314 	DBUG_EXECUTE_IF(
1315 		"rtr_pessimistic_position",
1316 		r_cursor->modify_clock = 100;
1317 	);
1318 
1319 	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
1320 	Buf_page_optimistic_get_functor_t functor={r_cursor,file,line,mtr};
1321 	if (r_cursor->block_when_stored.run_with_hint(functor)){
1322 		ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);
1323 
1324 		ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
1325 #ifdef UNIV_DEBUG
1326 		do {
1327 			const rec_t*	rec;
1328 			const ulint*	offsets1;
1329 			const ulint*	offsets2;
1330 			ulint		comp;
1331 
1332 			rec = btr_pcur_get_rec(r_cursor);
1333 
1334 			heap = mem_heap_create(256);
1335 			offsets1 = rec_get_offsets(
1336 				r_cursor->old_rec, index, NULL,
1337 				r_cursor->old_n_fields, &heap);
1338 			offsets2 = rec_get_offsets(
1339 				rec, index, NULL,
1340 				r_cursor->old_n_fields, &heap);
1341 
1342 			comp = rec_offs_comp(offsets1);
1343 
1344 			if (rec_get_info_bits(r_cursor->old_rec, comp)
1345 			    & REC_INFO_MIN_REC_FLAG) {
1346 				ut_ad(rec_get_info_bits(rec, comp)
1347 					& REC_INFO_MIN_REC_FLAG);
1348 			} else {
1349 
1350 				ut_ad(!cmp_rec_rec(r_cursor->old_rec,
1351 						   rec, offsets1, offsets2,
1352 						   index,page_is_spatial_non_leaf(rec, index)));
1353 			}
1354 
1355 			mem_heap_free(heap);
1356 		} while (0);
1357 #endif /* UNIV_DEBUG */
1358 
1359 		return(true);
1360 	}
1361 
1362 	/* Page has changed, for R-Tree, the page cannot be shrunk away,
1363 	so we search the page and its right siblings */
1364 	buf_block_t*	block;
1365 	node_seq_t	page_ssn;
1366 	const page_t*	page;
1367 	page_cur_t*	page_cursor;
1368 	node_visit_t*	node = rtr_get_parent_node(btr_cur, level, false);
1369 	ulint		space = dict_index_get_space(index);
1370 	node_seq_t	path_ssn = node->seq_no;
1371 	page_size_t	page_size = dict_table_page_size(index->table);
1372 
1373 	ulint		page_no = node->page_no;
1374 
1375 	heap = mem_heap_create(256);
1376 
1377 	tuple = dict_index_build_data_tuple(index, r_cursor->old_rec,
1378 					    r_cursor->old_n_fields, heap);
1379 
1380 	page_cursor = btr_pcur_get_page_cur(r_cursor);
1381 	ut_ad(r_cursor == node->cursor);
1382 
1383 search_again:
1384 	page_id_t	page_id(space, page_no);
1385 
1386 	block = buf_page_get_gen(
1387 		page_id, page_size, RW_X_LATCH, NULL,
1388 		BUF_GET, __FILE__, __LINE__, mtr);
1389 
1390 	ut_ad(block);
1391 
1392 	/* Get the page SSN */
1393 	page = buf_block_get_frame(block);
1394 	page_ssn = page_get_ssn_id(page);
1395 
1396 	ulint low_match = page_cur_search(
1397 				block, index, tuple, PAGE_CUR_LE, page_cursor);
1398 
1399 	if (low_match == r_cursor->old_n_fields) {
1400 		const rec_t*	rec;
1401 		const ulint*	offsets1;
1402 		const ulint*	offsets2;
1403 		ulint		comp;
1404 
1405 		rec = btr_pcur_get_rec(r_cursor);
1406 
1407 		offsets1 = rec_get_offsets(
1408 			r_cursor->old_rec, index, NULL,
1409 			r_cursor->old_n_fields, &heap);
1410 		offsets2 = rec_get_offsets(
1411 			rec, index, NULL,
1412 			r_cursor->old_n_fields, &heap);
1413 
1414 		comp = rec_offs_comp(offsets1);
1415 
1416 		if ((rec_get_info_bits(r_cursor->old_rec, comp)
1417 		     & REC_INFO_MIN_REC_FLAG)
1418 		    && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
1419 			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1420 			ret = true;
1421 		} else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
1422 				 index,page_is_spatial_non_leaf(rec, index))) {
1423 			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1424 			ret = true;
1425 		}
1426 	}
1427 
1428 	/* Check the page SSN to see if it has been splitted, if so, search
1429 	the right page */
1430 	if (!ret && page_ssn > path_ssn) {
1431 		page_no = btr_page_get_next(page, mtr);
1432 		goto search_again;
1433 	}
1434 
1435 	mem_heap_free(heap);
1436 
1437 	return(ret);
1438 }
1439 
1440 /****************************************************************//**
1441 Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
1442 static
1443 void
rtr_leaf_push_match_rec(const rec_t * rec,rtr_info_t * rtr_info,ulint * offsets,bool is_comp)1444 rtr_leaf_push_match_rec(
1445 /*====================*/
1446 	const rec_t*	rec,		/*!< in: record to copy */
1447 	rtr_info_t*	rtr_info,	/*!< in/out: search stack */
1448 	ulint*		offsets,	/*!< in: offsets */
1449 	bool		is_comp)	/*!< in: is compact format */
1450 {
1451 	byte*		buf;
1452 	matched_rec_t*	match_rec = rtr_info->matches;
1453 	rec_t*		copy;
1454 	ulint		data_len;
1455 	rtr_rec_t	rtr_rec;
1456 
1457 	buf = match_rec->block.frame + match_rec->used;
1458 
1459 	copy = rec_copy(buf, rec, offsets);
1460 
1461 	if (is_comp) {
1462 		rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1463 	} else {
1464 		rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1465 	}
1466 
1467 	rtr_rec.r_rec = copy;
1468 	rtr_rec.locked = false;
1469 
1470 	match_rec->matched_recs->push_back(rtr_rec);
1471 	match_rec->valid = true;
1472 
1473 	data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1474 	match_rec->used += data_len;
1475 
1476 	ut_ad(match_rec->used < UNIV_PAGE_SIZE);
1477 }
1478 
1479 /**************************************************************//**
1480 Store the parent path cursor
1481 @return number of cursor stored */
1482 ulint
rtr_store_parent_path(const buf_block_t * block,btr_cur_t * btr_cur,ulint latch_mode,ulint level,mtr_t * mtr)1483 rtr_store_parent_path(
1484 /*==================*/
1485 	const buf_block_t*	block,	/*!< in: block of the page */
1486 	btr_cur_t*		btr_cur,/*!< in/out: persistent cursor */
1487 	ulint			latch_mode,
1488 					/*!< in: latch_mode */
1489 	ulint			level,	/*!< in: index level */
1490 	mtr_t*			mtr)	/*!< in: mtr */
1491 {
1492 	ulint	num = btr_cur->rtr_info->parent_path->size();
1493 	ulint	num_stored = 0;
1494 
1495 	while (num >= 1) {
1496 		node_visit_t*	node = &(*btr_cur->rtr_info->parent_path)[
1497 					num - 1];
1498 		btr_pcur_t*	r_cursor = node->cursor;
1499 		buf_block_t*	cur_block;
1500 
1501 		if (node->level > level) {
1502 			break;
1503 		}
1504 
1505 		r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1506 		r_cursor->latch_mode = latch_mode;
1507 
1508 		cur_block = btr_pcur_get_block(r_cursor);
1509 
1510 		if (cur_block == block) {
1511 			btr_pcur_store_position(r_cursor, mtr);
1512 			num_stored++;
1513 		} else {
1514 			break;
1515 		}
1516 
1517 		num--;
1518 	}
1519 
1520 	return(num_stored);
1521 }
1522 /**************************************************************//**
1523 push a nonleaf index node to the search path for insertion */
1524 static
1525 void
rtr_non_leaf_insert_stack_push(dict_index_t * index,rtr_node_path_t * path,ulint level,ulint child_no,const buf_block_t * block,const rec_t * rec,double mbr_inc)1526 rtr_non_leaf_insert_stack_push(
1527 /*===========================*/
1528 	dict_index_t*		index,	/*!< in: index descriptor */
1529 	rtr_node_path_t*	path,	/*!< in/out: search path */
1530 	ulint			level,	/*!< in: index page level */
1531 	ulint			child_no,/*!< in: child page no */
1532 	const buf_block_t*	block,	/*!< in: block of the page */
1533 	const rec_t*		rec,	/*!< in: positioned record */
1534 	double			mbr_inc)/*!< in: MBR needs to be enlarged */
1535 {
1536 	node_seq_t	new_seq;
1537 	btr_pcur_t*	my_cursor;
1538 	ulint		page_no = block->page.id.page_no();
1539 
1540 	my_cursor = static_cast<btr_pcur_t*>(
1541 		ut_malloc_nokey(sizeof(*my_cursor)));
1542 
1543 	btr_pcur_init(my_cursor);
1544 
1545 	page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1546 
1547 	(btr_pcur_get_btr_cur(my_cursor))->index = index;
1548 
1549 	new_seq = rtr_get_current_ssn_id(index);
1550 	rtr_non_leaf_stack_push(path, page_no, new_seq, level, child_no,
1551 				my_cursor, mbr_inc);
1552 }
1553 
1554 /** Copy a buf_block_t strcuture, except "block->lock" and "block->mutex".
1555 @param[in,out]	matches	copy to match->block
1556 @param[in]	block	block to copy */
1557 static
1558 void
rtr_copy_buf(matched_rec_t * matches,const buf_block_t * block)1559 rtr_copy_buf(
1560 	matched_rec_t*		matches,
1561 	const buf_block_t*	block)
1562 {
1563 	/* Copy all members of "block" to "matches->block" except "mutex"
1564 	and "lock". We skip "mutex" and "lock" because they are not used
1565 	from the dummy buf_block_t we create here and because memcpy()ing
1566 	them generates (valid) compiler warnings that the vtable pointer
1567 	will be copied. It is also undefined what will happen with the
1568 	newly memcpy()ed mutex if the source mutex was acquired by
1569 	(another) thread while it was copied. */
1570 	new (&matches->block.page) buf_page_t(block->page);
1571 	matches->block.frame = block->frame;
1572 #ifndef UNIV_HOTBACKUP
1573 	matches->block.unzip_LRU = block->unzip_LRU;
1574 
1575 	ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
1576 	ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
1577 
1578 	/* Skip buf_block_t::mutex */
1579 	/* Skip buf_block_t::lock */
1580 	matches->block.lock_hash_val = block->lock_hash_val;
1581 	matches->block.modify_clock = block->modify_clock;
1582 	matches->block.n_hash_helps = block->n_hash_helps;
1583 	matches->block.n_fields = block->n_fields;
1584 	matches->block.left_side = block->left_side;
1585 #if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
1586 	matches->block.n_pointers = block->n_pointers;
1587 #endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
1588 	matches->block.curr_n_fields = block->curr_n_fields;
1589 	matches->block.curr_left_side = block->curr_left_side;
1590 	matches->block.index = block->index;
1591 	matches->block.made_dirty_with_no_latch
1592 		= block->made_dirty_with_no_latch;
1593 
1594 	ut_d(matches->block.debug_latch = block->debug_latch);
1595 
1596 #endif /* !UNIV_HOTBACKUP */
1597 }
1598 
1599 /****************************************************************//**
1600 Generate a shadow copy of the page block header to save the
1601 matched records */
1602 static
1603 void
rtr_init_match(matched_rec_t * matches,const buf_block_t * block,const page_t * page)1604 rtr_init_match(
1605 /*===========*/
1606 	matched_rec_t*		matches,/*!< in/out: match to initialize */
1607 	const buf_block_t*	block,	/*!< in: buffer block */
1608 	const page_t*		page)	/*!< in: buffer page */
1609 {
1610 	ut_ad(matches->matched_recs->empty());
1611 	matches->locked = false;
1612 	rtr_copy_buf(matches, block);
1613 	matches->block.frame = matches->bufp;
1614 	matches->valid = false;
1615 	/* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
1616 	use infimum/supremum of this page as normal btr page for search. */
1617 	memcpy(matches->block.frame, page, page_is_comp(page)
1618 						? PAGE_NEW_SUPREMUM_END
1619 						: PAGE_OLD_SUPREMUM_END);
1620 	matches->used = page_is_comp(page)
1621 				? PAGE_NEW_SUPREMUM_END
1622 				: PAGE_OLD_SUPREMUM_END;
1623 #ifdef RTR_SEARCH_DIAGNOSTIC
1624 	ulint pageno = page_get_page_no(page);
1625 	fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1626 		static_cast<int>(pageno));
1627 #endif /* RTR_SEARCH_DIAGNOSTIC */
1628 }
1629 
1630 /****************************************************************//**
1631 Get the bounding box content from an index record */
1632 void
rtr_get_mbr_from_rec(const rec_t * rec,const ulint * offsets,rtr_mbr_t * mbr)1633 rtr_get_mbr_from_rec(
1634 /*=================*/
1635 	const rec_t*	rec,	/*!< in: data tuple */
1636 	const ulint*	offsets,/*!< in: offsets array */
1637 	rtr_mbr_t*	mbr)	/*!< out MBR */
1638 {
1639 	ulint		rec_f_len;
1640 	const byte*	data;
1641 
1642 	data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1643 
1644 	rtr_read_mbr(data, mbr);
1645 }
1646 
1647 /****************************************************************//**
1648 Get the bounding box content from a MBR data record */
1649 void
rtr_get_mbr_from_tuple(const dtuple_t * dtuple,rtr_mbr * mbr)1650 rtr_get_mbr_from_tuple(
1651 /*===================*/
1652 	const dtuple_t* dtuple, /*!< in: data tuple */
1653 	rtr_mbr*	mbr)	/*!< out: mbr to fill */
1654 {
1655 	const dfield_t* dtuple_field;
1656         ulint           dtuple_f_len;
1657 	byte*		data;
1658 
1659 	dtuple_field = dtuple_get_nth_field(dtuple, 0);
1660 	dtuple_f_len = dfield_get_len(dtuple_field);
1661 	ut_a(dtuple_f_len >= 4 * sizeof(double));
1662 
1663 	data = static_cast<byte*>(dfield_get_data(dtuple_field));
1664 
1665 	rtr_read_mbr(data, mbr);
1666 }
1667 
1668 /****************************************************************//**
1669 Searches the right position in rtree for a page cursor. */
1670 bool
rtr_cur_search_with_match(const buf_block_t * block,dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,page_cur_t * cursor,rtr_info_t * rtr_info)1671 rtr_cur_search_with_match(
1672 /*======================*/
1673 	const buf_block_t*	block,	/*!< in: buffer block */
1674 	dict_index_t*		index,	/*!< in: index descriptor */
1675 	const dtuple_t*		tuple,	/*!< in: data tuple */
1676 	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_RTREE_INSERT,
1677 					PAGE_CUR_RTREE_LOCATE etc. */
1678 	page_cur_t*		cursor,	/*!< in/out: page cursor */
1679 	rtr_info_t*		rtr_info)/*!< in/out: search stack */
1680 {
1681 	bool		found = false;
1682 	const page_t*	page;
1683 	const rec_t*	rec;
1684 	const rec_t*	last_rec;
1685 	ulint		offsets_[REC_OFFS_NORMAL_SIZE];
1686 	ulint*		offsets		= offsets_;
1687 	mem_heap_t*	heap = NULL;
1688 	int		cmp = 1;
1689 	bool		is_leaf;
1690 	double		least_inc = DBL_MAX;
1691 	const rec_t*	best_rec;
1692 	const rec_t*	last_match_rec = NULL;
1693 	ulint		level;
1694 	bool		match_init = false;
1695 	ulint		space = block->page.id.space();
1696 	page_cur_mode_t	orig_mode = mode;
1697 	const rec_t*	first_rec = NULL;
1698 
1699 	rec_offs_init(offsets_);
1700 
1701 	ut_ad(RTREE_SEARCH_MODE(mode));
1702 
1703 	ut_ad(dict_index_is_spatial(index));
1704 
1705 	page = buf_block_get_frame(block);
1706 
1707 	is_leaf = page_is_leaf(page);
1708 	level = btr_page_get_level(page, mtr);
1709 
1710 	if (mode == PAGE_CUR_RTREE_LOCATE) {
1711 		ut_ad(level != 0);
1712 		mode = PAGE_CUR_WITHIN;
1713 	}
1714 
1715 	rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
1716 
1717 	last_rec = rec;
1718 	best_rec = rec;
1719 
1720 	if (page_rec_is_infimum(rec)) {
1721 		rec = page_rec_get_next_const(rec);
1722 	}
1723 
1724 	/* Check insert tuple size is larger than first rec, and try to
1725 	avoid it if possible */
1726 	if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
1727 
1728 		ulint	new_rec_size = rec_get_converted_size(index, tuple, 0);
1729 
1730 		offsets = rec_get_offsets(rec, index, offsets,
1731 					  dtuple_get_n_fields_cmp(tuple),
1732 					  &heap);
1733 
1734 		if (rec_offs_size(offsets) < new_rec_size) {
1735 			first_rec = rec;
1736 		}
1737 
1738 		/* If this is the left-most page of this index level
1739 		and the table is a compressed table, try to avoid
1740 		first page as much as possible, as there will be problem
1741 		when update MIN_REC rec in compress table */
1742 		if (buf_block_get_page_zip(block)
1743 		    && mach_read_from_4(page + FIL_PAGE_PREV) == FIL_NULL
1744 		    && page_get_n_recs(page) >= 2) {
1745 
1746 			rec = page_rec_get_next_const(rec);
1747 		}
1748 	}
1749 
1750 	while (!page_rec_is_supremum(rec)) {
1751 		offsets = rec_get_offsets(rec, index, offsets,
1752 					  dtuple_get_n_fields_cmp(tuple),
1753 					  &heap);
1754 		if (!is_leaf) {
1755 			switch (mode) {
1756 			case PAGE_CUR_CONTAIN:
1757 			case PAGE_CUR_INTERSECT:
1758 			case PAGE_CUR_MBR_EQUAL:
1759 				/* At non-leaf level, we will need to check
1760 				both CONTAIN and INTERSECT for either of
1761 				the search mode */
1762 				cmp = cmp_dtuple_rec_with_gis(
1763 					tuple, rec, offsets, PAGE_CUR_CONTAIN);
1764 
1765 				if (cmp != 0) {
1766 					cmp = cmp_dtuple_rec_with_gis(
1767 						tuple, rec, offsets,
1768 						PAGE_CUR_INTERSECT);
1769 				}
1770 				break;
1771 			case PAGE_CUR_DISJOINT:
1772 				cmp = cmp_dtuple_rec_with_gis(
1773 					tuple, rec, offsets, mode);
1774 
1775 				if (cmp != 0) {
1776 					cmp = cmp_dtuple_rec_with_gis(
1777 						tuple, rec, offsets,
1778 						PAGE_CUR_INTERSECT);
1779 				}
1780 				break;
1781 			case PAGE_CUR_RTREE_INSERT:
1782 				double	increase;
1783 				double	area;
1784 
1785 				cmp = cmp_dtuple_rec_with_gis(
1786 					tuple, rec, offsets, PAGE_CUR_WITHIN);
1787 
1788 				if (cmp != 0) {
1789 					increase = rtr_rec_cal_increase(
1790 						tuple, rec, offsets, &area);
1791 					/* Once it goes beyond DBL_MAX,
1792 					it would not make sense to record
1793 					such value, just make it
1794 					DBL_MAX / 2  */
1795 					if (increase >= DBL_MAX) {
1796 						increase = DBL_MAX / 2;
1797 					}
1798 
1799 					if (increase < least_inc) {
1800 						least_inc = increase;
1801 						best_rec = rec;
1802 					} else if (best_rec
1803 						   && best_rec == first_rec) {
1804 						/* if first_rec is set,
1805 						we will try to avoid it */
1806 						least_inc = increase;
1807 						best_rec = rec;
1808 					}
1809 				}
1810 				break;
1811 			case PAGE_CUR_RTREE_GET_FATHER:
1812 				cmp = cmp_dtuple_rec_with_gis_internal(
1813 					tuple, rec, offsets);
1814 				break;
1815 			default:
1816 				/* WITHIN etc. */
1817 				cmp = cmp_dtuple_rec_with_gis(
1818 					tuple, rec, offsets, mode);
1819 			}
1820 		} else {
1821 			/* At leaf level, INSERT should translate to LE */
1822 			ut_ad(mode != PAGE_CUR_RTREE_INSERT);
1823 
1824 			cmp = cmp_dtuple_rec_with_gis(
1825 				tuple, rec, offsets, mode);
1826 		}
1827 
1828 		if (cmp == 0) {
1829 			found = true;
1830 
1831 			/* If located, the matching node/rec will be pushed
1832 			to rtr_info->path for non-leaf nodes, or
1833 			rtr_info->matches for leaf nodes */
1834 			if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
1835 				if (!is_leaf) {
1836 					ulint		page_no;
1837 					node_seq_t	new_seq;
1838 					bool		is_loc;
1839 
1840 					is_loc = (orig_mode
1841 						  == PAGE_CUR_RTREE_LOCATE
1842 						  || orig_mode
1843 						  == PAGE_CUR_RTREE_GET_FATHER);
1844 
1845 					offsets = rec_get_offsets(
1846 						rec, index, offsets,
1847 						ULINT_UNDEFINED, &heap);
1848 
1849 					page_no = btr_node_ptr_get_child_page_no(
1850 						rec, offsets);
1851 
1852 					ut_ad(level >= 1);
1853 
1854 					/* Get current SSN, before we insert
1855 					it into the path stack */
1856 					new_seq = rtr_get_current_ssn_id(index);
1857 
1858 					rtr_non_leaf_stack_push(
1859 						rtr_info->path,
1860 						page_no,
1861 						new_seq, level - 1, 0,
1862 						NULL, 0);
1863 
1864 					if (is_loc) {
1865 						rtr_non_leaf_insert_stack_push(
1866 							index,
1867 							rtr_info->parent_path,
1868 							level, page_no, block,
1869 							rec, 0);
1870 					}
1871 
1872 					if (!srv_read_only_mode
1873 					    && (rtr_info->need_page_lock
1874 						|| !is_loc)) {
1875 
1876 						/* Lock the page, preventing it
1877 						from being shrunk */
1878 						lock_place_prdt_page_lock(
1879 							space, page_no, index,
1880 							rtr_info->thr);
1881 					}
1882 				} else {
1883 					ut_ad(orig_mode
1884 					      != PAGE_CUR_RTREE_LOCATE);
1885 
1886 					if (!match_init) {
1887 						rtr_init_match(
1888 							rtr_info->matches,
1889 							block, page);
1890 						match_init = true;
1891 					}
1892 
1893 					/* Collect matched records on page */
1894 					offsets = rec_get_offsets(
1895 						rec, index, offsets,
1896 						ULINT_UNDEFINED, &heap);
1897 					rtr_leaf_push_match_rec(
1898 						rec, rtr_info, offsets,
1899 						page_is_comp(page));
1900 				}
1901 
1902 				last_match_rec = rec;
1903 			} else {
1904 				/* This is the insertion case, it will break
1905 				once it finds the first MBR that can accomodate
1906 				the inserting rec */
1907 				break;
1908 			}
1909 		}
1910 
1911 		last_rec = rec;
1912 
1913 		rec = page_rec_get_next_const(rec);
1914 	}
1915 
1916 	/* All records on page are searched */
1917 	if (page_rec_is_supremum(rec)) {
1918 		if (!is_leaf) {
1919 			if (!found) {
1920 				/* No match case: if it is for insertion,
1921 				then we select the record that results in
1922 				the least increased area */
1923 				if (mode == PAGE_CUR_RTREE_INSERT) {
1924 					ulint	child_no;
1925 					ut_ad(least_inc < DBL_MAX);
1926 					offsets = rec_get_offsets(
1927 						best_rec, index,
1928 						offsets, ULINT_UNDEFINED,
1929 						&heap);
1930 					child_no =
1931 					btr_node_ptr_get_child_page_no(
1932 						best_rec, offsets);
1933 
1934 					rtr_non_leaf_insert_stack_push(
1935 						index, rtr_info->parent_path,
1936 						level, child_no, block,
1937 						best_rec, least_inc);
1938 
1939 					page_cur_position(best_rec, block,
1940 							  cursor);
1941 					rtr_info->mbr_adj = true;
1942 				} else {
1943 					/* Position at the last rec of the
1944 					page, if it is not the leaf page */
1945 					page_cur_position(last_rec, block,
1946 							  cursor);
1947 				}
1948 			} else {
1949 				/* There are matching records, position
1950 				on the last matching record */
1951 				if (rtr_info) {
1952 					rec = last_match_rec;
1953 					page_cur_position(
1954 						rec, block, cursor);
1955 				}
1956 			}
1957 		} else if (rtr_info) {
1958 			/* Leaf level, no match, position at the
1959 			last (supremum) rec */
1960 			if (!last_match_rec) {
1961 				page_cur_position(rec, block, cursor);
1962 				goto func_exit;
1963 			}
1964 
1965 			/* There are matched records */
1966 			matched_rec_t*	match_rec = rtr_info->matches;
1967 
1968 			rtr_rec_t	test_rec;
1969 
1970 			test_rec = match_rec->matched_recs->back();
1971 #ifdef UNIV_DEBUG
1972 			ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
1973 			ulint*		offsets2	= offsets_2;
1974 			rec_offs_init(offsets_2);
1975 
1976 			ut_ad(found);
1977 
1978 			/* Verify the record to be positioned is the same
1979 			as the last record in matched_rec vector */
1980 			offsets2 = rec_get_offsets(test_rec.r_rec, index,
1981 						   offsets2, ULINT_UNDEFINED,
1982 						   &heap);
1983 
1984 			offsets = rec_get_offsets(last_match_rec, index,
1985 						  offsets, ULINT_UNDEFINED,
1986 						  &heap);
1987 
1988 			ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
1989 					  offsets2, offsets, index, false) == 0);
1990 #endif /* UNIV_DEBUG */
1991 			/* Pop the last match record and position on it */
1992 			match_rec->matched_recs->pop_back();
1993 			page_cur_position(test_rec.r_rec, &match_rec->block,
1994 					  cursor);
1995 		}
1996 	} else {
1997 
1998 		if (mode == PAGE_CUR_RTREE_INSERT) {
1999 			ulint	child_no;
2000 			ut_ad(!last_match_rec && rec);
2001 
2002 			offsets = rec_get_offsets(
2003 				rec, index, offsets, ULINT_UNDEFINED, &heap);
2004 
2005 			child_no = btr_node_ptr_get_child_page_no(rec, offsets);
2006 
2007 			rtr_non_leaf_insert_stack_push(
2008 				index, rtr_info->parent_path, level, child_no,
2009 				block, rec, 0);
2010 
2011 		} else if (rtr_info && found && !is_leaf) {
2012 			rec = last_match_rec;
2013 		}
2014 
2015 		page_cur_position(rec, block, cursor);
2016 	}
2017 
2018 #ifdef UNIV_DEBUG
2019 	/* Verify that we are positioned at the same child page as pushed in
2020 	the path stack */
2021 	if (!is_leaf && (!page_rec_is_supremum(rec) || found)
2022 	    && mode != PAGE_CUR_RTREE_INSERT) {
2023 		ulint		page_no;
2024 
2025 		offsets = rec_get_offsets(rec, index, offsets,
2026 					  ULINT_UNDEFINED, &heap);
2027 		page_no = btr_node_ptr_get_child_page_no(rec, offsets);
2028 
2029 		if (rtr_info && found) {
2030 			rtr_node_path_t*	path = rtr_info->path;
2031 			node_visit_t		last_visit = path->back();
2032 
2033 			ut_ad(last_visit.page_no == page_no);
2034 		}
2035 	}
2036 #endif /* UNIV_DEBUG */
2037 
2038 func_exit:
2039 	if (UNIV_LIKELY_NULL(heap)) {
2040 		mem_heap_free(heap);
2041 	}
2042 
2043 	return(found);
2044 }
2045