1 /*****************************************************************************
2 
3 Copyright (c) 2016, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2017, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file gis/gis0sea.cc
22 InnoDB R-tree search interfaces
23 
24 Created 2014/01/16 Jimmy Yang
25 ***********************************************************************/
26 
27 #include "fsp0fsp.h"
28 #include "page0page.h"
29 #include "page0cur.h"
30 #include "page0zip.h"
31 #include "gis0rtree.h"
32 #include "btr0cur.h"
33 #include "btr0sea.h"
34 #include "btr0pcur.h"
35 #include "rem0cmp.h"
36 #include "lock0lock.h"
37 #include "ibuf0ibuf.h"
38 #include "trx0trx.h"
39 #include "srv0mon.h"
40 #include "que0que.h"
41 #include "gis0geo.h"
42 
43 /** Restore the stored position of a persistent cursor, buffer-fixing the page */
44 static
45 bool
46 rtr_cur_restore_position(
47 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
48 	btr_cur_t*	cursor,		/*!< in: detached persistent cursor */
49 	ulint		level,		/*!< in: index level */
50 	mtr_t*		mtr);		/*!< in: mtr */
51 
52 /*************************************************************//**
53 Pop out used parent path entries until we find the parent with a matching
54 page number */
55 static
56 void
57 rtr_adjust_parent_path(
58 /*===================*/
59 	rtr_info_t*	rtr_info,	/* R-Tree info struct */
60 	ulint		page_no)	/* page number to look for */
61 {
62 	while (!rtr_info->parent_path->empty()) {
63 		if (rtr_info->parent_path->back().child_no == page_no) {
64 			break;
65 		} else {
66 			if (rtr_info->parent_path->back().cursor) {
67 				btr_pcur_close(
68 					rtr_info->parent_path->back().cursor);
69 				ut_free(rtr_info->parent_path->back().cursor);
70 			}
71 
72 			rtr_info->parent_path->pop_back();
73 		}
74 	}
75 }
76 
77 /*************************************************************//**
78 Find the next matching record. This function is used by search
79 or record locating during index delete/update.
80 @return true if a suitable record is found, otherwise false */
81 static
82 bool
83 rtr_pcur_getnext_from_path(
84 /*=======================*/
85 	const dtuple_t* tuple,	/*!< in: data tuple */
86 	page_cur_mode_t	mode,	/*!< in: cursor search mode */
87 	btr_cur_t*	btr_cur,/*!< in: persistent cursor; NOTE that the
88 				function may release the page latch */
89 	ulint		target_level,
90 				/*!< in: target level */
91 	ulint		latch_mode,
92 				/*!< in: latch_mode */
93 	bool		index_locked,
94 				/*!< in: index tree locked */
95 	mtr_t*		mtr)	/*!< in: mtr */
96 {
97 	dict_index_t*	index = btr_cur->index;
98 	bool		found = false;
99 	page_cur_t*	page_cursor;
100 	ulint		level = 0;
101 	node_visit_t	next_rec;
102 	rtr_info_t*	rtr_info = btr_cur->rtr_info;
103 	node_seq_t	page_ssn;
104 	ulint		my_latch_mode;
105 	ulint		skip_parent = false;
106 	bool		new_split = false;
107 	bool		need_parent;
108 	bool		for_delete = false;
109 	bool		for_undo_ins = false;
110 
111 	/* exhausted all the pages to be searched */
112 	if (rtr_info->path->empty()) {
113 		return(false);
114 	}
115 
116 	ut_ad(dtuple_get_n_fields_cmp(tuple));
117 
118 	my_latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
119 
120 	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
121 	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
122 
123 	/* There should be no insert coming to this function. The only
124 	BTR_MODIFY_* modes expected here are for delete */
125 	ut_ad(mode != PAGE_CUR_RTREE_INSERT);
126 	ut_ad(my_latch_mode == BTR_SEARCH_LEAF
127 	      || my_latch_mode == BTR_MODIFY_LEAF
128 	      || my_latch_mode == BTR_MODIFY_TREE
129 	      || my_latch_mode == BTR_CONT_MODIFY_TREE);
130 
131 	/* Whether we need to track parent information. This is only
132 	needed when we do tree-altering operations (such as index page merge) */
133 	need_parent = ((my_latch_mode == BTR_MODIFY_TREE
134 		        || my_latch_mode == BTR_CONT_MODIFY_TREE)
135 		       && mode == PAGE_CUR_RTREE_LOCATE);
136 
137 	if (!index_locked) {
138 		ut_ad(latch_mode & BTR_SEARCH_LEAF
139 		      || latch_mode & BTR_MODIFY_LEAF);
140 		mtr_s_lock_index(index, mtr);
141 	} else {
142 		ut_ad(mtr->memo_contains_flagged(&index->lock,
143 						 MTR_MEMO_SX_LOCK
144 						 | MTR_MEMO_S_LOCK
145 						 | MTR_MEMO_X_LOCK));
146 	}
147 
148 	const ulint zip_size = index->table->space->zip_size();
149 
150 	/* Pop each node/page to be searched from "path" structure
151 	and do a search on it. Please note, any pages that are in
152 	the "path" structure are protected by "page" lock, so they
153 	cannot be shrunk away */
154 	do {
155 		buf_block_t*	block;
156 		node_seq_t	path_ssn;
157 		const page_t*	page;
158 		ulint		rw_latch = RW_X_LATCH;
159 		ulint		tree_idx;
160 
161 		mutex_enter(&rtr_info->rtr_path_mutex);
162 		next_rec = rtr_info->path->back();
163 		rtr_info->path->pop_back();
164 		level = next_rec.level;
165 		path_ssn = next_rec.seq_no;
166 		tree_idx = btr_cur->tree_height - level - 1;
167 
168 		/* Maintain the parent path info as well, if needed */
169 		if (need_parent && !skip_parent && !new_split) {
170 			ulint		old_level;
171 			ulint		new_level;
172 
173 			ut_ad(!rtr_info->parent_path->empty());
174 
175 			/* Cleanup unused parent info */
176 			if (rtr_info->parent_path->back().cursor) {
177 				btr_pcur_close(
178 					rtr_info->parent_path->back().cursor);
179 				ut_free(rtr_info->parent_path->back().cursor);
180 			}
181 
182 			old_level = rtr_info->parent_path->back().level;
183 
184 			rtr_info->parent_path->pop_back();
185 
186 			ut_ad(!rtr_info->parent_path->empty());
187 
188 			/* check whether there is a level change. If so,
189 			the current parent path needs to pop enough
190 			nodes to adjust to the new search page */
191 			new_level = rtr_info->parent_path->back().level;
192 
193 			if (old_level < new_level) {
194 				rtr_adjust_parent_path(
195 					rtr_info, next_rec.page_no);
196 			}
197 
198 			ut_ad(!rtr_info->parent_path->empty());
199 
200 			ut_ad(next_rec.page_no
201 			      == rtr_info->parent_path->back().child_no);
202 		}
203 
204 		mutex_exit(&rtr_info->rtr_path_mutex);
205 
206 		skip_parent = false;
207 		new_split = false;
208 
209 		/* Once we have pages in "path", these pages are
210 		predicate page locked, so they can't be shrunk away.
211 		They also have an SSN (split sequence number) to detect
212 		splits, so we can directly latch a single page while
213 		getting them. They can be unlatched if not qualified.
214 		One reason for the pre-latch is that we might need to store
215 		some parent position (which requires a latch) during search */
216 		if (level == 0) {
217 			/* S latched for SEARCH_LEAF, and X latched
218 			for MODIFY_LEAF */
219 			if (my_latch_mode <= BTR_MODIFY_LEAF) {
220 				rw_latch = my_latch_mode;
221 			}
222 
223 			if (my_latch_mode == BTR_CONT_MODIFY_TREE
224 			    || my_latch_mode == BTR_MODIFY_TREE) {
225 				rw_latch = RW_NO_LATCH;
226 			}
227 
228 		} else if (level == target_level) {
229 			rw_latch = RW_X_LATCH;
230 		}
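		/* A sketch of the outcome above: for leaf pages, rw_latch
		takes the value of my_latch_mode (RW_S_LATCH for
		BTR_SEARCH_LEAF, RW_X_LATCH for BTR_MODIFY_LEAF), or
		RW_NO_LATCH under the tree-modifying modes, in which case the
		leaf is latched later via btr_cur_latch_leaves(); for
		non-leaf pages it stays RW_X_LATCH. */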
231 
232 		/* Release previous locked blocks */
233 		if (my_latch_mode != BTR_SEARCH_LEAF) {
234 			for (ulint idx = 0; idx < btr_cur->tree_height;
235 			     idx++) {
236 				if (rtr_info->tree_blocks[idx]) {
237 					mtr_release_block_at_savepoint(
238 						mtr,
239 						rtr_info->tree_savepoints[idx],
240 						rtr_info->tree_blocks[idx]);
241 					rtr_info->tree_blocks[idx] = NULL;
242 				}
243 			}
244 			for (ulint idx = RTR_MAX_LEVELS; idx < RTR_MAX_LEVELS + 3;
245 			     idx++) {
246 				if (rtr_info->tree_blocks[idx]) {
247 					mtr_release_block_at_savepoint(
248 						mtr,
249 						rtr_info->tree_savepoints[idx],
250 						rtr_info->tree_blocks[idx]);
251 					rtr_info->tree_blocks[idx] = NULL;
252 				}
253 			}
254 		}
255 
256 		/* set up savepoint to record any locks to be taken */
257 		rtr_info->tree_savepoints[tree_idx] = mtr_set_savepoint(mtr);
258 
259 #ifdef UNIV_RTR_DEBUG
260 		ut_ad(!(rw_lock_own_flagged(&btr_cur->page_cur.block->lock,
261 					    RW_LOCK_FLAG_X | RW_LOCK_FLAG_S))
262 			|| my_latch_mode == BTR_MODIFY_TREE
263 			|| my_latch_mode == BTR_CONT_MODIFY_TREE
264 			|| !page_is_leaf(buf_block_get_frame(
265 					btr_cur->page_cur.block)));
266 #endif /* UNIV_RTR_DEBUG */
267 
268 		dberr_t err = DB_SUCCESS;
269 
270 		block = buf_page_get_gen(
271 			page_id_t(index->table->space_id,
272 				  next_rec.page_no), zip_size,
273 			rw_latch, NULL, BUF_GET, __FILE__, __LINE__, mtr, &err);
274 
275 		if (block == NULL) {
276 			continue;
277 		} else if (rw_latch != RW_NO_LATCH) {
278 			ut_ad(!dict_index_is_ibuf(index));
279 			buf_block_dbg_add_level(block, SYNC_TREE_NODE);
280 		}
281 
282 		rtr_info->tree_blocks[tree_idx] = block;
283 
284 		page = buf_block_get_frame(block);
285 		page_ssn = page_get_ssn_id(page);
286 
287 		/* If the page has been split, push its right sibling.
288 		Note that we have an SX lock on index->lock, so there
289 		should not be any split/shrink happening here */
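		/* Illustrative sketch: if the page carried SSN 5 when it was
		pushed onto "path" (path_ssn == 5) and we now read SSN 7 from
		the page header, the page was split after it entered the path,
		so its right sibling must be visited as well; it is pushed
		back onto "path" below with the old path_ssn. */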
290 		if (page_ssn > path_ssn) {
291 			uint32_t next_page_no = btr_page_get_next(page);
292 			rtr_non_leaf_stack_push(
293 				rtr_info->path, next_page_no, path_ssn,
294 				level, 0, NULL, 0);
295 
296 			if (!srv_read_only_mode
297 			    && mode != PAGE_CUR_RTREE_INSERT
298 			    && mode != PAGE_CUR_RTREE_LOCATE) {
299 				ut_ad(rtr_info->thr);
300 				lock_place_prdt_page_lock(
301 					page_id_t(block->page.id().space(),
302 						  next_page_no),
303 					index,
304 					rtr_info->thr);
305 			}
306 			new_split = true;
307 #if defined(UNIV_GIS_DEBUG)
308 			fprintf(stderr,
309 				"GIS_DIAG: Splitted page found: %d, %ld\n",
310 				static_cast<int>(need_parent), next_page_no);
311 #endif
312 		}
313 
314 		page_cursor = btr_cur_get_page_cur(btr_cur);
315 		page_cursor->rec = NULL;
316 
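		/* In PAGE_CUR_RTREE_LOCATE mode we are relocating one known
		record (e.g. for delete or rollback), so an exact match on the
		compared fields is required; in the other search modes we
		collect every record that satisfies the spatial predicate. */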
317 		if (mode == PAGE_CUR_RTREE_LOCATE) {
318 			if (level == target_level && level == 0) {
319 				ulint	low_match;
320 
321 				found = false;
322 
323 				low_match = page_cur_search(
324 					block, index, tuple,
325 					PAGE_CUR_LE,
326 					btr_cur_get_page_cur(btr_cur));
327 
328 				if (low_match == dtuple_get_n_fields_cmp(
329 							tuple)) {
330 					rec_t*	rec = btr_cur_get_rec(btr_cur);
331 
332 					if (!rec_get_deleted_flag(rec,
333 					    dict_table_is_comp(index->table))
334 					    || (!for_delete && !for_undo_ins)) {
335 						found = true;
336 						btr_cur->low_match = low_match;
337 					} else {
338 						/* mark we found deleted row */
339 						btr_cur->rtr_info->fd_del
340 							= true;
341 					}
342 				}
343 			} else {
344 				page_cur_mode_t	page_mode = mode;
345 
346 				if (level == target_level
347 				    && target_level != 0) {
348 					page_mode = PAGE_CUR_RTREE_GET_FATHER;
349 				}
350 				found = rtr_cur_search_with_match(
351 					block, index, tuple, page_mode,
352 					page_cursor, btr_cur->rtr_info);
353 
354 				/* Save the position of parent if needed */
355 				if (found && need_parent) {
356 					btr_pcur_t*     r_cursor =
357 						rtr_get_parent_cursor(
358 							btr_cur, level, false);
359 
360 					rec_t*	rec = page_cur_get_rec(
361 						page_cursor);
362 					page_cur_position(
363 						rec, block,
364 						btr_pcur_get_page_cur(r_cursor));
365 					r_cursor->pos_state =
366 						 BTR_PCUR_IS_POSITIONED;
367 					r_cursor->latch_mode = my_latch_mode;
368 					btr_pcur_store_position(r_cursor, mtr);
369 #ifdef UNIV_DEBUG
370 					ulint num_stored =
371 						rtr_store_parent_path(
372 							block, btr_cur,
373 							rw_latch, level, mtr);
374 					ut_ad(num_stored > 0);
375 #else
376 					rtr_store_parent_path(
377 						block, btr_cur, rw_latch,
378 						level, mtr);
379 #endif /* UNIV_DEBUG */
380 				}
381 			}
382 		} else {
383 			found = rtr_cur_search_with_match(
384 				block, index, tuple, mode, page_cursor,
385 				btr_cur->rtr_info);
386 		}
387 
388 		/* Attach predicate lock if needed, no matter whether
389 		there are matched records */
390 		if (mode != PAGE_CUR_RTREE_INSERT
391 		    && mode != PAGE_CUR_RTREE_LOCATE
392 		    && mode >= PAGE_CUR_CONTAIN
393 		    && btr_cur->rtr_info->need_prdt_lock) {
394 			lock_prdt_t	prdt;
395 
396 			trx_t*		trx = thr_get_trx(
397 						btr_cur->rtr_info->thr);
398 			lock_mutex_enter();
399 			lock_init_prdt_from_mbr(
400 				&prdt, &btr_cur->rtr_info->mbr,
401 				mode, trx->lock.lock_heap);
402 			lock_mutex_exit();
403 
404 			if (rw_latch == RW_NO_LATCH) {
405 				rw_lock_s_lock(&(block->lock));
406 			}
407 
408 			lock_prdt_lock(block, &prdt, index, LOCK_S,
409 				       LOCK_PREDICATE, btr_cur->rtr_info->thr);
410 
411 			if (rw_latch == RW_NO_LATCH) {
412 				rw_lock_s_unlock(&(block->lock));
413 			}
414 		}
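		/* The predicate (SPATIAL) lock above is taken even when no
		record matched on this page, so that a concurrent insert of a
		record falling within the search MBR conflicts with this
		reader. */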
415 
416 		if (found) {
417 			if (level == target_level) {
418 				page_cur_t*	r_cur;
419 
420 				if (my_latch_mode == BTR_MODIFY_TREE
421 				    && level == 0) {
422 					ut_ad(rw_latch == RW_NO_LATCH);
423 
424 					btr_cur_latch_leaves(
425 						block,
426 						BTR_MODIFY_TREE,
427 						btr_cur, mtr);
428 				}
429 
430 				r_cur = btr_cur_get_page_cur(btr_cur);
431 
432 				page_cur_position(
433 					page_cur_get_rec(page_cursor),
434 					page_cur_get_block(page_cursor),
435 					r_cur);
436 
437 				btr_cur->low_match = level != 0 ?
438 					DICT_INDEX_SPATIAL_NODEPTR_SIZE + 1
439 					: btr_cur->low_match;
440 				break;
441 			}
442 
443 			/* Keep the parent path node, which points to
444 			the last node just located */
445 			skip_parent = true;
446 		} else {
447 			/* Release latch on the current page */
448 			ut_ad(rtr_info->tree_blocks[tree_idx]);
449 
450 			mtr_release_block_at_savepoint(
451 				mtr, rtr_info->tree_savepoints[tree_idx],
452 				rtr_info->tree_blocks[tree_idx]);
453 			rtr_info->tree_blocks[tree_idx] = NULL;
454 		}
455 
456 	} while (!rtr_info->path->empty());
457 
458 	const rec_t* rec = btr_cur_get_rec(btr_cur);
459 
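	/* If the cursor ended up on a pseudo-record (infimum/supremum),
	nothing useful is positioned, so restart the mini-transaction to
	release its latches; otherwise release the index lock if it was
	acquired in this function. */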
460 	if (page_rec_is_infimum(rec) || page_rec_is_supremum(rec)) {
461 		mtr_commit(mtr);
462 		mtr_start(mtr);
463 	} else if (!index_locked) {
464 		mtr_memo_release(mtr, dict_index_get_lock(index),
465 				 MTR_MEMO_X_LOCK);
466 	}
467 
468 	return(found);
469 }
470 
471 /*************************************************************//**
472 Find the next matching record. This function will first exhaust
473 the copied records listed in the rtr_info->matches vector before
474 moving to the next page
475 @return true if there is suitable record found, otherwise false */
476 bool
477 rtr_pcur_move_to_next(
478 /*==================*/
479 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
480 				tuple must be set so that it cannot get
481 				compared to the node ptr page number field! */
482 	page_cur_mode_t	mode,	/*!< in: cursor search mode */
483 	btr_pcur_t*	cursor,	/*!< in: persistent cursor; NOTE that the
484 				function may release the page latch */
485 	ulint		level,	/*!< in: target level */
486 	mtr_t*		mtr)	/*!< in: mtr */
487 {
488 	rtr_info_t*	rtr_info = cursor->btr_cur.rtr_info;
489 
490 	ut_a(cursor->pos_state == BTR_PCUR_IS_POSITIONED);
491 
492 	mutex_enter(&rtr_info->matches->rtr_match_mutex);
493 	/* First retrieve the next record on the current page */
494 	if (!rtr_info->matches->matched_recs->empty()) {
495 		rtr_rec_t	rec;
496 		rec = rtr_info->matches->matched_recs->back();
497 		rtr_info->matches->matched_recs->pop_back();
498 		mutex_exit(&rtr_info->matches->rtr_match_mutex);
499 
500 		cursor->btr_cur.page_cur.rec = rec.r_rec;
501 		cursor->btr_cur.page_cur.block = &rtr_info->matches->block;
502 
503 		DEBUG_SYNC_C("rtr_pcur_move_to_next_return");
504 		return(true);
505 	}
506 
507 	mutex_exit(&rtr_info->matches->rtr_match_mutex);
508 
509 	/* Fetch the next page */
510 	return(rtr_pcur_getnext_from_path(tuple, mode, &cursor->btr_cur,
511 					 level, cursor->latch_mode,
512 					 false, mtr));
513 }
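/* Usage sketch (an assumption for illustration, not code from this file):
a caller scanning an R-tree typically loops along the lines of

	while (rtr_pcur_move_to_next(tuple, PAGE_CUR_INTERSECT,
				     &pcur, 0, &mtr)) {
		const rec_t* rec = btr_pcur_get_rec(&pcur);
		... process the matching record ...
	}

consuming the records buffered in rtr_info->matches first and only then
falling back to rtr_pcur_getnext_from_path() for the next page. */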
514 
515 /*************************************************************//**
516 Check if the cursor holds a record pointing to the specified child page.
517 @return	true if it does, false otherwise */
518 static
519 bool
520 rtr_compare_cursor_rec(
521 /*===================*/
522 	dict_index_t*	index,		/*!< in: index */
523 	btr_cur_t*	cursor,		/*!< in: Cursor to check */
524 	ulint		page_no,	/*!< in: desired child page number */
525 	mem_heap_t**	heap)		/*!< in: memory heap */
526 {
527 	const rec_t*	rec;
528 	rec_offs*	offsets;
529 
530 	rec = btr_cur_get_rec(cursor);
531 
532 	offsets = rec_get_offsets(rec, index, NULL, 0, ULINT_UNDEFINED, heap);
533 
534 	return(btr_node_ptr_get_child_page_no(rec, offsets) == page_no);
535 }
536 
537 /**************************************************************//**
538 Initializes and opens a persistent cursor to an index tree. It should be
539 closed with btr_pcur_close. Mainly called by row_search_index_entry() */
540 void
541 rtr_pcur_open_low(
542 /*==============*/
543 	dict_index_t*	index,	/*!< in: index */
544 	ulint		level,	/*!< in: level in the rtree */
545 	const dtuple_t*	tuple,	/*!< in: tuple on which search done */
546 	page_cur_mode_t	mode,	/*!< in: PAGE_CUR_RTREE_LOCATE, ... */
547 	ulint		latch_mode,/*!< in: BTR_SEARCH_LEAF, ... */
548 	btr_pcur_t*	cursor, /*!< in: memory buffer for persistent cursor */
549 	const char*	file,	/*!< in: file name */
550 	unsigned	line,	/*!< in: line where called */
551 	mtr_t*		mtr)	/*!< in: mtr */
552 {
553 	btr_cur_t*	btr_cursor;
554 	ulint		n_fields;
555 	ulint		low_match;
556 	rec_t*		rec;
557 	bool		tree_latched = false;
558 	bool		for_delete = false;
559 	bool		for_undo_ins = false;
560 
561 	ut_ad(level == 0);
562 
563 	ut_ad(latch_mode & BTR_MODIFY_LEAF || latch_mode & BTR_MODIFY_TREE);
564 	ut_ad(mode == PAGE_CUR_RTREE_LOCATE);
565 
566 	/* Initialize the cursor */
567 
568 	btr_pcur_init(cursor);
569 
570 	for_delete = latch_mode & BTR_RTREE_DELETE_MARK;
571 	for_undo_ins = latch_mode & BTR_RTREE_UNDO_INS;
572 
573 	cursor->latch_mode = BTR_LATCH_MODE_WITHOUT_FLAGS(latch_mode);
574 	cursor->search_mode = mode;
575 
576 	/* Search with the tree cursor */
577 
578 	btr_cursor = btr_pcur_get_btr_cur(cursor);
579 
580 	btr_cursor->rtr_info = rtr_create_rtr_info(false, false,
581 						   btr_cursor, index);
582 
583 	/* Purge will SX-lock the tree instead of taking page locks */
584 	if (btr_cursor->thr) {
585 		btr_cursor->rtr_info->need_page_lock = true;
586 		btr_cursor->rtr_info->thr = btr_cursor->thr;
587 	}
588 
589 	btr_cur_search_to_nth_level(index, level, tuple, mode, latch_mode,
590 				    btr_cursor, 0, file, line, mtr);
591 	cursor->pos_state = BTR_PCUR_IS_POSITIONED;
592 
593 	cursor->trx_if_known = NULL;
594 
595 	low_match = btr_pcur_get_low_match(cursor);
596 
597 	rec = btr_pcur_get_rec(cursor);
598 
599 	n_fields = dtuple_get_n_fields(tuple);
600 
601 	if (latch_mode & BTR_ALREADY_S_LATCHED) {
602 		ut_ad(mtr->memo_contains(index->lock, MTR_MEMO_S_LOCK));
603 		tree_latched = true;
604 	}
605 
606 	if (latch_mode & BTR_MODIFY_TREE) {
607 		ut_ad(mtr->memo_contains_flagged(&index->lock,
608 						 MTR_MEMO_X_LOCK
609 						 | MTR_MEMO_SX_LOCK));
610 		tree_latched = true;
611 	}
612 
613 	if (page_rec_is_infimum(rec) || low_match != n_fields
614 	    || (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
615 		&& (for_delete || for_undo_ins))) {
616 
617 		if (rec_get_deleted_flag(rec, dict_table_is_comp(index->table))
618 		    && for_delete) {
619 			btr_cursor->rtr_info->fd_del = true;
620 			btr_cursor->low_match = 0;
621 		}
622 		/* Did not find a matching row in the first dive. Release
623 		the latched block, if any, before searching more pages */
624 		if (latch_mode & BTR_MODIFY_LEAF) {
625 			ulint		tree_idx = btr_cursor->tree_height - 1;
626 			rtr_info_t*	rtr_info = btr_cursor->rtr_info;
627 
628 			ut_ad(level == 0);
629 
630 			if (rtr_info->tree_blocks[tree_idx]) {
631 				mtr_release_block_at_savepoint(
632 					mtr,
633 					rtr_info->tree_savepoints[tree_idx],
634 					rtr_info->tree_blocks[tree_idx]);
635 				rtr_info->tree_blocks[tree_idx] = NULL;
636 			}
637 		}
638 
639 		bool	ret = rtr_pcur_getnext_from_path(
640 			tuple, mode, btr_cursor, level, latch_mode,
641 			tree_latched, mtr);
642 
643 		if (ret) {
644 			low_match = btr_pcur_get_low_match(cursor);
645 			ut_ad(low_match == n_fields);
646 		}
647 	}
648 }
649 
650 /* Get the rtree page father.
651 @param[in]	index		rtree index
652 @param[in]	block		child page in the index
653 @param[in]	mtr		mtr
654 @param[in]	sea_cur		search cursor, contains information
655 				about parent nodes in search
656 @param[in]	cursor		cursor on node pointer record,
657 				its page x-latched */
658 void
659 rtr_page_get_father(
660 	dict_index_t*	index,
661 	buf_block_t*	block,
662 	mtr_t*		mtr,
663 	btr_cur_t*	sea_cur,
664 	btr_cur_t*	cursor)
665 {
666 	mem_heap_t*	heap = mem_heap_create(100);
667 #ifdef UNIV_DEBUG
668 	rec_offs*	offsets;
669 
670 	offsets = rtr_page_get_father_block(
671 		NULL, heap, index, block, mtr, sea_cur, cursor);
672 
673 	ulint	page_no = btr_node_ptr_get_child_page_no(cursor->page_cur.rec,
674 							 offsets);
675 
676 	ut_ad(page_no == block->page.id().page_no());
677 #else
678 	rtr_page_get_father_block(
679 		NULL, heap, index, block, mtr, sea_cur, cursor);
680 #endif
681 
682 	mem_heap_free(heap);
683 }
684 
685 /********************************************************************//**
686 Returns the upper level node pointer to an R-Tree page. It is assumed
687 that mtr holds an x-latch on the tree. */
688 static void rtr_get_father_node(
689 	dict_index_t*	index,	/*!< in: index */
690 	ulint		level,	/*!< in: the tree level of search */
691 	const dtuple_t*	tuple,	/*!< in: data tuple; NOTE: n_fields_cmp in
692 				tuple must be set so that it cannot get
693 				compared to the node ptr page number field! */
694 	btr_cur_t*	sea_cur,/*!< in: search cursor */
695 	btr_cur_t*	btr_cur,/*!< in/out: tree cursor; the cursor page is
696 				s- or x-latched, but see also above! */
697 	ulint		page_no,/*!< Current page no */
698 	mtr_t*		mtr)	/*!< in: mtr */
699 {
700 	mem_heap_t*	heap = NULL;
701 	bool		ret = false;
702 	const rec_t*	rec;
703 	ulint		n_fields;
704 	bool		new_rtr = false;
705 
706 	/* Try to optimally locate the parent node. The level should always be
707 	less than sea_cur->tree_height unless the root is splitting */
708 	if (sea_cur && sea_cur->tree_height > level) {
709 		ut_ad(mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
710 						 | MTR_MEMO_SX_LOCK));
711 		ret = rtr_cur_restore_position(
712 			BTR_CONT_MODIFY_TREE, sea_cur, level, mtr);
713 
714 		/* Since we block shrinking of tree nodes while there are
715 		active searches on them, this optimal locating should always
716 		succeed */
717 		ut_ad(ret);
718 
719 		if (ret) {
720 			btr_pcur_t*	r_cursor = rtr_get_parent_cursor(
721 						sea_cur, level, false);
722 
723 			rec = btr_pcur_get_rec(r_cursor);
724 
725 			ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
726 			page_cur_position(rec,
727 					  btr_pcur_get_block(r_cursor),
728 					  btr_cur_get_page_cur(btr_cur));
729 			btr_cur->rtr_info = sea_cur->rtr_info;
730 			btr_cur->tree_height = sea_cur->tree_height;
731 			ut_ad(rtr_compare_cursor_rec(
732 				index, btr_cur, page_no, &heap));
733 			goto func_exit;
734 		}
735 	}
736 
737 	/* We arrive here in one of two scenarios:
738 	1) CHECK TABLE and btr_validate()
739 	2) the index root page is being raised */
740 	ut_ad(!sea_cur || sea_cur->tree_height == level);
741 
742 	if (btr_cur->rtr_info) {
743 		rtr_clean_rtr_info(btr_cur->rtr_info, true);
744 	} else {
745 		new_rtr = true;
746 	}
747 
748 	btr_cur->rtr_info = rtr_create_rtr_info(false, false, btr_cur, index);
749 
750 	if (sea_cur && sea_cur->tree_height == level) {
751 		/* root split, and search the new root */
752 		btr_cur_search_to_nth_level(
753 			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
754 			BTR_CONT_MODIFY_TREE, btr_cur, 0,
755 			__FILE__, __LINE__, mtr);
756 
757 	} else {
758 		/* btr_validate */
759 		ut_ad(level >= 1);
760 		ut_ad(!sea_cur);
761 
762 		btr_cur_search_to_nth_level(
763 			index, level, tuple, PAGE_CUR_RTREE_LOCATE,
764 			BTR_CONT_MODIFY_TREE, btr_cur, 0,
765 			__FILE__, __LINE__, mtr);
766 
767 		rec = btr_cur_get_rec(btr_cur);
768 		n_fields = dtuple_get_n_fields_cmp(tuple);
769 
770 		if (page_rec_is_infimum(rec)
771 		    || (btr_cur->low_match != n_fields)) {
772 			ret = rtr_pcur_getnext_from_path(
773 				tuple, PAGE_CUR_RTREE_LOCATE, btr_cur,
774 				level, BTR_CONT_MODIFY_TREE,
775 				true, mtr);
776 
777 			ut_ad(ret && btr_cur->low_match == n_fields);
778 		}
779 	}
780 
781 	ret = rtr_compare_cursor_rec(
782 		index, btr_cur, page_no, &heap);
783 
784 	ut_ad(ret);
785 
786 func_exit:
787 	if (heap) {
788 		mem_heap_free(heap);
789 	}
790 
791 	if (new_rtr && btr_cur->rtr_info) {
792 		rtr_clean_rtr_info(btr_cur->rtr_info, true);
793 		btr_cur->rtr_info = NULL;
794 	}
795 }
796 
797 /** Returns the upper level node pointer to an R-Tree page. It is assumed
798 that mtr holds an SX-latch or X-latch on the tree.
799 @return	rec_get_offsets() of the node pointer record */
800 static
801 rec_offs*
802 rtr_page_get_father_node_ptr(
803 	rec_offs*	offsets,/*!< in: work area for the return value */
804 	mem_heap_t*	heap,	/*!< in: memory heap to use */
805 	btr_cur_t*	sea_cur,/*!< in: search cursor */
806 	btr_cur_t*	cursor,	/*!< in: cursor pointing to user record,
807 				out: cursor on node pointer record,
808 				its page x-latched */
809 	mtr_t*		mtr)	/*!< in: mtr */
810 {
811 	dtuple_t*	tuple;
812 	rec_t*		user_rec;
813 	rec_t*		node_ptr;
814 	ulint		level;
815 	ulint		page_no;
816 	dict_index_t*	index;
817 	rtr_mbr_t	mbr;
818 
819 	page_no = btr_cur_get_block(cursor)->page.id().page_no();
820 	index = btr_cur_get_index(cursor);
821 
822 	ut_ad(srv_read_only_mode
823 	      || mtr->memo_contains_flagged(&index->lock, MTR_MEMO_X_LOCK
824 					    | MTR_MEMO_SX_LOCK));
825 
826 	ut_ad(dict_index_get_page(index) != page_no);
827 
828 	level = btr_page_get_level(btr_cur_get_page(cursor));
829 
830 	user_rec = btr_cur_get_rec(cursor);
831 	ut_a(page_rec_is_user_rec(user_rec));
832 
833 	offsets = rec_get_offsets(user_rec, index, offsets,
834 				  level ? 0 : index->n_fields,
835 				  ULINT_UNDEFINED, &heap);
836 	rtr_get_mbr_from_rec(user_rec, offsets, &mbr);
837 
838 	tuple = rtr_index_build_node_ptr(
839 		index, &mbr, user_rec, page_no, heap);
840 
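	/* If the search cursor carries no R-tree specific state, it cannot
	be used to restore a stored parent position, so drop it and let
	rtr_get_father_node() search from the root instead. */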
841 	if (sea_cur && !sea_cur->rtr_info) {
842 		sea_cur = NULL;
843 	}
844 
845 	rtr_get_father_node(index, level + 1, tuple, sea_cur, cursor,
846 			    page_no, mtr);
847 
848 	node_ptr = btr_cur_get_rec(cursor);
849 	ut_ad(!page_rec_is_comp(node_ptr)
850 	      || rec_get_status(node_ptr) == REC_STATUS_NODE_PTR);
851 	offsets = rec_get_offsets(node_ptr, index, offsets, 0,
852 				  ULINT_UNDEFINED, &heap);
853 
854 	ulint	child_page = btr_node_ptr_get_child_page_no(node_ptr, offsets);
855 
856 	if (child_page != page_no) {
857 		const rec_t*	print_rec;
858 
859 		ib::fatal	error;
860 
861 		error << "Corruption of index " << index->name
862 			<< " of table " << index->table->name
863 			<< " parent page " << page_no
864 			<< " child page " << child_page;
865 
866 		print_rec = page_rec_get_next(
867 			page_get_infimum_rec(page_align(user_rec)));
868 		offsets = rec_get_offsets(print_rec, index, offsets,
869 					  page_rec_is_leaf(user_rec)
870 					  ? index->n_fields : 0,
871 					  ULINT_UNDEFINED, &heap);
872 		error << "; child ";
873 		rec_print(error.m_oss, print_rec,
874 			  rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
875 			  offsets);
876 		offsets = rec_get_offsets(node_ptr, index, offsets, 0,
877 					  ULINT_UNDEFINED, &heap);
878 		error << "; parent ";
879 		rec_print(error.m_oss, print_rec,
880 			  rec_get_info_bits(print_rec, rec_offs_comp(offsets)),
881 			  offsets);
882 
883 		error << ". You should dump + drop + reimport the table to"
884 			" fix the corruption. If the crash happens at"
885 			" database startup, see "
886 			"https://mariadb.com/kb/en/library/innodb-recovery-modes/"
887 			" about forcing"
888 			" recovery. Then dump + drop + reimport.";
889 	}
890 
891 	return(offsets);
892 }
893 
894 /************************************************************//**
895 Returns the father block to a page. It is assumed that mtr holds
896 an X or SX latch on the tree.
897 @return rec_get_offsets() of the node pointer record */
898 rec_offs*
899 rtr_page_get_father_block(
900 /*======================*/
901 	rec_offs*	offsets,/*!< in: work area for the return value */
902 	mem_heap_t*	heap,	/*!< in: memory heap to use */
903 	dict_index_t*	index,	/*!< in: b-tree index */
904 	buf_block_t*	block,	/*!< in: child page in the index */
905 	mtr_t*		mtr,	/*!< in: mtr */
906 	btr_cur_t*	sea_cur,/*!< in: search cursor, contains information
907 				about parent nodes in search */
908 	btr_cur_t*	cursor)	/*!< out: cursor on node pointer record,
909 				its page x-latched */
910 {
911 	rec_t*  rec = page_rec_get_next(
912 		page_get_infimum_rec(buf_block_get_frame(block)));
913 	btr_cur_position(index, rec, block, cursor);
914 
915 	return(rtr_page_get_father_node_ptr(offsets, heap, sea_cur,
916 					    cursor, mtr));
917 }
918 
919 /*******************************************************************//**
920 Create an R-Tree search info structure */
921 rtr_info_t*
922 rtr_create_rtr_info(
923 /******************/
924 	bool		need_prdt,	/*!< in: Whether predicate lock
925 					is needed */
926 	bool		init_matches,	/*!< in: Whether to initiate the
927 					"matches" structure for collecting
928 					matched leaf records */
929 	btr_cur_t*	cursor,		/*!< in: tree search cursor */
930 	dict_index_t*	index)		/*!< in: index struct */
931 {
932 	rtr_info_t*	rtr_info;
933 
934 	index = index ? index : cursor->index;
935 	ut_ad(index);
936 
937 	rtr_info = static_cast<rtr_info_t*>(ut_zalloc_nokey(sizeof(*rtr_info)));
938 
939 	rtr_info->allocated = true;
940 	rtr_info->cursor = cursor;
941 	rtr_info->index = index;
942 
943 	if (init_matches) {
944 		rtr_info->heap = mem_heap_create(sizeof(*(rtr_info->matches)));
945 		rtr_info->matches = static_cast<matched_rec_t*>(
946 					mem_heap_zalloc(
947 						rtr_info->heap,
948 						sizeof(*rtr_info->matches)));
949 
950 		rtr_info->matches->matched_recs
951 			= UT_NEW_NOKEY(rtr_rec_vector());
952 
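		/* rec_buf is sized so that a page-aligned address exists
		inside it; bufp below picks that address, and rtr_init_match()
		later points the shadow block's frame at bufp, so the copied
		records can be addressed as if they were on a real page. */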
953 		rtr_info->matches->bufp = page_align(rtr_info->matches->rec_buf
954 						     + UNIV_PAGE_SIZE_MAX + 1);
955 		mutex_create(LATCH_ID_RTR_MATCH_MUTEX,
956 			     &rtr_info->matches->rtr_match_mutex);
957 		rw_lock_create(PFS_NOT_INSTRUMENTED,
958 			       &(rtr_info->matches->block.lock),
959 			      SYNC_LEVEL_VARYING);
960 	}
961 
962 	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
963 	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
964 	rtr_info->need_prdt_lock = need_prdt;
965 	mutex_create(LATCH_ID_RTR_PATH_MUTEX,
966 		     &rtr_info->rtr_path_mutex);
967 
968 	mutex_enter(&index->rtr_track->rtr_active_mutex);
969 	index->rtr_track->rtr_active.push_front(rtr_info);
970 	mutex_exit(&index->rtr_track->rtr_active_mutex);
971 	return(rtr_info);
972 }
973 
974 /*******************************************************************//**
975 Update a btr_cur_t with rtr_info */
976 void
977 rtr_info_update_btr(
978 /******************/
979 	btr_cur_t*	cursor,		/*!< in/out: tree cursor */
980 	rtr_info_t*	rtr_info)	/*!< in: rtr_info to set to the
981 					cursor */
982 {
983 	ut_ad(rtr_info);
984 
985 	cursor->rtr_info = rtr_info;
986 }
987 
988 /*******************************************************************//**
989 Initialize an R-Tree search structure */
990 void
991 rtr_init_rtr_info(
992 /****************/
993 	rtr_info_t*	rtr_info,	/*!< in: rtr_info to set to the
994 					cursor */
995 	bool		need_prdt,	/*!< in: Whether predicate lock is
996 					needed */
997 	btr_cur_t*	cursor,		/*!< in: tree search cursor */
998 	dict_index_t*	index,		/*!< in: index structure */
999 	bool		reinit)		/*!< in: Whether this is a reinit */
1000 {
1001 	ut_ad(rtr_info);
1002 
1003 	if (!reinit) {
1004 		/* Reset all members. */
1005 		rtr_info->path = NULL;
1006 		rtr_info->parent_path = NULL;
1007 		rtr_info->matches = NULL;
1008 
1009 		mutex_create(LATCH_ID_RTR_PATH_MUTEX,
1010 			     &rtr_info->rtr_path_mutex);
1011 
1012 		memset(rtr_info->tree_blocks, 0x0,
1013 		       sizeof(rtr_info->tree_blocks));
1014 		memset(rtr_info->tree_savepoints, 0x0,
1015 		       sizeof(rtr_info->tree_savepoints));
1016 		rtr_info->mbr.xmin = 0.0;
1017 		rtr_info->mbr.xmax = 0.0;
1018 		rtr_info->mbr.ymin = 0.0;
1019 		rtr_info->mbr.ymax = 0.0;
1020 		rtr_info->thr = NULL;
1021 		rtr_info->heap = NULL;
1022 		rtr_info->cursor = NULL;
1023 		rtr_info->index = NULL;
1024 		rtr_info->need_prdt_lock = false;
1025 		rtr_info->need_page_lock = false;
1026 		rtr_info->allocated = false;
1027 		rtr_info->mbr_adj = false;
1028 		rtr_info->fd_del = false;
1029 		rtr_info->search_tuple = NULL;
1030 		rtr_info->search_mode = PAGE_CUR_UNSUPP;
1031 	}
1032 
1033 	ut_ad(!rtr_info->matches || rtr_info->matches->matched_recs->empty());
1034 
1035 	rtr_info->path = UT_NEW_NOKEY(rtr_node_path_t());
1036 	rtr_info->parent_path = UT_NEW_NOKEY(rtr_node_path_t());
1037 	rtr_info->need_prdt_lock = need_prdt;
1038 	rtr_info->cursor = cursor;
1039 	rtr_info->index = index;
1040 
1041 	mutex_enter(&index->rtr_track->rtr_active_mutex);
1042 	index->rtr_track->rtr_active.push_front(rtr_info);
1043 	mutex_exit(&index->rtr_track->rtr_active_mutex);
1044 }
1045 
1046 /**************************************************************//**
1047 Clean up R-Tree search structure */
1048 void
1049 rtr_clean_rtr_info(
1050 /*===============*/
1051 	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
1052 	bool		free_all)	/*!< in: need to free rtr_info itself */
1053 {
1054 	dict_index_t*	index;
1055 	bool		initialized = false;
1056 
1057 	if (!rtr_info) {
1058 		return;
1059 	}
1060 
1061 	index = rtr_info->index;
1062 
1063 	if (index) {
1064 		mutex_enter(&index->rtr_track->rtr_active_mutex);
1065 	}
1066 
1067 	while (rtr_info->parent_path && !rtr_info->parent_path->empty()) {
1068 		btr_pcur_t*	cur = rtr_info->parent_path->back().cursor;
1069 		rtr_info->parent_path->pop_back();
1070 
1071 		if (cur) {
1072 			btr_pcur_close(cur);
1073 			ut_free(cur);
1074 		}
1075 	}
1076 
1077 	UT_DELETE(rtr_info->parent_path);
1078 	rtr_info->parent_path = NULL;
1079 
1080 	if (rtr_info->path != NULL) {
1081 		UT_DELETE(rtr_info->path);
1082 		rtr_info->path = NULL;
1083 		initialized = true;
1084 	}
1085 
1086 	if (rtr_info->matches) {
1087 		rtr_info->matches->used = false;
1088 		rtr_info->matches->locked = false;
1089 		rtr_info->matches->valid = false;
1090 		rtr_info->matches->matched_recs->clear();
1091 	}
1092 
1093 	if (index) {
1094 		index->rtr_track->rtr_active.remove(rtr_info);
1095 		mutex_exit(&index->rtr_track->rtr_active_mutex);
1096 	}
1097 
1098 	if (free_all) {
1099 		if (rtr_info->matches) {
1100 			if (rtr_info->matches->matched_recs != NULL) {
1101 				UT_DELETE(rtr_info->matches->matched_recs);
1102 			}
1103 
1104 			rw_lock_free(&(rtr_info->matches->block.lock));
1105 
1106 			mutex_destroy(&rtr_info->matches->rtr_match_mutex);
1107 		}
1108 
1109 		if (rtr_info->heap) {
1110 			mem_heap_free(rtr_info->heap);
1111 		}
1112 
1113 		if (initialized) {
1114 			mutex_destroy(&rtr_info->rtr_path_mutex);
1115 		}
1116 
1117 		if (rtr_info->allocated) {
1118 			ut_free(rtr_info);
1119 		}
1120 	}
1121 }
1122 
1123 /**************************************************************//**
1124 Rebuild the "path" to exclude the page number being removed */
1125 static
1126 void
1127 rtr_rebuild_path(
1128 /*=============*/
1129 	rtr_info_t*	rtr_info,	/*!< in: RTree search info */
1130 	ulint		page_no)	/*!< in: page number to be excluded */
1131 {
1132 	rtr_node_path_t*		new_path
1133 		= UT_NEW_NOKEY(rtr_node_path_t());
1134 
1135 	rtr_node_path_t::iterator	rit;
1136 #ifdef UNIV_DEBUG
1137 	ulint	before_size = rtr_info->path->size();
1138 #endif /* UNIV_DEBUG */
1139 
1140 	for (rit = rtr_info->path->begin();
1141 	     rit != rtr_info->path->end(); ++rit) {
1142 		node_visit_t	next_rec = *rit;
1143 
1144 		if (next_rec.page_no == page_no) {
1145 			continue;
1146 		}
1147 
1148 		new_path->push_back(next_rec);
1149 #ifdef UNIV_DEBUG
1150 		node_visit_t	rec = new_path->back();
1151 		ut_ad(rec.level < rtr_info->cursor->tree_height
1152 		      && rec.page_no > 0);
1153 #endif /* UNIV_DEBUG */
1154 	}
1155 
1156 	UT_DELETE(rtr_info->path);
1157 
1158 	ut_ad(new_path->size() == before_size - 1);
1159 
1160 	rtr_info->path = new_path;
1161 
1162 	if (!rtr_info->parent_path->empty()) {
1163 		rtr_node_path_t*	new_parent_path = UT_NEW_NOKEY(
1164 			rtr_node_path_t());
1165 
1166 		for (rit = rtr_info->parent_path->begin();
1167 		     rit != rtr_info->parent_path->end(); ++rit) {
1168 			node_visit_t	next_rec = *rit;
1169 
1170 			if (next_rec.child_no == page_no) {
1171 				btr_pcur_t*	cur = next_rec.cursor;
1172 
1173 				if (cur) {
1174 					btr_pcur_close(cur);
1175 					ut_free(cur);
1176 				}
1177 
1178 				continue;
1179 			}
1180 
1181 			new_parent_path->push_back(next_rec);
1182 		}
1183 		UT_DELETE(rtr_info->parent_path);
1184 		rtr_info->parent_path = new_parent_path;
1185 	}
1186 
1187 }
1188 
1189 /**************************************************************//**
1190 Check whether a page being discarded is in anyone's search path */
1191 void
1192 rtr_check_discard_page(
1193 /*===================*/
1194 	dict_index_t*	index,	/*!< in: index */
1195 	btr_cur_t*	cursor, /*!< in: cursor on the page to discard: not on
1196 				the root page */
1197 	buf_block_t*	block)	/*!< in: block of page to be discarded */
1198 {
1199 	const ulint pageno = block->page.id().page_no();
1200 
1201 	mutex_enter(&index->rtr_track->rtr_active_mutex);
1202 
1203 	for (const auto& rtr_info : index->rtr_track->rtr_active) {
1204 		if (cursor && rtr_info == cursor->rtr_info) {
1205 			continue;
1206 		}
1207 
1208 		mutex_enter(&rtr_info->rtr_path_mutex);
1209 		for (const node_visit_t& node : *rtr_info->path) {
1210 			if (node.page_no == pageno) {
1211 				rtr_rebuild_path(rtr_info, pageno);
1212 				break;
1213 			}
1214 		}
1215 		mutex_exit(&rtr_info->rtr_path_mutex);
1216 
1217 		if (rtr_info->matches) {
1218 			mutex_enter(&rtr_info->matches->rtr_match_mutex);
1219 
1220 			if ((&rtr_info->matches->block)->page.id().page_no()
1221 			     == pageno) {
1222 				if (!rtr_info->matches->matched_recs->empty()) {
1223 					rtr_info->matches->matched_recs->clear();
1224 				}
1225 				ut_ad(rtr_info->matches->matched_recs->empty());
1226 				rtr_info->matches->valid = false;
1227 			}
1228 
1229 			mutex_exit(&rtr_info->matches->rtr_match_mutex);
1230 		}
1231 	}
1232 
1233 	mutex_exit(&index->rtr_track->rtr_active_mutex);
1234 
1235 	lock_mutex_enter();
1236 	lock_prdt_page_free_from_discard(block, &lock_sys.prdt_hash);
1237 	lock_prdt_page_free_from_discard(block, &lock_sys.prdt_page_hash);
1238 	lock_mutex_exit();
1239 }
1240 
1241 /** Structure acts as a functor to get optimistic access to the page.
1242 It returns true if it successfully gets the page. */
1243 struct optimistic_get
1244 {
1245   btr_pcur_t *const r_cursor;
1246   mtr_t *const mtr;
1247 
1248   optimistic_get(btr_pcur_t *r_cursor,mtr_t *mtr)
1249   :r_cursor(r_cursor), mtr(mtr) {}
1250 
1251   bool operator()(buf_block_t *hint) const
1252   {
1253     return hint && buf_page_optimistic_get(
1254        RW_X_LATCH, hint, r_cursor->modify_clock, __FILE__,
1255        __LINE__, mtr);
1256   }
1257 };
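/* A sketch of the fast path: buf_page_optimistic_get() succeeds only if the
hinted block is still in the buffer pool and its modify_clock matches the
value saved in the cursor, i.e. the page has not been modified since the
position was stored. Only then can the stored position be reused directly. */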
1258 
1259 /** Restore the stored position of a persistent cursor, buffer-fixing the page */
1260 static
1261 bool
1262 rtr_cur_restore_position(
1263 	ulint		latch_mode,	/*!< in: BTR_SEARCH_LEAF, ... */
1264 	btr_cur_t*	btr_cur,	/*!< in: detached persistent cursor */
1265 	ulint		level,		/*!< in: index level */
1266 	mtr_t*		mtr)		/*!< in: mtr */
1267 {
1268 	dict_index_t*	index;
1269 	mem_heap_t*	heap;
1270 	btr_pcur_t*	r_cursor = rtr_get_parent_cursor(btr_cur, level, false);
1271 	dtuple_t*	tuple;
1272 	bool		ret = false;
1273 
1274 	ut_ad(mtr);
1275 	ut_ad(r_cursor);
1276 	ut_ad(mtr->is_active());
1277 
1278 	index = btr_cur_get_index(btr_cur);
1279 
1280 	if (r_cursor->rel_pos == BTR_PCUR_AFTER_LAST_IN_TREE
1281 	    || r_cursor->rel_pos == BTR_PCUR_BEFORE_FIRST_IN_TREE) {
1282 		return(false);
1283 	}
1284 
1285 	DBUG_EXECUTE_IF(
1286 		"rtr_pessimistic_position",
1287 		r_cursor->modify_clock = 100;
1288 	);
1289 
1290 	ut_ad(latch_mode == BTR_CONT_MODIFY_TREE);
1291 
1292 	if (r_cursor->block_when_stored.run_with_hint(
1293 		optimistic_get(r_cursor, mtr))) {
1294 		ut_ad(r_cursor->pos_state == BTR_PCUR_IS_POSITIONED);
1295 
1296 		ut_ad(r_cursor->rel_pos == BTR_PCUR_ON);
1297 #ifdef UNIV_DEBUG
1298 		do {
1299 			const rec_t*	rec;
1300 			const rec_offs*	offsets1;
1301 			const rec_offs*	offsets2;
1302 			ulint		comp;
1303 
1304 			rec = btr_pcur_get_rec(r_cursor);
1305 
1306 			heap = mem_heap_create(256);
1307 			offsets1 = rec_get_offsets(
1308 				r_cursor->old_rec, index, NULL,
1309 				level ? 0 : r_cursor->old_n_fields,
1310 				r_cursor->old_n_fields, &heap);
1311 			offsets2 = rec_get_offsets(
1312 				rec, index, NULL,
1313 				level ? 0 : r_cursor->old_n_fields,
1314 				r_cursor->old_n_fields, &heap);
1315 
1316 			comp = rec_offs_comp(offsets1);
1317 
1318 			if (rec_get_info_bits(r_cursor->old_rec, comp)
1319 			    & REC_INFO_MIN_REC_FLAG) {
1320 				ut_ad(rec_get_info_bits(rec, comp)
1321 					& REC_INFO_MIN_REC_FLAG);
1322 			} else {
1323 
1324 				ut_ad(!cmp_rec_rec(r_cursor->old_rec,
1325 						   rec, offsets1, offsets2,
1326 						   index));
1327 			}
1328 
1329 			mem_heap_free(heap);
1330 		} while (0);
1331 #endif /* UNIV_DEBUG */
1332 
1333 		return(true);
1334 	}
1335 
1336 	/* The page has changed. For an R-tree, the page cannot be shrunk
1337 	away, so we search the page and its right siblings */
1338 	buf_block_t*	block;
1339 	node_seq_t	page_ssn;
1340 	const page_t*	page;
1341 	page_cur_t*	page_cursor;
1342 	node_visit_t*	node = rtr_get_parent_node(btr_cur, level, false);
1343 	node_seq_t	path_ssn = node->seq_no;
1344 	const unsigned	zip_size = index->table->space->zip_size();
1345 	uint32_t	page_no = node->page_no;
1346 
1347 	heap = mem_heap_create(256);
1348 
1349 	tuple = dict_index_build_data_tuple(r_cursor->old_rec, index, !level,
1350 					    r_cursor->old_n_fields, heap);
1351 
1352 	page_cursor = btr_pcur_get_page_cur(r_cursor);
1353 	ut_ad(r_cursor == node->cursor);
1354 
1355 search_again:
1356 	dberr_t err = DB_SUCCESS;
1357 
1358 	block = buf_page_get_gen(
1359 		page_id_t(index->table->space_id, page_no),
1360 		zip_size, RW_X_LATCH, NULL,
1361 		BUF_GET, __FILE__, __LINE__, mtr, &err);
1362 
1363 	ut_ad(block);
1364 
1365 	/* Get the page SSN */
1366 	page = buf_block_get_frame(block);
1367 	page_ssn = page_get_ssn_id(page);
1368 
1369 	ulint low_match = page_cur_search(
1370 				block, index, tuple, PAGE_CUR_LE, page_cursor);
1371 
1372 	if (low_match == r_cursor->old_n_fields) {
1373 		const rec_t*	rec;
1374 		const rec_offs*	offsets1;
1375 		const rec_offs*	offsets2;
1376 		ulint		comp;
1377 
1378 		rec = btr_pcur_get_rec(r_cursor);
1379 
1380 		offsets1 = rec_get_offsets(r_cursor->old_rec, index, NULL,
1381 					   level ? 0 : r_cursor->old_n_fields,
1382 					   r_cursor->old_n_fields, &heap);
1383 		offsets2 = rec_get_offsets(rec, index, NULL,
1384 					   level ? 0 : r_cursor->old_n_fields,
1385 					   r_cursor->old_n_fields, &heap);
1386 
1387 		comp = rec_offs_comp(offsets1);
1388 
1389 		if ((rec_get_info_bits(r_cursor->old_rec, comp)
1390 		     & REC_INFO_MIN_REC_FLAG)
1391 		    && (rec_get_info_bits(rec, comp) & REC_INFO_MIN_REC_FLAG)) {
1392 			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1393 			ret = true;
1394 		} else if (!cmp_rec_rec(r_cursor->old_rec, rec, offsets1, offsets2,
1395 				 index)) {
1396 			r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1397 			ret = true;
1398 		}
1399 	}
1400 
1401 	/* Check the page SSN to see if the page has been split; if so,
1402 	search the page to the right */
1403 	if (!ret && page_ssn > path_ssn) {
1404 		page_no = btr_page_get_next(page);
1405 		goto search_again;
1406 	}
1407 
1408 	mem_heap_free(heap);
1409 
1410 	return(ret);
1411 }
1412 
1413 /****************************************************************//**
1414 Copy the leaf level R-tree record, and push it to matched_rec in rtr_info */
1415 static
1416 void
1417 rtr_leaf_push_match_rec(
1418 /*====================*/
1419 	const rec_t*	rec,		/*!< in: record to copy */
1420 	rtr_info_t*	rtr_info,	/*!< in/out: search stack */
1421 	rec_offs*	offsets,	/*!< in: offsets */
1422 	bool		is_comp)	/*!< in: is compact format */
1423 {
1424 	byte*		buf;
1425 	matched_rec_t*	match_rec = rtr_info->matches;
1426 	rec_t*		copy;
1427 	ulint		data_len;
1428 	rtr_rec_t	rtr_rec;
1429 
1430 	buf = match_rec->block.frame + match_rec->used;
1431 	ut_ad(page_rec_is_leaf(rec));
1432 
1433 	copy = rec_copy(buf, rec, offsets);
1434 
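	/* Point the copy's next-record offset at the supremum so that each
	buffered copy looks like the last user record on the shadow page and
	ordinary record iteration stops right after it. */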
1435 	if (is_comp) {
1436 		rec_set_next_offs_new(copy, PAGE_NEW_SUPREMUM);
1437 	} else {
1438 		rec_set_next_offs_old(copy, PAGE_OLD_SUPREMUM);
1439 	}
1440 
1441 	rtr_rec.r_rec = copy;
1442 	rtr_rec.locked = false;
1443 
1444 	match_rec->matched_recs->push_back(rtr_rec);
1445 	match_rec->valid = true;
1446 
1447 	data_len = rec_offs_data_size(offsets) + rec_offs_extra_size(offsets);
1448 	match_rec->used += data_len;
1449 
1450 	ut_ad(match_rec->used < srv_page_size);
1451 }
1452 
1453 /**************************************************************//**
1454 Store the parent path cursor
1455 @return number of cursors stored */
1456 ulint
1457 rtr_store_parent_path(
1458 /*==================*/
1459 	const buf_block_t*	block,	/*!< in: block of the page */
1460 	btr_cur_t*		btr_cur,/*!< in/out: persistent cursor */
1461 	ulint			latch_mode,
1462 					/*!< in: latch_mode */
1463 	ulint			level,	/*!< in: index level */
1464 	mtr_t*			mtr)	/*!< in: mtr */
1465 {
1466 	ulint	num = btr_cur->rtr_info->parent_path->size();
1467 	ulint	num_stored = 0;
1468 
1469 	while (num >= 1) {
1470 		node_visit_t*	node = &(*btr_cur->rtr_info->parent_path)[
1471 					num - 1];
1472 		btr_pcur_t*	r_cursor = node->cursor;
1473 		buf_block_t*	cur_block;
1474 
1475 		if (node->level > level) {
1476 			break;
1477 		}
1478 
1479 		r_cursor->pos_state = BTR_PCUR_IS_POSITIONED;
1480 		r_cursor->latch_mode = latch_mode;
1481 
1482 		cur_block = btr_pcur_get_block(r_cursor);
1483 
1484 		if (cur_block == block) {
1485 			btr_pcur_store_position(r_cursor, mtr);
1486 			num_stored++;
1487 		} else {
1488 			break;
1489 		}
1490 
1491 		num--;
1492 	}
1493 
1494 	return(num_stored);
1495 }
1496 /**************************************************************//**
1497 Push a non-leaf index node to the search path for insertion */
1498 static
1499 void
1500 rtr_non_leaf_insert_stack_push(
1501 /*===========================*/
1502 	dict_index_t*		index,	/*!< in: index descriptor */
1503 	rtr_node_path_t*	path,	/*!< in/out: search path */
1504 	ulint			level,	/*!< in: index page level */
1505 	uint32_t		child_no,/*!< in: child page no */
1506 	const buf_block_t*	block,	/*!< in: block of the page */
1507 	const rec_t*		rec,	/*!< in: positioned record */
1508 	double			mbr_inc)/*!< in: MBR needs to be enlarged */
1509 {
1510 	node_seq_t	new_seq;
1511 	btr_pcur_t*	my_cursor;
1512 
1513 	my_cursor = static_cast<btr_pcur_t*>(
1514 		ut_malloc_nokey(sizeof(*my_cursor)));
1515 
1516 	btr_pcur_init(my_cursor);
1517 
1518 	page_cur_position(rec, block, btr_pcur_get_page_cur(my_cursor));
1519 
1520 	(btr_pcur_get_btr_cur(my_cursor))->index = index;
1521 
1522 	new_seq = rtr_get_current_ssn_id(index);
1523 	rtr_non_leaf_stack_push(path, block->page.id().page_no(),
1524 				new_seq, level, child_no, my_cursor, mbr_inc);
1525 }
1526 
1527 /** Copy a buf_block_t, except "block->lock".
1528 @param[in,out]	matches	copy to match->block
1529 @param[in]	block	block to copy */
1530 static
1531 void
1532 rtr_copy_buf(
1533 	matched_rec_t*		matches,
1534 	const buf_block_t*	block)
1535 {
1536 	/* Copy all members of "block" to "matches->block" except "lock".
1537 	We skip "lock" because it is not used
1538 	from the dummy buf_block_t we create here and because memcpy()ing
1539 	it generates (valid) compiler warnings that the vtable pointer
1540 	will be copied. */
1541 	new (&matches->block.page) buf_page_t(block->page);
1542 	matches->block.frame = block->frame;
1543 	matches->block.unzip_LRU = block->unzip_LRU;
1544 
1545 	ut_d(matches->block.in_unzip_LRU_list = block->in_unzip_LRU_list);
1546 	ut_d(matches->block.in_withdraw_list = block->in_withdraw_list);
1547 
1548 	/* Skip buf_block_t::lock */
1549 	matches->block.modify_clock = block->modify_clock;
1550 #ifdef BTR_CUR_HASH_ADAPT
1551 	matches->block.n_hash_helps = block->n_hash_helps;
1552 	matches->block.n_fields = block->n_fields;
1553 	matches->block.left_side = block->left_side;
1554 #if defined UNIV_AHI_DEBUG || defined UNIV_DEBUG
1555 	matches->block.n_pointers = 0;
1556 #endif /* UNIV_AHI_DEBUG || UNIV_DEBUG */
1557 	matches->block.curr_n_fields = block->curr_n_fields;
1558 	matches->block.curr_left_side = block->curr_left_side;
1559 	matches->block.index = block->index;
1560 #endif /* BTR_CUR_HASH_ADAPT */
1561 	ut_d(matches->block.debug_latch = NULL);
1562 }
1563 
1564 /****************************************************************//**
1565 Generate a shadow copy of the page block header to save the
1566 matched records */
1567 static
1568 void
1569 rtr_init_match(
1570 /*===========*/
1571 	matched_rec_t*		matches,/*!< in/out: match to initialize */
1572 	const buf_block_t*	block,	/*!< in: buffer block */
1573 	const page_t*		page)	/*!< in: buffer page */
1574 {
1575 	ut_ad(matches->matched_recs->empty());
1576 	matches->locked = false;
1577 	rtr_copy_buf(matches, block);
1578 	matches->block.frame = matches->bufp;
1579 	matches->valid = false;
1580 	/* We have to copy PAGE_W*_SUPREMUM_END bytes so that we can
1581 	use infimum/supremum of this page as normal btr page for search. */
1582 	memcpy(matches->block.frame, page, page_is_comp(page)
1583 						? PAGE_NEW_SUPREMUM_END
1584 						: PAGE_OLD_SUPREMUM_END);
1585 	matches->used = page_is_comp(page)
1586 				? PAGE_NEW_SUPREMUM_END
1587 				: PAGE_OLD_SUPREMUM_END;
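	/* After this, matches->block behaves like a minimal empty page: only
	the page header and the infimum/supremum records have been copied,
	and rtr_leaf_push_match_rec() appends copies of matching leaf records
	behind them, bumping matches->used accordingly. */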
1588 #ifdef RTR_SEARCH_DIAGNOSTIC
1589 	ulint pageno = page_get_page_no(page);
1590 	fprintf(stderr, "INNODB_RTR: Searching leaf page %d\n",
1591 		static_cast<int>(pageno));
1592 #endif /* RTR_SEARCH_DIAGNOSTIC */
1593 }
1594 
1595 /****************************************************************//**
1596 Get the bounding box content from an index record */
1597 void
1598 rtr_get_mbr_from_rec(
1599 /*=================*/
1600 	const rec_t*	rec,	/*!< in: index record */
1601 	const rec_offs*	offsets,/*!< in: offsets array */
1602 	rtr_mbr_t*	mbr)	/*!< out: MBR */
1603 {
1604 	ulint		rec_f_len;
1605 	const byte*	data;
1606 
1607 	data = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1608 
1609 	rtr_read_mbr(data, mbr);
1610 }
1611 
1612 /****************************************************************//**
1613 Get the bounding box content from a MBR data record */
1614 void
1615 rtr_get_mbr_from_tuple(
1616 /*===================*/
1617 	const dtuple_t* dtuple, /*!< in: data tuple */
1618 	rtr_mbr*	mbr)	/*!< out: mbr to fill */
1619 {
1620 	const dfield_t* dtuple_field;
1621         ulint           dtuple_f_len;
1622 
1623 	dtuple_field = dtuple_get_nth_field(dtuple, 0);
1624 	dtuple_f_len = dfield_get_len(dtuple_field);
1625 	ut_a(dtuple_f_len >= 4 * sizeof(double));
1626 
1627 	rtr_read_mbr(static_cast<const byte*>(dfield_get_data(dtuple_field)),
1628 		     mbr);
1629 }
1630 
1631 /** Compare minimum bounding rectangles.
1632 @return	1, 0, or -1 if mode == PAGE_CUR_MBR_EQUAL. For the other compare
1633 modes, return 1 or 0, depending on whether a and b satisfy the
1634 relationship (CONTAINS, WITHIN etc.) */
1635 static int cmp_gis_field(page_cur_mode_t mode, const void *a, const void *b)
1636 {
1637   return mode == PAGE_CUR_MBR_EQUAL
1638     ? cmp_geometry_field(a, b)
1639     : rtree_key_cmp(mode, a, b);
1640 }
1641 
1642 /** Compare a GIS data tuple to a physical record in rtree non-leaf node.
1643 We need to check the page number field, since we don't store pk field in
1644 rtree non-leaf node.
1645 @param[in]	dtuple		data tuple
1646 @param[in]	rec		R-tree record
1647 @return whether dtuple is less than rec */
1648 static bool
1649 cmp_dtuple_rec_with_gis_internal(const dtuple_t* dtuple, const rec_t* rec)
1650 {
1651   const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
1652   ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN);
1653 
1654   if (cmp_gis_field(PAGE_CUR_WITHIN, dfield_get_data(dtuple_field), rec))
1655     return true;
1656 
1657   dtuple_field= dtuple_get_nth_field(dtuple, 1);
1658   ut_ad(dfield_get_len(dtuple_field) == 4); /* child page number */
1659   ut_ad(dtuple_field->type.mtype == DATA_SYS_CHILD);
1660   ut_ad(!(dtuple_field->type.prtype & ~DATA_NOT_NULL));
1661 
1662   return memcmp(dtuple_field->data, rec + DATA_MBR_LEN, 4) != 0;
1663 }
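/* A non-leaf R-tree record stores the MBR (DATA_MBR_LEN bytes) followed by
the 4-byte child page number, which is why the page number above is compared
with memcmp() at offset DATA_MBR_LEN. */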
1664 
1665 #ifndef UNIV_DEBUG
1666 static
1667 #endif
1668 /** Compare a GIS data tuple to a physical record.
1669 @param[in] dtuple data tuple
1670 @param[in] rec R-tree record
1671 @param[in] mode compare mode
1672 @retval negative if dtuple is less than rec */
1673 int cmp_dtuple_rec_with_gis(const dtuple_t *dtuple, const rec_t *rec,
1674                             page_cur_mode_t mode)
1675 {
1676   const dfield_t *dtuple_field= dtuple_get_nth_field(dtuple, 0);
1677   /* FIXME: TABLE_SHARE::init_from_binary_frm_image() is adding
1678   field->key_part_length_bytes() to the key length */
1679   ut_ad(dfield_get_len(dtuple_field) == DATA_MBR_LEN ||
1680         dfield_get_len(dtuple_field) == DATA_MBR_LEN + 2);
1681 
1682   return cmp_gis_field(mode, dfield_get_data(dtuple_field), rec);
1683 }
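
/* A hedged usage sketch for cmp_dtuple_rec_with_gis(): the caller supplies a
search tuple whose first field is a packed MBR, plus one of the R-tree compare
modes; a return value of 0 means the relationship holds (this is how
rtr_cur_search_with_match() below detects a match).  Variable names here are
hypothetical:

	if (cmp_dtuple_rec_with_gis(search_tuple, rec, PAGE_CUR_WITHIN) == 0) {
		// the search MBR lies within the MBR stored in rec
	}
*/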
1684 
1685 /****************************************************************//**
1686 Searches for the right position in an R-tree for a page cursor. */
1687 bool
1688 rtr_cur_search_with_match(
1689 /*======================*/
1690 	const buf_block_t*	block,	/*!< in: buffer block */
1691 	dict_index_t*		index,	/*!< in: index descriptor */
1692 	const dtuple_t*		tuple,	/*!< in: data tuple */
1693 	page_cur_mode_t		mode,	/*!< in: PAGE_CUR_RTREE_INSERT,
1694 					PAGE_CUR_RTREE_LOCATE etc. */
1695 	page_cur_t*		cursor,	/*!< in/out: page cursor */
1696 	rtr_info_t*		rtr_info)/*!< in/out: search stack */
1697 {
1698 	bool		found = false;
1699 	const page_t*	page;
1700 	const rec_t*	rec;
1701 	const rec_t*	last_rec;
1702 	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
1703 	rec_offs*	offsets		= offsets_;
1704 	mem_heap_t*	heap = NULL;
1705 	int		cmp = 1;
1706 	double		least_inc = DBL_MAX;
1707 	const rec_t*	best_rec;
1708 	const rec_t*	last_match_rec = NULL;
1709 	bool		match_init = false;
1710 	page_cur_mode_t	orig_mode = mode;
1711 	const rec_t*	first_rec = NULL;
1712 
1713 	rec_offs_init(offsets_);
1714 
1715 	ut_ad(RTREE_SEARCH_MODE(mode));
1716 
1717 	ut_ad(dict_index_is_spatial(index));
1718 
1719 	page = buf_block_get_frame(block);
1720 
1721 	const ulint level = btr_page_get_level(page);
1722 	const ulint n_core = level ? 0 : index->n_fields;
1723 
1724 	if (mode == PAGE_CUR_RTREE_LOCATE) {
1725 		ut_ad(level != 0);
1726 		mode = PAGE_CUR_WITHIN;
1727 	}
1728 
1729 	rec = page_dir_slot_get_rec(page_dir_get_nth_slot(page, 0));
1730 
1731 	last_rec = rec;
1732 	best_rec = rec;
1733 
1734 	if (page_rec_is_infimum(rec)) {
1735 		rec = page_rec_get_next_const(rec);
1736 	}
1737 
1738 	/* Check whether the insert tuple is larger than the first record;
1739 	if so, try to avoid that record if possible */
1740 	if (mode == PAGE_CUR_RTREE_INSERT && !page_rec_is_supremum(rec)) {
1741 
1742 		ulint	new_rec_size = rec_get_converted_size(index, tuple, 0);
1743 
1744 		offsets = rec_get_offsets(rec, index, offsets, n_core,
1745 					  dtuple_get_n_fields_cmp(tuple),
1746 					  &heap);
1747 
1748 		if (rec_offs_size(offsets) < new_rec_size) {
1749 			first_rec = rec;
1750 		}
1751 
1752 		/* If this is the left-most page of this index level
1753 		and the table uses compressed pages, try to avoid
1754 		the first record as much as possible, as there will be
1755 		problems when updating the MIN_REC record on a compressed page */
1756 		if (is_buf_block_get_page_zip(block)
1757 		    && !page_has_prev(page)
1758 		    && page_get_n_recs(page) >= 2) {
1759 
1760 			rec = page_rec_get_next_const(rec);
1761 		}
1762 	}
1763 
1764 	while (!page_rec_is_supremum(rec)) {
1765 		if (!n_core) {
1766 			switch (mode) {
1767 			case PAGE_CUR_CONTAIN:
1768 			case PAGE_CUR_INTERSECT:
1769 			case PAGE_CUR_MBR_EQUAL:
1770 				/* At non-leaf level, we need to check
1771 				both CONTAIN and INTERSECT for either
1772 				search mode */
1773 				cmp = cmp_dtuple_rec_with_gis(
1774 					tuple, rec, PAGE_CUR_CONTAIN);
1775 
1776 				if (cmp != 0) {
1777 					cmp = cmp_dtuple_rec_with_gis(
1778 						tuple, rec,
1779 						PAGE_CUR_INTERSECT);
1780 				}
1781 				break;
1782 			case PAGE_CUR_DISJOINT:
1783 				cmp = cmp_dtuple_rec_with_gis(
1784 					tuple, rec, mode);
1785 
1786 				if (cmp != 0) {
1787 					cmp = cmp_dtuple_rec_with_gis(
1788 						tuple, rec,
1789 						PAGE_CUR_INTERSECT);
1790 				}
1791 				break;
1792 			case PAGE_CUR_RTREE_INSERT:
1793 				double	increase;
1794 				double	area;
1795 
1796 				cmp = cmp_dtuple_rec_with_gis(
1797 					tuple, rec, PAGE_CUR_WITHIN);
1798 
1799 				if (cmp != 0) {
1800 					increase = rtr_rec_cal_increase(
1801 						tuple, rec, &area);
1802 					/* Once it reaches DBL_MAX,
1803 					it makes no sense to record
1804 					such a value; just cap it at
1805 					DBL_MAX / 2 */
1806 					if (increase >= DBL_MAX) {
1807 						increase = DBL_MAX / 2;
1808 					}
1809 
1810 					if (increase < least_inc) {
1811 						least_inc = increase;
1812 						best_rec = rec;
1813 					} else if (best_rec
1814 						   && best_rec == first_rec) {
1815 						/* if first_rec is set,
1816 						we will try to avoid it */
1817 						least_inc = increase;
1818 						best_rec = rec;
1819 					}
1820 				}
1821 				break;
1822 			case PAGE_CUR_RTREE_GET_FATHER:
1823 				cmp = cmp_dtuple_rec_with_gis_internal(
1824 					tuple, rec);
1825 				break;
1826 			default:
1827 				/* WITHIN etc. */
1828 				cmp = cmp_dtuple_rec_with_gis(
1829 					tuple, rec, mode);
1830 			}
1831 		} else {
1832 			/* At leaf level, INSERT should translate to LE */
1833 			ut_ad(mode != PAGE_CUR_RTREE_INSERT);
1834 
1835 			cmp = cmp_dtuple_rec_with_gis(
1836 				tuple, rec, mode);
1837 		}
1838 
1839 		if (cmp == 0) {
1840 			found = true;
1841 
1842 			/* If located, the matching node/rec will be pushed
1843 			to rtr_info->path for non-leaf nodes, or
1844 			rtr_info->matches for leaf nodes */
1845 			if (rtr_info && mode != PAGE_CUR_RTREE_INSERT) {
1846 				if (!n_core) {
1847 					uint32_t	page_no;
1848 					node_seq_t	new_seq;
1849 					bool		is_loc;
1850 
1851 					is_loc = (orig_mode
1852 						  == PAGE_CUR_RTREE_LOCATE
1853 						  || orig_mode
1854 						  == PAGE_CUR_RTREE_GET_FATHER);
1855 
1856 					offsets = rec_get_offsets(
1857 						rec, index, offsets, 0,
1858 						ULINT_UNDEFINED, &heap);
1859 
1860 					page_no = btr_node_ptr_get_child_page_no(
1861 						rec, offsets);
1862 
1863 					ut_ad(level >= 1);
1864 
1865 					/* Get current SSN, before we insert
1866 					it into the path stack */
1867 					new_seq = rtr_get_current_ssn_id(index);
1868 
1869 					rtr_non_leaf_stack_push(
1870 						rtr_info->path,
1871 						page_no,
1872 						new_seq, level - 1, 0,
1873 						NULL, 0);
1874 
1875 					if (is_loc) {
1876 						rtr_non_leaf_insert_stack_push(
1877 							index,
1878 							rtr_info->parent_path,
1879 							level, page_no, block,
1880 							rec, 0);
1881 					}
1882 
1883 					if (!srv_read_only_mode
1884 					    && (rtr_info->need_page_lock
1885 						|| !is_loc)) {
1886 
1887 						/* Lock the page, preventing it
1888 						from being shrunk */
1889 						lock_place_prdt_page_lock(
1890 							page_id_t(block->page
1891 								  .id()
1892 								  .space(),
1893 								  page_no),
1894 							index,
1895 							rtr_info->thr);
1896 					}
1897 				} else {
1898 					ut_ad(orig_mode
1899 					      != PAGE_CUR_RTREE_LOCATE);
1900 
1901 					if (!match_init) {
1902 						rtr_init_match(
1903 							rtr_info->matches,
1904 							block, page);
1905 						match_init = true;
1906 					}
1907 
1908 					/* Collect matched records on page */
1909 					offsets = rec_get_offsets(
1910 						rec, index, offsets,
1911 						index->n_fields,
1912 						ULINT_UNDEFINED, &heap);
1913 					rtr_leaf_push_match_rec(
1914 						rec, rtr_info, offsets,
1915 						page_is_comp(page));
1916 				}
1917 
1918 				last_match_rec = rec;
1919 			} else {
1920 				/* This is the insertion case; it will break
1921 				once it finds the first MBR that can accommodate
1922 				the record being inserted */
1923 				break;
1924 			}
1925 		}
1926 
1927 		last_rec = rec;
1928 
1929 		rec = page_rec_get_next_const(rec);
1930 	}
1931 
1932 	/* All records on page are searched */
1933 	if (page_rec_is_supremum(rec)) {
1934 		if (!n_core) {
1935 			if (!found) {
1936 				/* No match: if this is for insertion,
1937 				select the record that results in the
1938 				least increase in area */
1939 				if (mode == PAGE_CUR_RTREE_INSERT) {
1940 					ut_ad(least_inc < DBL_MAX);
1941 					offsets = rec_get_offsets(
1942 						best_rec, index, offsets,
1943 						0, ULINT_UNDEFINED, &heap);
1944 					uint32_t child_no =
1945 					btr_node_ptr_get_child_page_no(
1946 						best_rec, offsets);
1947 
1948 					rtr_non_leaf_insert_stack_push(
1949 						index, rtr_info->parent_path,
1950 						level, child_no, block,
1951 						best_rec, least_inc);
1952 
1953 					page_cur_position(best_rec, block,
1954 							  cursor);
1955 					rtr_info->mbr_adj = true;
1956 				} else {
1957 					/* Position at the last record of
1958 					the page, since this is not a leaf page */
1959 					page_cur_position(last_rec, block,
1960 							  cursor);
1961 				}
1962 			} else {
1963 				/* There are matching records; position
1964 				on the last matching record */
1965 				if (rtr_info) {
1966 					rec = last_match_rec;
1967 					page_cur_position(
1968 						rec, block, cursor);
1969 				}
1970 			}
1971 		} else if (rtr_info) {
1972 			/* Leaf level: if there was no match, position
1973 			at the last (supremum) rec */
1974 			if (!last_match_rec) {
1975 				page_cur_position(rec, block, cursor);
1976 				goto func_exit;
1977 			}
1978 
1979 			/* There are matched records */
1980 			matched_rec_t*	match_rec = rtr_info->matches;
1981 
1982 			rtr_rec_t	test_rec;
1983 
1984 			test_rec = match_rec->matched_recs->back();
1985 #ifdef UNIV_DEBUG
1986 			rec_offs	offsets_2[REC_OFFS_NORMAL_SIZE];
1987 			rec_offs*	offsets2	= offsets_2;
1988 			rec_offs_init(offsets_2);
1989 
1990 			ut_ad(found);
1991 
1992 			/* Verify that the record to be positioned on is the
1993 			same as the last record in the matched_recs vector */
1994 			offsets2 = rec_get_offsets(test_rec.r_rec, index,
1995 						   offsets2, index->n_fields,
1996 						   ULINT_UNDEFINED, &heap);
1997 
1998 			offsets = rec_get_offsets(last_match_rec, index,
1999 						  offsets, index->n_fields,
2000 						  ULINT_UNDEFINED, &heap);
2001 
2002 			ut_ad(cmp_rec_rec(test_rec.r_rec, last_match_rec,
2003 					  offsets2, offsets, index) == 0);
2004 #endif /* UNIV_DEBUG */
2005 			/* Pop the last match record and position on it */
2006 			match_rec->matched_recs->pop_back();
2007 			page_cur_position(test_rec.r_rec, &match_rec->block,
2008 					  cursor);
2009 		}
2010 	} else {
2011 
2012 		if (mode == PAGE_CUR_RTREE_INSERT) {
2013 			ut_ad(!last_match_rec);
2014 			rtr_non_leaf_insert_stack_push(
2015 				index, rtr_info->parent_path, level,
2016 				mach_read_from_4(rec + DATA_MBR_LEN),
2017 				block, rec, 0);
2018 
2019 		} else if (rtr_info && found && !n_core) {
2020 			rec = last_match_rec;
2021 		}
2022 
2023 		page_cur_position(rec, block, cursor);
2024 	}
2025 
2026 #ifdef UNIV_DEBUG
2027 	/* Verify that we are positioned at the same child page as was
2028 	pushed onto the path stack */
2029 	if (!n_core && (!page_rec_is_supremum(rec) || found)
2030 	    && mode != PAGE_CUR_RTREE_INSERT) {
2031 		ulint		page_no;
2032 
2033 		offsets = rec_get_offsets(rec, index, offsets, 0,
2034 					  ULINT_UNDEFINED, &heap);
2035 		page_no = btr_node_ptr_get_child_page_no(rec, offsets);
2036 
2037 		if (rtr_info && found) {
2038 			rtr_node_path_t*	path = rtr_info->path;
2039 			node_visit_t		last_visit = path->back();
2040 
2041 			ut_ad(last_visit.page_no == page_no);
2042 		}
2043 	}
2044 #endif /* UNIV_DEBUG */
2045 
2046 func_exit:
2047 	if (UNIV_LIKELY_NULL(heap)) {
2048 		mem_heap_free(heap);
2049 	}
2050 
2051 	return(found);
2052 }
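
/* In PAGE_CUR_RTREE_INSERT mode on a non-leaf page, the loop above keeps the
child record whose MBR needs the smallest area enlargement to absorb the new
entry; that enlargement is computed by rtr_rec_cal_increase().  The sketch
below shows the idea on already-decoded rectangles; it is illustrative only,
not the actual implementation, and assumes <algorithm> for std::max/std::min:

	// Area growth of 'parent' if it were enlarged to also cover 'child'.
	static double example_area_increase(const rtr_mbr_t& parent,
					    const rtr_mbr_t& child)
	{
		double	old_area = (parent.xmax - parent.xmin)
			* (parent.ymax - parent.ymin);
		double	new_area = (std::max(parent.xmax, child.xmax)
				    - std::min(parent.xmin, child.xmin))
			* (std::max(parent.ymax, child.ymax)
			   - std::min(parent.ymin, child.ymin));
		return new_area - old_area;
	}
*/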
2053