1 /*****************************************************************************
2 
3 Copyright (c) 2016, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file gis/gis0rtree.cc
29 InnoDB R-tree interfaces
30 
31 Created 2013/03/27 Allen Lai and Jimmy Yang
32 ***********************************************************************/
33 
34 #include "fsp0fsp.h"
35 #include "page0page.h"
36 #include "page0cur.h"
37 #include "page0zip.h"
38 #include "gis0rtree.h"
39 
40 #ifndef UNIV_HOTBACKUP
41 #include "btr0cur.h"
42 #include "btr0sea.h"
43 #include "btr0pcur.h"
44 #include "rem0cmp.h"
45 #include "lock0lock.h"
46 #include "ibuf0ibuf.h"
47 #include "trx0trx.h"
48 #include "srv0mon.h"
49 #include "gis0geo.h"
50 
51 #endif /* UNIV_HOTBACKUP */
52 
/*************************************************************//**
Initial split nodes info for R-tree split.
Collects, into a freshly allocated array, one split node per record
already on the page plus one node for the tuple being inserted.  Each
node stores the record pointer and a private copy of the record's MBR
coordinates, which the split algorithm will later partition into two
groups.
@return initialized split nodes array */
static
rtr_split_node_t*
rtr_page_split_initialize_nodes(
/*============================*/
	mem_heap_t*	heap,	/*!< in: pointer to memory heap, or NULL */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	ulint**		offsets,/*!< in: offsets on inserted record */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	double**	buf_pos)/*!< in/out: current buffer position */
{
	rtr_split_node_t*	split_node_array;
	double*			buf;
	ulint			n_recs;
	rtr_split_node_t*	task;
	rtr_split_node_t*	stop;
	rtr_split_node_t*	cur;
	rec_t*			rec;
	buf_block_t*		block;
	page_t*			page;
	ulint			n_uniq;
	ulint			len;
	byte*			source_cur;

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);

	/* All records currently on the page, plus the one to insert. */
	n_recs = page_get_n_recs(page) + 1;

	/* We reserve 2 MBRs memory space for the temporary result of the
	split algorithm.  Together with the new MBR that needs to be
	inserted, we need (n_recs + 3) * MBR size for storing all MBRs. */
	buf = static_cast<double*>(mem_heap_alloc(
		heap, DATA_MBR_LEN * (n_recs + 3)
		+ sizeof(rtr_split_node_t) * (n_recs + 1)));

	/* The node array lives in the same allocation, directly after
	the MBR coordinate buffer (SPDIMS * 2 doubles per MBR). */
	split_node_array = (rtr_split_node_t*)(buf + SPDIMS * 2 * (n_recs + 3));
	task = split_node_array;
	*buf_pos = buf;
	stop = task + n_recs;

	rec = page_rec_get_next(page_get_infimum_rec(page));
	*offsets = rec_get_offsets(rec, cursor->index, *offsets,
				   n_uniq, &heap);

	/* The MBR is always the first field of a spatial index record. */
	source_cur = rec_get_nth_field(rec, *offsets, 0, &len);

	/* Fill the first n_recs - 1 nodes from the records already on
	the page, copying each record's MBR into the coordinate buffer. */
	for (cur = task; cur < stop - 1; ++cur) {
		cur->coords = reserve_coords(buf_pos, SPDIMS);
		cur->key = rec;

		memcpy(cur->coords, source_cur, DATA_MBR_LEN);

		rec = page_rec_get_next(rec);
		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
					   n_uniq, &heap);
		source_cur = rec_get_nth_field(rec, *offsets, 0, &len);
	}

	/* Put the insert key to the node list: convert the tuple to a
	physical record allocated from the heap so that the last node
	can be treated just like the on-page records above. */
	source_cur = static_cast<byte*>(dfield_get_data(
		dtuple_get_nth_field(tuple, 0)));
	cur->coords = reserve_coords(buf_pos, SPDIMS);
	rec = (byte*) mem_heap_alloc(
		heap, rec_get_converted_size(cursor->index, tuple, 0));

	rec = rec_convert_dtuple_to_rec(rec, cursor->index, tuple, 0);
	cur->key = rec;

	memcpy(cur->coords, source_cur, DATA_MBR_LEN);

	return split_node_array;
}
131 
132 /**********************************************************************//**
133 Builds a Rtree node pointer out of a physical record and a page number.
134 Note: For Rtree, we just keep the mbr and page no field in non-leaf level
135 page. It's different with Btree, Btree still keeps PK fields so far.
136 @return	own: node pointer */
137 dtuple_t*
rtr_index_build_node_ptr(const dict_index_t * index,const rtr_mbr_t * mbr,const rec_t * rec,ulint page_no,mem_heap_t * heap,ulint level)138 rtr_index_build_node_ptr(
139 /*=====================*/
140 	const dict_index_t*	index,	/*!< in: index */
141 	const rtr_mbr_t*	mbr,	/*!< in: mbr of lower page */
142 	const rec_t*		rec,	/*!< in: record for which to build node
143 					pointer */
144 	ulint			page_no,/*!< in: page number to put in node
145 					pointer */
146 	mem_heap_t*		heap,	/*!< in: memory heap where pointer
147 					created */
148 	ulint			level)	/*!< in: level of rec in tree:
149 					0 means leaf level */
150 {
151 	dtuple_t*	tuple;
152 	dfield_t*	field;
153 	byte*		buf;
154 	ulint		n_unique;
155 	ulint		info_bits;
156 
157 	ut_ad(dict_index_is_spatial(index));
158 
159 	n_unique = DICT_INDEX_SPATIAL_NODEPTR_SIZE;
160 
161 	tuple = dtuple_create(heap, n_unique + 1);
162 
163 	/* For rtree internal node, we need to compare page number
164 	fields. */
165 	dtuple_set_n_fields_cmp(tuple, n_unique + 1);
166 
167 	dict_index_copy_types(tuple, index, n_unique);
168 
169 	/* Write page no field */
170 	buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
171 
172 	mach_write_to_4(buf, page_no);
173 
174 	field = dtuple_get_nth_field(tuple, n_unique);
175 	dfield_set_data(field, buf, 4);
176 
177 	dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4);
178 
179 	/* Set info bits. */
180 	info_bits = rec_get_info_bits(rec, dict_table_is_comp(index->table));
181 	dtuple_set_info_bits(tuple, info_bits | REC_STATUS_NODE_PTR);
182 
183 	/* Set mbr as index entry data */
184 	field = dtuple_get_nth_field(tuple, 0);
185 
186 	buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_MBR_LEN));
187 
188 	rtr_write_mbr(buf, mbr);
189 
190 	dfield_set_data(field, buf, DATA_MBR_LEN);
191 
192 	ut_ad(dtuple_check_typed(tuple));
193 
194 	return(tuple);
195 }
196 
/**************************************************************//**
In-place update the mbr field of a spatial index row.
The new MBR overwrites the old value inside the record (same length,
so the record does not move), and the change is redo logged as an
update-in-place so it can be replayed during recovery.
@return true if update is successful */
static
bool
rtr_update_mbr_field_in_place(
/*==========================*/
	dict_index_t*	index,		/*!< in: spatial index. */
	rec_t*		rec,		/*!< in/out: rec to be modified.*/
	ulint*		offsets,	/*!< in/out: offsets on rec. */
	rtr_mbr_t*	mbr,		/*!< in: the new mbr. */
	mtr_t*		mtr)		/*!< in: mtr */
{
	void*		new_mbr_ptr;
	double		new_mbr[SPDIMS * 2];
	byte*		log_ptr;
	page_t*		page = page_align(rec);
	ulint		len = DATA_MBR_LEN;
	ulint		flags = BTR_NO_UNDO_LOG_FLAG
			| BTR_NO_LOCKING_FLAG
			| BTR_KEEP_SYS_FLAG;
	ulint		rec_info;

	/* Serialize the new MBR and overwrite the old value, which is
	always field 0 of a spatial index record. */
	rtr_write_mbr(reinterpret_cast<byte*>(&new_mbr), mbr);
	new_mbr_ptr = static_cast<void*>(new_mbr);
	/* Otherwise, set the mbr to the new_mbr. */
	rec_set_nth_field(rec, offsets, 0, new_mbr_ptr, len);

	rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));

	/* Write redo log. */
	/* For now, we use LOG_REC_UPDATE_IN_PLACE to log this enlarge.
	In the future, we may need to add a new log type for this. */
	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
					    : MLOG_REC_UPDATE_IN_PLACE,
					    1 + DATA_ROLL_PTR_LEN + 14 + 2
					    + MLOG_BUF_MARGIN);

	if (!log_ptr) {
		/* Logging in mtr is switched off during
		crash recovery */
		return(false);
	}

	/* NOTE(review): the log body below appears to follow the
	standard update-in-place format (flags, dummy system columns,
	record offset, info bits, one updated field) so the normal
	recovery parser can apply it — confirm against the redo log
	parsing code if this format is ever changed. */

	/* Flags */
	mach_write_to_1(log_ptr, flags);
	log_ptr++;
	/* TRX_ID Position */
	log_ptr += mach_write_compressed(log_ptr, 0);
	/* ROLL_PTR */
	trx_write_roll_ptr(log_ptr, 0);
	log_ptr += DATA_ROLL_PTR_LEN;
	/* TRX_ID */
	log_ptr += mach_u64_write_compressed(log_ptr, 0);

	/* Offset */
	mach_write_to_2(log_ptr, page_offset(rec));
	log_ptr += 2;
	/* Info bits */
	mach_write_to_1(log_ptr, rec_info);
	log_ptr++;
	/* N fields */
	log_ptr += mach_write_compressed(log_ptr, 1);
	/* Field no, len */
	log_ptr += mach_write_compressed(log_ptr, 0);
	log_ptr += mach_write_compressed(log_ptr, len);
	/* Data */
	memcpy(log_ptr, new_mbr_ptr, len);
	log_ptr += len;

	mlog_close(mtr, log_ptr);

	return(true);
}
272 
273 /**************************************************************//**
274 Update the mbr field of a spatial index row.
275 @return true if update is successful */
276 bool
rtr_update_mbr_field(btr_cur_t * cursor,ulint * offsets,btr_cur_t * cursor2,page_t * child_page,rtr_mbr_t * mbr,rec_t * new_rec,mtr_t * mtr)277 rtr_update_mbr_field(
278 /*=================*/
279 	btr_cur_t*	cursor,		/*!< in/out: cursor pointed to rec.*/
280 	ulint*		offsets,	/*!< in/out: offsets on rec. */
281 	btr_cur_t*	cursor2,	/*!< in/out: cursor pointed to rec
282 					that should be deleted.
283 					this cursor is for btr_compress to
284 					delete the merged page's father rec.*/
285 	page_t*		child_page,	/*!< in: child page. */
286 	rtr_mbr_t*	mbr,		/*!< in: the new mbr. */
287 	rec_t*		new_rec,	/*!< in: rec to use */
288 	mtr_t*		mtr)		/*!< in: mtr */
289 {
290 	dict_index_t*	index = cursor->index;
291 	mem_heap_t*	heap;
292 	page_t*		page;
293 	rec_t*		rec;
294 	ulint		flags = BTR_NO_UNDO_LOG_FLAG
295 			| BTR_NO_LOCKING_FLAG
296 			| BTR_KEEP_SYS_FLAG;
297 	dberr_t		err;
298 	big_rec_t*	dummy_big_rec;
299 	buf_block_t*	block;
300 	rec_t*		child_rec;
301 	ulint		up_match = 0;
302 	ulint		low_match = 0;
303 	ulint		child;
304 	ulint		level;
305 	ulint		rec_info;
306 	page_zip_des_t*	page_zip;
307 	bool		ins_suc = true;
308 	ulint		cur2_pos = 0;
309 	ulint		del_page_no = 0;
310 	ulint*		offsets2;
311 
312 	rec = btr_cur_get_rec(cursor);
313 	page = page_align(rec);
314 
315 	rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));
316 
317 	heap = mem_heap_create(100);
318 	block = btr_cur_get_block(cursor);
319 	ut_ad(page == buf_block_get_frame(block));
320 	page_zip = buf_block_get_page_zip(block);
321 
322 	child = btr_node_ptr_get_child_page_no(rec, offsets);
323 	level = btr_page_get_level(buf_block_get_frame(block), mtr);
324 
325 	if (new_rec) {
326 		child_rec = new_rec;
327 	} else {
328 		child_rec = page_rec_get_next(page_get_infimum_rec(child_page));
329 	}
330 
331 	dtuple_t* node_ptr = rtr_index_build_node_ptr(
332 		index, mbr, child_rec, child, heap, level);
333 
334 	/* We need to remember the child page no of cursor2, since page could be
335 	reorganized or insert a new rec before it. */
336 	if (cursor2) {
337 		rec_t*	del_rec = btr_cur_get_rec(cursor2);
338 		offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
339 					   index, NULL,
340 					   ULINT_UNDEFINED, &heap);
341 		del_page_no = btr_node_ptr_get_child_page_no(del_rec, offsets2);
342 		cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2));
343 	}
344 
345 	if (rec_info & REC_INFO_MIN_REC_FLAG) {
346 		/* When the rec is minimal rec in this level, we do
347 		 in-place update for avoiding it move to other place. */
348 
349 		if (page_zip) {
350 			/* Check if there's enough space for in-place
351 			update the zip page. */
352 			if (!btr_cur_update_alloc_zip(
353 					page_zip,
354 					btr_cur_get_page_cur(cursor),
355 					index, offsets,
356 					rec_offs_size(offsets),
357 					false, mtr)) {
358 
359 				/* If there's not enought space for
360 				inplace update zip page, we do delete
361 				insert. */
362 				ins_suc = false;
363 
364 				/* Since btr_cur_update_alloc_zip could
365 				reorganize the page, we need to repositon
366 				cursor2. */
367 				if (cursor2) {
368 					cursor2->page_cur.rec =
369 						page_rec_get_nth(page,
370 								 cur2_pos);
371 				}
372 
373 				goto update_mbr;
374 			}
375 
376 			/* Record could be repositioned */
377 			rec = btr_cur_get_rec(cursor);
378 
379 #ifdef UNIV_DEBUG
380 			/* Make sure it is still the first record */
381 			rec_info = rec_get_info_bits(
382 					rec, rec_offs_comp(offsets));
383 			ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
384 #endif /* UNIV_DEBUG */
385 		}
386 
387 		if (!rtr_update_mbr_field_in_place(index, rec,
388 						   offsets, mbr, mtr)) {
389 			return(false);
390 		}
391 
392 		if (page_zip) {
393 			page_zip_write_rec(page_zip, rec, index, offsets, 0);
394 		}
395 
396 		if (cursor2) {
397 			ulint* offsets2;
398 
399 			if (page_zip) {
400 				cursor2->page_cur.rec
401 					= page_rec_get_nth(page, cur2_pos);
402 			}
403 			offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
404 						   index, NULL,
405 						   ULINT_UNDEFINED, &heap);
406 			ut_ad(del_page_no == btr_node_ptr_get_child_page_no(
407 							cursor2->page_cur.rec,
408 							offsets2));
409 
410 			page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
411 					    index, offsets2, mtr);
412 		}
413 	} else if (page_get_n_recs(page) == 1) {
414 		/* When there's only one rec in the page, we do insert/delete to
415 		avoid page merge. */
416 
417 		page_cur_t		page_cur;
418 		rec_t*			insert_rec;
419 		ulint*			insert_offsets = NULL;
420 		ulint			old_pos;
421 		rec_t*			old_rec;
422 
423 		ut_ad(cursor2 == NULL);
424 
425 		/* Insert the new mbr rec. */
426 		old_pos = page_rec_get_n_recs_before(rec);
427 
428 		err = btr_cur_optimistic_insert(
429 			flags,
430 			cursor, &insert_offsets, &heap,
431 			node_ptr, &insert_rec, &dummy_big_rec, 0, NULL, mtr);
432 
433 		ut_ad(err == DB_SUCCESS);
434 
435 		btr_cur_position(index, insert_rec, block, cursor);
436 
437 		/* Delete the old mbr rec. */
438 		old_rec = page_rec_get_nth(page, old_pos);
439 		ut_ad(old_rec != insert_rec);
440 
441 		page_cur_position(old_rec, block, &page_cur);
442 		offsets2 = rec_get_offsets(old_rec,
443 					   index, NULL,
444 					   ULINT_UNDEFINED, &heap);
445 		page_cur_delete_rec(&page_cur, index, offsets2, mtr);
446 
447 	} else {
448 update_mbr:
449 		/* When there're not only 1 rec in the page, we do delete/insert
450 		to avoid page split. */
451 		rec_t*			insert_rec;
452 		ulint*			insert_offsets = NULL;
453 		rec_t*			next_rec;
454 
455 		/* Delete the rec which cursor point to. */
456 		next_rec = page_rec_get_next(rec);
457 		page_cur_delete_rec(btr_cur_get_page_cur(cursor),
458 				    index, offsets, mtr);
459 		if (!ins_suc) {
460 			ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
461 
462 			btr_set_min_rec_mark(next_rec, mtr);
463 		}
464 
465 		/* If there's more than 1 rec left in the page, delete
466 		the rec which cursor2 point to. Otherwise, delete it later.*/
467 		if (cursor2 && page_get_n_recs(page) > 1) {
468 			ulint		cur2_rec_info;
469 			rec_t*		cur2_rec;
470 
471 			cur2_rec = cursor2->page_cur.rec;
472 			offsets2 = rec_get_offsets(cur2_rec, index, NULL,
473 						   ULINT_UNDEFINED, &heap);
474 
475 			cur2_rec_info = rec_get_info_bits(cur2_rec,
476 						rec_offs_comp(offsets2));
477 			if (cur2_rec_info & REC_INFO_MIN_REC_FLAG) {
478 				/* If we delete the leftmost node
479 				pointer on a non-leaf level, we must
480 				mark the new leftmost node pointer as
481 				the predefined minimum record */
482 				rec_t*	next_rec = page_rec_get_next(cur2_rec);
483 				btr_set_min_rec_mark(next_rec, mtr);
484 			}
485 
486 			ut_ad(del_page_no
487 			      == btr_node_ptr_get_child_page_no(cur2_rec,
488 								offsets2));
489 			page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
490 					    index, offsets2, mtr);
491 			cursor2 = NULL;
492 		}
493 
494 		/* Insert the new rec. */
495 		page_cur_search_with_match(block, index, node_ptr,
496 					   PAGE_CUR_LE , &up_match, &low_match,
497 					   btr_cur_get_page_cur(cursor), NULL);
498 
499 		err = btr_cur_optimistic_insert(flags, cursor, &insert_offsets,
500 						&heap, node_ptr, &insert_rec,
501 						&dummy_big_rec, 0, NULL, mtr);
502 
503 		if (!ins_suc && err == DB_SUCCESS) {
504 			ins_suc = true;
505 		}
506 
507 		/* If optimistic insert fail, try reorganize the page
508 		and insert again. */
509 		if (err != DB_SUCCESS && ins_suc) {
510 			btr_page_reorganize(btr_cur_get_page_cur(cursor),
511 					    index, mtr);
512 
513 			err = btr_cur_optimistic_insert(flags,
514 							cursor,
515 							&insert_offsets,
516 							&heap,
517 							node_ptr,
518 							&insert_rec,
519 							&dummy_big_rec,
520 							0, NULL, mtr);
521 
522 			/* Will do pessimistic insert */
523 			if (err != DB_SUCCESS) {
524 				ins_suc = false;
525 			}
526 		}
527 
528 		/* Insert succeed, position cursor the inserted rec.*/
529 		if (ins_suc) {
530 			btr_cur_position(index, insert_rec, block, cursor);
531 			offsets = rec_get_offsets(insert_rec,
532 						  index, offsets,
533 						  ULINT_UNDEFINED, &heap);
534 		}
535 
536 		/* Delete the rec which cursor2 point to. */
537 		if (cursor2) {
538 			ulint		cur2_pno;
539 			rec_t*		cur2_rec;
540 
541 			cursor2->page_cur.rec = page_rec_get_nth(page,
542 								 cur2_pos);
543 
544 			cur2_rec = btr_cur_get_rec(cursor2);
545 
546 			offsets2 = rec_get_offsets(cur2_rec, index, NULL,
547 						   ULINT_UNDEFINED, &heap);
548 
549 			/* If the cursor2 position is on a wrong rec, we
550 			need to reposition it. */
551 			cur2_pno = btr_node_ptr_get_child_page_no(cur2_rec, offsets2);
552 			if ((del_page_no != cur2_pno)
553 			    || (cur2_rec == insert_rec)) {
554 				cur2_rec = page_rec_get_next(
555 					page_get_infimum_rec(page));
556 
557 				while (!page_rec_is_supremum(cur2_rec)) {
558 					offsets2 = rec_get_offsets(cur2_rec, index,
559 								   NULL,
560 								   ULINT_UNDEFINED,
561 								   &heap);
562 					cur2_pno = btr_node_ptr_get_child_page_no(
563 							cur2_rec, offsets2);
564 					if (cur2_pno == del_page_no) {
565 						if (insert_rec != cur2_rec) {
566 							cursor2->page_cur.rec =
567 								cur2_rec;
568 							break;
569 						}
570 					}
571 					cur2_rec = page_rec_get_next(cur2_rec);
572 				}
573 
574 				ut_ad(!page_rec_is_supremum(cur2_rec));
575 			}
576 
577 			rec_info = rec_get_info_bits(cur2_rec,
578 						     rec_offs_comp(offsets2));
579 			if (rec_info & REC_INFO_MIN_REC_FLAG) {
580 				/* If we delete the leftmost node
581 				pointer on a non-leaf level, we must
582 				mark the new leftmost node pointer as
583 				the predefined minimum record */
584 				rec_t*	next_rec = page_rec_get_next(cur2_rec);
585 				btr_set_min_rec_mark(next_rec, mtr);
586 			}
587 
588 			ut_ad(cur2_pno == del_page_no && cur2_rec != insert_rec);
589 
590 			page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
591 					    index, offsets2, mtr);
592 		}
593 
594 		if (!ins_suc) {
595 			mem_heap_t*	new_heap = NULL;
596 
597 			err = btr_cur_pessimistic_insert(
598 				flags,
599 				cursor, &insert_offsets, &new_heap,
600 				node_ptr, &insert_rec, &dummy_big_rec,
601 				0, NULL, mtr);
602 
603 			ut_ad(err == DB_SUCCESS);
604 
605 			if (new_heap) {
606 				mem_heap_free(new_heap);
607 			}
608 
609 		}
610 
611 		if (cursor2) {
612 			btr_cur_compress_if_useful(cursor, FALSE, mtr);
613 		}
614 	}
615 
616 #ifdef UNIV_DEBUG
617 	ulint	left_page_no = btr_page_get_prev(page, mtr);
618 
619 	if (left_page_no == FIL_NULL) {
620 
621 		ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
622 			page_rec_get_next(page_get_infimum_rec(page)),
623 			page_is_comp(page)));
624 	}
625 #endif /* UNIV_DEBUG */
626 
627 	mem_heap_free(heap);
628 
629 	return(true);
630 }
631 
/**************************************************************//**
Update parent page's MBR and Predicate lock information during a split.
The parent node pointer for the old page gets its MBR replaced, a new
node pointer for the new page is inserted into the parent, predicate
locks are propagated to the parent, and finally the sibling links of
the split level are rewired to include the new page. */
static MY_ATTRIBUTE((nonnull))
void
rtr_adjust_upper_level(
/*===================*/
	btr_cur_t*	sea_cur,	/*!< in: search cursor */
	ulint		flags,		/*!< in: undo logging and
					locking flags */
	buf_block_t*	block,		/*!< in/out: page to be split */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	rtr_mbr_t*	mbr,		/*!< in: MBR on the old page */
	rtr_mbr_t*	new_mbr,	/*!< in: MBR on the new page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN;
					NOTE(review): not referenced in
					this function body — confirm
					whether it can be dropped */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		page;
	page_t*		new_page;
	ulint		page_no;
	ulint		new_page_no;
	page_zip_des_t*	page_zip;
	page_zip_des_t*	new_page_zip;
	dict_index_t*	index = sea_cur->index;
	btr_cur_t	cursor;
	ulint*		offsets;
	mem_heap_t*	heap;
	ulint		level;
	dtuple_t*	node_ptr_upper;
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		space;
	page_cur_t*	page_cursor;
	rtr_mbr_t	parent_mbr;
	lock_prdt_t	prdt;
	lock_prdt_t	new_prdt;
	lock_prdt_t	parent_prdt;
	dberr_t		err;
	big_rec_t*	dummy_big_rec;
	rec_t*		rec;

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	cursor.thr = sea_cur->thr;

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));

	page = buf_block_get_frame(block);
	page_no = block->page.id.page_no();
	page_zip = buf_block_get_page_zip(block);

	new_page = buf_block_get_frame(new_block);
	new_page_no = new_block->page.id.page_no();
	new_page_zip = buf_block_get_page_zip(new_block);

	/* Set new mbr for the old page on the upper level. */
	/* Look up the index for the node pointer to page */
	offsets = rtr_page_get_father_block(
		NULL, heap, index, block, mtr, sea_cur, &cursor);

	page_cursor = btr_cur_get_page_cur(&cursor);

	/* Remember the parent's old MBR for the predicate lock update
	below, before it is overwritten. */
	rtr_get_mbr_from_rec(page_cursor->rec, offsets, &parent_mbr);

	rtr_update_mbr_field(&cursor, offsets, NULL, page, mbr, NULL, mtr);

	/* Already updated parent MBR, reset in our path */
	if (sea_cur->rtr_info) {
		node_visit_t*	node_visit = rtr_get_parent_node(
						sea_cur, level + 1, true);
		if (node_visit) {
			node_visit->mbr_inc = 0;
		}
	}

	/* Insert the node pointer for the new page, keyed by the first
	record of the new page. */
	node_ptr_upper = rtr_index_build_node_ptr(
		index, new_mbr,
		page_rec_get_next(page_get_infimum_rec(new_page)),
		new_page_no, heap, level);

	ulint	up_match = 0;
	ulint	low_match = 0;

	buf_block_t*	father_block = btr_cur_get_block(&cursor);

	page_cur_search_with_match(
		father_block, index, node_ptr_upper,
		PAGE_CUR_LE , &up_match, &low_match,
		btr_cur_get_page_cur(&cursor), NULL);

	err = btr_cur_optimistic_insert(
		flags
		| BTR_NO_LOCKING_FLAG
		| BTR_KEEP_SYS_FLAG
		| BTR_NO_UNDO_LOG_FLAG,
		&cursor, &offsets, &heap,
		node_ptr_upper, &rec, &dummy_big_rec, 0, NULL, mtr);

	if (err == DB_FAIL) {
		/* The parent page is full: fall back to a pessimistic
		insert, which may split the parent itself. */
		cursor.rtr_info = sea_cur->rtr_info;
		cursor.tree_height = sea_cur->tree_height;

		mem_heap_t *new_heap = NULL;

		DBUG_EXECUTE_IF("rtr_page_need_first_split", {
			DBUG_SET("+d,rtr_page_need_second_split");
		});

		err = btr_cur_pessimistic_insert(flags
						 | BTR_NO_LOCKING_FLAG
						 | BTR_KEEP_SYS_FLAG
						 | BTR_NO_UNDO_LOG_FLAG,
						 &cursor, &offsets, &new_heap,
						 node_ptr_upper, &rec,
						 &dummy_big_rec, 0, NULL, mtr);

		DBUG_EXECUTE_IF("rtr_page_need_first_split", {
			DBUG_SET("-d,rtr_page_need_second_split");
		});

		cursor.rtr_info = NULL;
		ut_a(err == DB_SUCCESS);

		if (new_heap) {
			mem_heap_free(new_heap);
		}
	}

	/* Propagate predicate locks: old page, new page and parent. */
	prdt.data = static_cast<void*>(mbr);
	prdt.op = 0;
	new_prdt.data = static_cast<void*>(new_mbr);
	new_prdt.op = 0;
	parent_prdt.data = static_cast<void*>(&parent_mbr);
	parent_prdt.op = 0;

	lock_prdt_update_parent(block, new_block, &prdt, &new_prdt,
				&parent_prdt, dict_index_get_space(index),
				page_cursor->block->page.id.page_no());

	mem_heap_free(heap);

	/* Get the previous and next pages of page */
	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = block->page.id.space();
	const page_size_t&	page_size = dict_table_page_size(index->table);

	/* Update page links of the level: the new page is chained in
	between the old page and its former successor. */
	if (prev_page_no != FIL_NULL) {
		page_id_t	prev_page_id(space, prev_page_no);

		buf_block_t*	prev_block = btr_block_get(
			prev_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		page_id_t	next_page_id(space, next_page_no);

		buf_block_t*	next_block = btr_block_get(
			next_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  new_page_no, mtr);
	}

	btr_page_set_prev(page, page_zip, prev_page_no, mtr);
	btr_page_set_next(page, page_zip, new_page_no, mtr);

	btr_page_set_prev(new_page, new_page_zip, page_no, mtr);
	btr_page_set_next(new_page, new_page_zip, next_page_no, mtr);
}
822 
/*************************************************************//**
Moves record list to another page for rtree splitting.
Records belonging to the group other than first_rec_group are copied
to new_block, record locks are moved along with them, and the copied
records are then deleted from the original page.

IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if new_block is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit().

@return TRUE on success; FALSE on compression failure */
ibool
rtr_split_page_move_rec_list(
/*=========================*/
	rtr_split_node_t*	node_array,	/*!< in: split node array. */
	int			first_rec_group,/*!< in: group number of the
						first rec. */
	buf_block_t*		new_block,	/*!< in/out: index page
						where to move */
	buf_block_t*		block,		/*!< in/out: page containing
						split_rec */
	rec_t*			first_rec,	/*!< in: first record not to
						move */
	dict_index_t*		index,		/*!< in: record descriptor */
	mem_heap_t*		heap,		/*!< in: pointer to memory
						heap, or NULL */
	mtr_t*			mtr)		/*!< in: mtr */
{
	rtr_split_node_t*	cur_split_node;
	rtr_split_node_t*	end_split_node;
	page_cur_t		page_cursor;
	page_cur_t		new_page_cursor;
	page_t*			page;
	page_t*			new_page;
	ulint			offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*			offsets		= offsets_;
	page_zip_des_t*		new_page_zip
		= buf_block_get_page_zip(new_block);
	rec_t*			rec;
	rec_t*			ret;
	ulint			moved		= 0;
	ulint			max_to_move	= 0;
	rtr_rec_move_t*		rec_move	= NULL;

	rec_offs_init(offsets_);

	page_cur_set_before_first(block, &page_cursor);
	page_cur_set_before_first(new_block, &new_page_cursor);

	page = buf_block_get_frame(block);
	new_page = buf_block_get_frame(new_block);
	/* Remember the last user record on the new page so we can find
	it again if the page is reorganized below. */
	ret = page_rec_get_prev(page_get_supremum_rec(new_page));

	end_split_node = node_array + page_get_n_recs(page);

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* For a compressed page, copy without logging; the
		final compressed image is logged below instead. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	max_to_move = page_get_n_recs(
				buf_block_get_frame(block));
	rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
			heap,
			sizeof (*rec_move) * max_to_move));

	/* Insert the recs in group 2 to new page.  */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			/* Park any explicit locks on the infimum while
			the record is being moved. */
			lock_rec_store_on_page_infimum(
				block, cur_split_node->key);

			offsets = rec_get_offsets(cur_split_node->key,
						  index, offsets,
						  ULINT_UNDEFINED, &heap);

			ut_ad (cur_split_node->key != first_rec
			       || !page_is_leaf(page));

			rec = page_cur_insert_rec_low(
					page_cur_get_rec(&new_page_cursor),
					index,
					cur_split_node->key,
					offsets,
					mtr);

			ut_a(rec);

			lock_rec_restore_from_page_infimum(
				new_block, rec, block);

			page_cur_move_to_next(&new_page_cursor);

			/* Record the old -> new mapping for the lock
			system update below. */
			rec_move[moved].new_rec = rec;
			rec_move[moved].old_rec = cur_split_node->key;
			rec_move[moved].moved = false;
			moved++;

			if (moved > max_to_move) {
				ut_ad(0);
				break;
			}
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !dict_table_is_temporary(index->table)) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page),
				       mtr);
	}

	if (new_page_zip) {
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, NULL, mtr)) {
			ulint	ret_pos;

			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index, mtr))) {

				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
#ifdef UNIV_GIS_DEBUG
				ut_ad(page_validate(new_page, index));
#endif

				return(false);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table */
	lock_rtr_move_rec_list(new_block, block, rec_move, moved);

	/* Delete recs in second group from the old page. */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			page_cur_position(cur_split_node->key,
					  block, &page_cursor);
			offsets = rec_get_offsets(
				page_cur_get_rec(&page_cursor), index,
				offsets, ULINT_UNDEFINED,
				&heap);
			page_cur_delete_rec(&page_cursor,
				index, offsets, mtr);
		}
	}

	return(true);
}
999 
1000 /*************************************************************//**
1001 Splits an R-tree index page to halves and inserts the tuple. It is assumed
1002 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
1003 released within this function! NOTE that the operation of this
1004 function must always succeed, we cannot reverse it: therefore enough
1005 free disk space (2 pages) must be guaranteed to be available before
1006 this function is called.
1007 @return inserted record */
rec_t*
rtr_page_split_and_insert(
/*======================*/
	ulint		flags,	/*!< in: undo logging and locking flags */
	btr_cur_t*	cursor,	/*!< in/out: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	ulint**		offsets,/*!< out: offsets on inserted record */
	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	ulint		n_ext,	/*!< in: number of externally stored columns */
	mtr_t*		mtr)	/*!< in: mtr */
{
	buf_block_t*		block;
	page_t*			page;
	page_t*			new_page;
	ulint			page_no;
	byte			direction;
	ulint			hint_page_no;
	buf_block_t*		new_block;
	page_zip_des_t*		page_zip;
	page_zip_des_t*		new_page_zip;
	buf_block_t*		insert_block;
	page_cur_t*		page_cursor;
	rec_t*			rec = 0;
	ulint			n_recs;
	ulint			total_data;
	ulint			insert_size;
	rtr_split_node_t*	rtr_split_node_array;
	rtr_split_node_t*	cur_split_node;
	rtr_split_node_t*	end_split_node;
	double*			buf_pos;
	ulint			page_level;
	node_seq_t		current_ssn;
	node_seq_t		next_ssn;
	buf_block_t*		root_block;
	rtr_mbr_t		mbr;
	rtr_mbr_t		new_mbr;
	lock_prdt_t		prdt;
	lock_prdt_t		new_prdt;
	rec_t*			first_rec = NULL;
	int			first_rec_group = 1;
	ulint			n_iterations = 0;

	if (!*heap) {
		*heap = mem_heap_create(1024);
	}

func_start:
	/* Retry entry point: if the tuple still does not fit after a
	split, we jump back here (see the "if (!rec)" block at the end)
	to split the page once more. */
	ut_ad(tuple->m_heap != *heap);
	mem_heap_empty(*heap);
	*offsets = NULL;

	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
	ut_ad(!dict_index_is_online_ddl(cursor->index)
	      || (flags & BTR_CREATE_FLAG)
	      || dict_index_is_clust(cursor->index));
	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	page_zip = buf_block_get_page_zip(block);
	page_level = btr_page_get_level(page, mtr);
	current_ssn = page_get_ssn_id(page);

	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
	ut_ad(page_get_n_recs(page) >= 1);

	page_no = block->page.id.page_no();

	/* On the leftmost non-leaf page of a level, remember the first
	record; it is passed to split_rtree_node() below so the split
	can treat it specially (it must remain on the original page). */
	if (btr_page_get_prev(page, mtr) == FIL_NULL && !page_is_leaf(page)) {
		first_rec = page_rec_get_next(
			page_get_infimum_rec(buf_block_get_frame(block)));
	}

	/* Initial split nodes array. */
	rtr_split_node_array = rtr_page_split_initialize_nodes(
		*heap, cursor, offsets, tuple, &buf_pos);

	/* Divide all mbrs to two groups.  The "+ 1" accounts for the
	tuple being inserted, which occupies the last split node. */
	n_recs = page_get_n_recs(page) + 1;

	end_split_node = rtr_split_node_array + n_recs;

#ifdef UNIV_GIS_DEBUG
	fprintf(stderr, "Before split a page:\n");
	for (cur_split_node = rtr_split_node_array;
		cur_split_node < end_split_node; ++cur_split_node) {
		for (int i = 0; i < SPDIMS * 2; i++) {
			fprintf(stderr, "%.2lf ",
			        *(cur_split_node->coords + i));
		}
		fprintf(stderr, "\n");
	}
#endif

	/* Partition the MBRs into two groups; each node's n_node tells
	which group it was assigned to. */
	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
	total_data = page_get_data_size(page) + insert_size;
	first_rec_group = split_rtree_node(rtr_split_node_array,
					   static_cast<int>(n_recs),
					   static_cast<int>(total_data),
					   static_cast<int>(insert_size),
					   0, 2, 2, &buf_pos, SPDIMS,
					   static_cast<uchar*>(first_rec));

	/* Allocate a new page to the index */
	direction = FSP_UP;
	hint_page_no = page_no + 1;
	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
				   page_level, mtr, mtr);
	new_page_zip = buf_block_get_page_zip(new_block);
	btr_page_create(new_block, new_page_zip, cursor->index,
			page_level, mtr);

	new_page = buf_block_get_frame(new_block);
	ut_ad(page_get_ssn_id(new_page) == 0);

	/* Set new ssn to the new page and page: the new page inherits
	the old page's split sequence number, while the old page gets a
	freshly generated one. */
	page_set_ssn_id(new_block, new_page_zip, current_ssn, mtr);
	next_ssn = rtr_get_new_ssn_id(cursor->index);

	page_set_ssn_id(block, page_zip, next_ssn, mtr);

	/* Keep recs in first group to the old page, move recs in second
	groups to the new page. */
	if (0
#ifdef UNIV_ZIP_COPY
	    || page_zip
#endif
	    || !rtr_split_page_move_rec_list(rtr_split_node_array,
					     first_rec_group,
					     new_block, block, first_rec,
					     cursor->index, *heap, mtr)) {
		ulint			n		= 0;
		rec_t*			rec;
		ulint			moved		= 0;
		ulint			max_to_move	= 0;
		rtr_rec_move_t*		rec_move	= NULL;
		ulint			pos;

		/* For some reason, compressing new_page failed,
		even though it should contain fewer records than
		the original page.  Copy the page byte for byte
		and then delete the records from both pages
		as appropriate.  Deleting will always succeed. */
		ut_a(new_page_zip);

		page_zip_copy_recs(new_page_zip, new_page,
				   page_zip, page, cursor->index, mtr);

		page_cursor = btr_cur_get_page_cur(cursor);

		/* Move locks on recs. */
		max_to_move = page_get_n_recs(page);
		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
				*heap,
				sizeof (*rec_move) * max_to_move));

		/* Init the rec_move array for moving lock on recs.
		The last split node is the tuple being inserted, which
		has no existing record yet, hence "end_split_node - 1". */
		for (cur_split_node = rtr_split_node_array;
		     cur_split_node < end_split_node - 1; ++cur_split_node) {
			if (cur_split_node->n_node != first_rec_group) {
				pos = page_rec_get_n_recs_before(
					cur_split_node->key);
				rec = page_rec_get_nth(new_page, pos);
				ut_a(rec);

				rec_move[moved].new_rec = rec;
				rec_move[moved].old_rec = cur_split_node->key;
				rec_move[moved].moved = false;
				moved++;

				if (moved > max_to_move) {
					ut_ad(0);
					break;
				}
			}
		}

		/* Update the lock table */
		lock_rtr_move_rec_list(new_block, block, rec_move, moved);

		/* Delete recs in first group from the new page.
		"n" counts the deletions so far; later positions on the
		byte-copied page shift down by that amount. */
		for (cur_split_node = rtr_split_node_array;
		     cur_split_node < end_split_node - 1; ++cur_split_node) {
			if (cur_split_node->n_node == first_rec_group) {
				ulint	pos;

				pos = page_rec_get_n_recs_before(
						cur_split_node->key);
				ut_a(pos > 0);
				rec_t* new_rec = page_rec_get_nth(new_page,
								  pos - n);

				ut_a(new_rec && page_rec_is_user_rec(new_rec));
				page_cur_position(new_rec, new_block,
						  page_cursor);

				*offsets = rec_get_offsets(
					page_cur_get_rec(page_cursor),
					cursor->index,
					*offsets, ULINT_UNDEFINED,
					heap);

				page_cur_delete_rec(page_cursor,
					cursor->index, *offsets, mtr);
				n++;
			}
		}

		/* Delete recs in second group from the old page. */
		for (cur_split_node = rtr_split_node_array;
		     cur_split_node < end_split_node - 1; ++cur_split_node) {
			if (cur_split_node->n_node != first_rec_group) {
				page_cur_position(cur_split_node->key,
						  block, page_cursor);
				*offsets = rec_get_offsets(
					page_cur_get_rec(page_cursor),
					cursor->index,
					*offsets, ULINT_UNDEFINED,
					heap);
				page_cur_delete_rec(page_cursor,
					cursor->index, *offsets, mtr);
			}
		}

#ifdef UNIV_GIS_DEBUG
		ut_ad(page_validate(new_page, cursor->index));
		ut_ad(page_validate(page, cursor->index));
#endif
	}

	/* Insert the new rec to the proper page: the last split node
	corresponds to the tuple being inserted. */
	cur_split_node = end_split_node - 1;
	if (cur_split_node->n_node != first_rec_group) {
		insert_block = new_block;
	} else {
		insert_block = block;
	}

	/* Reposition the cursor for insert and try insertion */
	page_cursor = btr_cur_get_page_cur(cursor);

	page_cur_search(insert_block, cursor->index, tuple,
			PAGE_CUR_LE, page_cursor);

	/* It's possible that the new record is too big to be inserted into
	the page, and it'll need the second round split in this case.
	We test this scenario here*/
	DBUG_EXECUTE_IF("rtr_page_need_second_split",
			if (n_iterations == 0) {
				rec = NULL;
				goto after_insert; }
	);

	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
				    offsets, heap, n_ext, mtr);

	/* If insert did not fit, try page reorganization.
	For compressed pages, page_cur_tuple_insert() will have
	attempted this already. */
	if (rec == NULL) {
		if (!page_cur_get_page_zip(page_cursor)
		    && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
			rec = page_cur_tuple_insert(page_cursor, tuple,
						    cursor->index, offsets,
						    heap, n_ext, mtr);

		}
		/* If insert fail, we will try to split the insert_block
		again. */
	}

	/* The label is only needed by the debug-build fault injection
	above; DBUG_EXECUTE_IF is inert outside UNIV_DEBUG builds. */
#ifdef UNIV_DEBUG
after_insert:
#endif
	/* Calculate the mbr on the upper half-page, and the mbr on
	original page. */
	rtr_page_cal_mbr(cursor->index, block, &mbr, *heap);
	rtr_page_cal_mbr(cursor->index, new_block, &new_mbr, *heap);
	prdt.data = &mbr;
	new_prdt.data = &new_mbr;

	/* Check any predicate locks need to be moved/copied to the
	new page */
	lock_prdt_update_split(block, new_block, &prdt, &new_prdt,
			       dict_index_get_space(cursor->index), page_no);

	/* Adjust the upper level. */
	rtr_adjust_upper_level(cursor, flags, block, new_block,
			       &mbr, &new_mbr, direction, mtr);

	/* Save the new ssn to the root page, since we need to reinit
	the first ssn value from it after restart server. */

	root_block = btr_root_block_get(cursor->index, RW_SX_LATCH, mtr);

	page_zip = buf_block_get_page_zip(root_block);
	page_set_ssn_id(root_block, page_zip, next_ssn, mtr);

	/* Insert fit on the page: update the free bits for the
	left and right pages in the same mtr */

	if (page_is_leaf(page)) {
		ibuf_update_free_bits_for_two_pages_low(
			block, new_block, mtr);
	}


	/* If the new rec insert failed, we need to do another split
	again. */
	if (!rec) {
		/* We play safe and reset the free bits for new_page */
		if (!dict_index_is_clust(cursor->index)
		    && !dict_table_is_temporary(cursor->index->table)) {
			ibuf_reset_free_bits(new_block);
			ibuf_reset_free_bits(block);
		}

		/* We need to clean the parent path here and search father
		node later, otherwise, it's possible that find a wrong
		parent. */
		rtr_clean_rtr_info(cursor->rtr_info, true);
		cursor->rtr_info = NULL;
		n_iterations++;

		/* Reposition the cursor on the first user record of the
		(now half-emptied) original page before retrying. */
		rec_t* i_rec = page_rec_get_next(page_get_infimum_rec(
			buf_block_get_frame(block)));
		btr_cur_position(cursor->index, i_rec, block, cursor);

		goto func_start;
	}

#ifdef UNIV_GIS_DEBUG
	ut_ad(page_validate(buf_block_get_frame(block), cursor->index));
	ut_ad(page_validate(buf_block_get_frame(new_block), cursor->index));

	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
#endif
	MONITOR_INC(MONITOR_INDEX_SPLIT);

	return(rec);
}
1353 
/****************************************************************//**
Enlarges the MBRs of the ancestor pages along the parent path stored
in the cursor, after an insert has caused a child page's MBR to grow.
@return DB_SUCCESS on success, DB_ERROR if an MBR update fails */
1357 dberr_t
rtr_ins_enlarge_mbr(btr_cur_t * btr_cur,que_thr_t * thr,mtr_t * mtr)1358 rtr_ins_enlarge_mbr(
1359 /*================*/
1360 	btr_cur_t*		btr_cur,	/*!< in: btr cursor */
1361 	que_thr_t*		thr,		/*!< in: query thread */
1362 	mtr_t*			mtr)		/*!< in: mtr */
1363 {
1364 	dberr_t			err = DB_SUCCESS;
1365 	rtr_mbr_t		new_mbr;
1366 	buf_block_t*		block;
1367 	mem_heap_t*		heap;
1368 	dict_index_t*		index = btr_cur->index;
1369 	page_cur_t*		page_cursor;
1370 	ulint*			offsets;
1371 	node_visit_t*		node_visit;
1372 	page_t*			page;
1373 
1374 	ut_ad(dict_index_is_spatial(index));
1375 
1376 	/* If no rtr_info or rtree is one level tree, return. */
1377 	if (!btr_cur->rtr_info || btr_cur->tree_height == 1) {
1378 		return(err);
1379 	}
1380 
1381 	/* Check path info is not empty. */
1382 	ut_ad(!btr_cur->rtr_info->parent_path->empty());
1383 
1384 	/* Create a memory heap. */
1385 	heap = mem_heap_create(1024);
1386 
1387 	/* Leaf level page is stored in cursor */
1388 	page_cursor = btr_cur_get_page_cur(btr_cur);
1389 	block = page_cur_get_block(page_cursor);
1390 
1391 	for (ulint i = 1; i < btr_cur->tree_height; i++) {
1392 		node_visit = rtr_get_parent_node(btr_cur, i, true);
1393 		ut_ad(node_visit != NULL);
1394 
1395 		/* If there's no mbr enlarge, return.*/
1396 		if (node_visit->mbr_inc == 0) {
1397 			block = btr_pcur_get_block(node_visit->cursor);
1398 			continue;
1399 		}
1400 
1401 		/* Calculate the mbr of the child page. */
1402 		rtr_page_cal_mbr(index, block, &new_mbr, heap);
1403 
1404 		/* Get father block. */
1405 		btr_cur_t cursor;
1406 		offsets = rtr_page_get_father_block(
1407 			NULL, heap, index, block, mtr, btr_cur, &cursor);
1408 
1409 		page = buf_block_get_frame(block);
1410 
1411 		/* Update the mbr field of the rec. */
1412 		if (!rtr_update_mbr_field(&cursor, offsets, NULL, page,
1413 					  &new_mbr, NULL, mtr)) {
1414 			err = DB_ERROR;
1415 			break;
1416 		}
1417 
1418 		page_cursor = btr_cur_get_page_cur(&cursor);
1419 		block = page_cur_get_block(page_cursor);
1420 	}
1421 
1422 	mem_heap_free(heap);
1423 
1424 	return(err);
1425 }
1426 
1427 /*************************************************************//**
1428 Copy recs from a page to new_block of rtree.
1429 Differs from page_copy_rec_list_end, because this function does not
1430 touch the lock table and max trx id on page or compress the page.
1431 
1432 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1433 if new_block is a compressed leaf page in a secondary index.
1434 This has to be done either within the same mini-transaction,
1435 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
rtr_page_copy_rec_list_end_no_locks(
/*================================*/
	buf_block_t*	new_block,	/*!< in: index page to copy to */
	buf_block_t*	block,		/*!< in: index page of rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mem_heap_t*	heap,		/*!< in/out: heap memory */
	rtr_rec_move_t*	rec_move,	/*!< in: recording records moved */
	ulint		max_move,	/*!< in: num of rec to move */
	ulint*		num_moved,	/*!< out: num of rec to move */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		new_page	= buf_block_get_frame(new_block);
	page_cur_t	page_cur;	/* insert position on new_page */
	page_cur_t	cur1;		/* iterates over the source records */
	rec_t*		cur_rec;
	ulint		offsets_1[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets1 = offsets_1;
	ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets2 = offsets_2;
	ulint		moved = 0;
	bool		is_leaf = page_is_leaf(new_page);

	rec_offs_init(offsets_1);
	rec_offs_init(offsets_2);

	page_cur_position(rec, block, &cur1);

	/* If "rec" is the infimum, start copying from the first
	user record. */
	if (page_cur_is_before_first(&cur1)) {
		page_cur_move_to_next(&cur1);
	}

	btr_assert_not_corrupted(new_block, index);
	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
	/* NOTE(review): this 2-byte read appears to be the first page
	directory slot (just above the 8-byte page trailer), which must
	point to the infimum record -- confirm against page0page.h. */
	ut_a(mach_read_from_2(new_page + UNIV_PAGE_SIZE - 10) == (ulint)
	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));

	cur_rec = page_rec_get_next(
		page_get_infimum_rec(buf_block_get_frame(new_block)));
	page_cur_position(cur_rec, new_block, &page_cur);

	/* Copy records from the original page to the new page */
	while (!page_cur_is_after_last(&cur1)) {
		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
		rec_t*	ins_rec;

		if (page_rec_is_infimum(cur_rec)) {
			cur_rec = page_rec_get_next(cur_rec);
		}

		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
					   ULINT_UNDEFINED, &heap);
		/* Advance cur_rec on the new page until it is no longer
		smaller than cur1_rec, so that the insert below keeps the
		new page ordered.
		NOTE(review): if cmp == 0 on a non-leaf page, none of the
		branches advance the cursor; presumably equal records
		cannot occur on non-leaf pages here -- confirm. */
		while (!page_rec_is_supremum(cur_rec)) {
			ulint		cur_matched_fields = 0;
			int		cmp;

			offsets2 = rec_get_offsets(cur_rec, index, offsets2,
						   ULINT_UNDEFINED, &heap);
			cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
						     offsets1, offsets2,
						     index,
						     page_is_spatial_non_leaf(cur1_rec, index),
						     false,
						     &cur_matched_fields);
			if (cmp < 0) {
				/* Insert position found: cur1_rec sorts
				before cur_rec. */
				page_cur_move_to_prev(&page_cur);
				break;
			} else if (cmp > 0) {
				/* Skip small recs. */
				page_cur_move_to_next(&page_cur);
				cur_rec = page_cur_get_rec(&page_cur);
			} else if (is_leaf) {
				if (rec_get_deleted_flag(cur1_rec,
					dict_table_is_comp(index->table))) {
					/* Source duplicate is delete-marked:
					nothing to copy. */
					goto next;
				} else {
					/* We have two identical leaf records,
					skip copying the undeleted one, and
					unmark deleted on the current page */
					btr_rec_set_deleted_flag(
						cur_rec, NULL, FALSE);
					goto next;
				}
			}
		}

		/* If position is on supremum rec, need to move to
		previous rec. */
		if (page_rec_is_supremum(cur_rec)) {
			page_cur_move_to_prev(&page_cur);
		}

		cur_rec = page_cur_get_rec(&page_cur);

		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
					   ULINT_UNDEFINED, &heap);

		/* Insert a copy of cur1_rec right after cur_rec on the
		new page. */
		ins_rec = page_cur_insert_rec_low(cur_rec, index,
						  cur1_rec, offsets1, mtr);
		if (UNIV_UNLIKELY(!ins_rec)) {
			fprintf(stderr, "page number %ld and %ld\n",
				(long)new_block->page.id.page_no(),
				(long)block->page.id.page_no());

			ib::fatal() << "rec offset " << page_offset(rec)
				<< ", cur1 offset "
				<<  page_offset(page_cur_get_rec(&cur1))
				<< ", cur_rec offset "
				<< page_offset(cur_rec);
		}

		/* Record the old/new pair so the caller can transfer
		locks from the old record to the copied one. */
		rec_move[moved].new_rec = ins_rec;
		rec_move[moved].old_rec = cur1_rec;
		rec_move[moved].moved = false;
		moved++;
next:
		if (moved > max_move) {
			ut_ad(0);
			break;
		}

		page_cur_move_to_next(&cur1);
	}

	*num_moved = moved;
}
1563 
1564 /*************************************************************//**
1565 Copy recs till a specified rec from a page to new_block of rtree. */
void
rtr_page_copy_rec_list_start_no_locks(
/*==================================*/
	buf_block_t*	new_block,	/*!< in: index page to copy to */
	buf_block_t*	block,		/*!< in: index page of rec */
	rec_t*		rec,		/*!< in: record on page */
	dict_index_t*	index,		/*!< in: record descriptor */
	mem_heap_t*	heap,		/*!< in/out: heap memory */
	rtr_rec_move_t*	rec_move,	/*!< in: recording records moved */
	ulint		max_move,	/*!< in: num of rec to move */
	ulint*		num_moved,	/*!< out: num of rec to move */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_cur_t	cur1;		/* iterates over the source records */
	rec_t*		cur_rec;
	ulint		offsets_1[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets1 = offsets_1;
	ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
	ulint*		offsets2 = offsets_2;
	page_cur_t	page_cur;	/* insert position on new_block */
	ulint		moved = 0;
	bool		is_leaf = page_is_leaf(buf_block_get_frame(block));

	rec_offs_init(offsets_1);
	rec_offs_init(offsets_2);

	/* Start from the first user record of the source page. */
	page_cur_set_before_first(block, &cur1);
	page_cur_move_to_next(&cur1);

	cur_rec = page_rec_get_next(
		page_get_infimum_rec(buf_block_get_frame(new_block)));
	page_cur_position(cur_rec, new_block, &page_cur);

	/* Copy source records up to, but not including, "rec". */
	while (page_cur_get_rec(&cur1) != rec) {
		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
		rec_t*	ins_rec;

		if (page_rec_is_infimum(cur_rec)) {
			cur_rec = page_rec_get_next(cur_rec);
		}

		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
					   ULINT_UNDEFINED, &heap);

		/* Advance cur_rec on the new page until it is no longer
		smaller than cur1_rec, so that the insert below keeps the
		new page ordered.
		NOTE(review): if cmp == 0 on a non-leaf page, none of the
		branches advance the cursor; presumably equal records
		cannot occur on non-leaf pages here -- confirm. */
		while (!page_rec_is_supremum(cur_rec)) {
			ulint		cur_matched_fields = 0;
			int		cmp;

			offsets2 = rec_get_offsets(cur_rec, index, offsets2,
						   ULINT_UNDEFINED, &heap);
			cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
						     offsets1, offsets2,
						     index,
						     page_is_spatial_non_leaf(cur1_rec, index),
						     false,
						     &cur_matched_fields);
			if (cmp < 0) {
				/* Insert position found: cur1_rec sorts
				before cur_rec. */
				page_cur_move_to_prev(&page_cur);
				cur_rec = page_cur_get_rec(&page_cur);
				break;
			} else if (cmp > 0) {
				/* Skip small recs. */
				page_cur_move_to_next(&page_cur);
				cur_rec = page_cur_get_rec(&page_cur);
			} else if (is_leaf) {
				if (rec_get_deleted_flag(
					cur1_rec,
					dict_table_is_comp(index->table))) {
					/* Source duplicate is delete-marked:
					nothing to copy. */
					goto next;
				} else {
					/* We have two identical leaf records,
					skip copying the undeleted one, and
					unmark deleted on the current page */
					btr_rec_set_deleted_flag(
						cur_rec, NULL, FALSE);
					goto next;
				}
			}
		}

		/* If position is on supremum rec, need to move to
		previous rec. */
		if (page_rec_is_supremum(cur_rec)) {
			page_cur_move_to_prev(&page_cur);
		}

		cur_rec = page_cur_get_rec(&page_cur);

		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
					   ULINT_UNDEFINED, &heap);

		/* Insert a copy of cur1_rec right after cur_rec on the
		new page. */
		ins_rec = page_cur_insert_rec_low(cur_rec, index,
						  cur1_rec, offsets1, mtr);
		if (UNIV_UNLIKELY(!ins_rec)) {
			fprintf(stderr, "page number %ld and %ld\n",
				(long)new_block->page.id.page_no(),
				(long)block->page.id.page_no());

			ib::fatal() << "rec offset " << page_offset(rec)
				<< ", cur1 offset "
				<<  page_offset(page_cur_get_rec(&cur1))
				<< ", cur_rec offset "
				<< page_offset(cur_rec);
		}

		/* Record the old/new pair so the caller can transfer
		locks from the old record to the copied one. */
		rec_move[moved].new_rec = ins_rec;
		rec_move[moved].old_rec = cur1_rec;
		rec_move[moved].moved = false;
		moved++;
next:
		if (moved > max_move) {
			ut_ad(0);
			break;
		}

		page_cur_move_to_next(&cur1);
	}

	*num_moved = moved;
}
1686 
/****************************************************************//**
Merges the MBRs of the records at the two cursors into *new_mbr and
reports whether the two source MBRs differ */
1689 bool
rtr_merge_mbr_changed(btr_cur_t * cursor,btr_cur_t * cursor2,ulint * offsets,ulint * offsets2,rtr_mbr_t * new_mbr,buf_block_t * merge_block,buf_block_t * block,dict_index_t * index)1690 rtr_merge_mbr_changed(
1691 /*==================*/
1692 	btr_cur_t*		cursor,		/*!< in/out: cursor */
1693 	btr_cur_t*		cursor2,	/*!< in: the other cursor */
1694 	ulint*			offsets,	/*!< in: rec offsets */
1695 	ulint*			offsets2,	/*!< in: rec offsets */
1696 	rtr_mbr_t*		new_mbr,	/*!< out: MBR to update */
1697 	buf_block_t*		merge_block,	/*!< in: page to merge */
1698 	buf_block_t*		block,		/*!< in: page be merged */
1699 	dict_index_t*		index)		/*!< in: index */
1700 {
1701 	double*		mbr;
1702 	double		mbr1[SPDIMS * 2];
1703 	double		mbr2[SPDIMS * 2];
1704 	rec_t*		rec;
1705 	ulint		len;
1706 	bool		changed = false;
1707 
1708 	ut_ad(dict_index_is_spatial(cursor->index));
1709 
1710 	rec = btr_cur_get_rec(cursor);
1711 
1712 	rtr_read_mbr(rec_get_nth_field(rec, offsets, 0, &len),
1713 		     reinterpret_cast<rtr_mbr_t*>(mbr1));
1714 
1715 	rec = btr_cur_get_rec(cursor2);
1716 
1717 	rtr_read_mbr(rec_get_nth_field(rec, offsets2, 0, &len),
1718 		     reinterpret_cast<rtr_mbr_t*>(mbr2));
1719 
1720 	mbr = reinterpret_cast<double*>(new_mbr);
1721 
1722 	for (int i = 0; i < SPDIMS * 2; i += 2) {
1723 		changed = (changed || mbr1[i] != mbr2[i]);
1724 		*mbr = mbr1[i] < mbr2[i] ? mbr1[i] : mbr2[i];
1725 		mbr++;
1726 		changed = (changed || mbr1[i + 1] != mbr2 [i + 1]);
1727 		*mbr = mbr1[i + 1] > mbr2[i + 1] ? mbr1[i + 1] : mbr2[i + 1];
1728 		mbr++;
1729 	}
1730 
1731 	return(changed);
1732 }
1733 
/****************************************************************//**
Merges two MBRs and updates the MBR of the record that the cursor is on. */
1736 dberr_t
rtr_merge_and_update_mbr(btr_cur_t * cursor,btr_cur_t * cursor2,ulint * offsets,ulint * offsets2,page_t * child_page,buf_block_t * merge_block,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1737 rtr_merge_and_update_mbr(
1738 /*=====================*/
1739 	btr_cur_t*		cursor,		/*!< in/out: cursor */
1740 	btr_cur_t*		cursor2,	/*!< in: the other cursor */
1741 	ulint*			offsets,	/*!< in: rec offsets */
1742 	ulint*			offsets2,	/*!< in: rec offsets */
1743 	page_t*			child_page,	/*!< in: the page. */
1744 	buf_block_t*		merge_block,	/*!< in: page to merge */
1745 	buf_block_t*		block,		/*!< in: page be merged */
1746 	dict_index_t*		index,		/*!< in: index */
1747 	mtr_t*			mtr)		/*!< in: mtr */
1748 {
1749 	dberr_t			err = DB_SUCCESS;
1750 	rtr_mbr_t		new_mbr;
1751 	bool			changed = false;
1752 
1753 	ut_ad(dict_index_is_spatial(cursor->index));
1754 
1755 	changed = rtr_merge_mbr_changed(cursor, cursor2, offsets, offsets2,
1756 					&new_mbr, merge_block,
1757 					block, index);
1758 
1759 	/* Update the mbr field of the rec. And will delete the record
1760 	pointed by cursor2 */
1761 	if (changed) {
1762 		if (!rtr_update_mbr_field(cursor, offsets, cursor2, child_page,
1763 					  &new_mbr, NULL, mtr)) {
1764 			err = DB_ERROR;
1765 		}
1766 	} else {
1767 		rtr_node_ptr_delete(cursor2->index, cursor2, block, mtr);
1768 	}
1769 
1770 	return(err);
1771 }
1772 
1773 /*************************************************************//**
1774 Deletes on the upper level the node pointer to a page. */
1775 void
rtr_node_ptr_delete(dict_index_t * index,btr_cur_t * cursor,buf_block_t * block,mtr_t * mtr)1776 rtr_node_ptr_delete(
1777 /*================*/
1778 	dict_index_t*	index,	/*!< in: index tree */
1779 	btr_cur_t*	cursor, /*!< in: search cursor, contains information
1780 				about parent nodes in search */
1781 	buf_block_t*	block,	/*!< in: page whose node pointer is deleted */
1782 	mtr_t*		mtr)	/*!< in: mtr */
1783 {
1784 	ibool		compressed;
1785 	dberr_t		err;
1786 
1787 	compressed = btr_cur_pessimistic_delete(&err, TRUE, cursor,
1788 						BTR_CREATE_FLAG, false, mtr);
1789 	ut_a(err == DB_SUCCESS);
1790 
1791 	if (!compressed) {
1792 		btr_cur_compress_if_useful(cursor, FALSE, mtr);
1793 	}
1794 }
1795 
1796 /**************************************************************//**
1797 Check whether a Rtree page is child of a parent page
1798 @return true if there is child/parent relationship */
1799 bool
rtr_check_same_block(dict_index_t * index,btr_cur_t * cursor,buf_block_t * parentb,buf_block_t * childb,mem_heap_t * heap)1800 rtr_check_same_block(
1801 /*================*/
1802 	dict_index_t*	index,	/*!< in: index tree */
1803 	btr_cur_t*	cursor,	/*!< in/out: position at the parent entry
1804 				pointing to the child if successful */
1805 	buf_block_t*	parentb,/*!< in: parent page to check */
1806 	buf_block_t*	childb,	/*!< in: child Page */
1807 	mem_heap_t*	heap)	/*!< in: memory heap */
1808 
1809 {
1810 	ulint		page_no = childb->page.id.page_no();
1811 	ulint*		offsets;
1812 	rec_t*		rec = page_rec_get_next(page_get_infimum_rec(
1813 				buf_block_get_frame(parentb)));
1814 
1815 	while (!page_rec_is_supremum(rec)) {
1816 		offsets = rec_get_offsets(
1817 			rec, index, NULL, ULINT_UNDEFINED, &heap);
1818 
1819 		if (btr_node_ptr_get_child_page_no(rec, offsets) == page_no) {
1820 			btr_cur_position(index, rec, parentb, cursor);
1821 			return(true);
1822 		}
1823 
1824 		rec = page_rec_get_next(rec);
1825 	}
1826 
1827 	return(false);
1828 }
1829 
1830 /****************************************************************//**
1831 Calculate the area increased for a new record
1832 @return area increased */
1833 double
rtr_rec_cal_increase(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,double * area)1834 rtr_rec_cal_increase(
1835 /*=================*/
1836 	const dtuple_t*	dtuple,	/*!< in: data tuple to insert, which
1837 				cause area increase */
1838 	const rec_t*	rec,	/*!< in: physical record which differs from
1839 				dtuple in some of the common fields, or which
1840 				has an equal number or more fields than
1841 				dtuple */
1842 	const ulint*	offsets,/*!< in: array returned by rec_get_offsets() */
1843 	double*		area)	/*!< out: increased area */
1844 {
1845 	const dfield_t*	dtuple_field;
1846 	ulint		dtuple_f_len;
1847 	ulint		rec_f_len;
1848 	const byte*	rec_b_ptr;
1849 	double		ret = 0;
1850 
1851 	ut_ad(!page_rec_is_supremum(rec));
1852 	ut_ad(!page_rec_is_infimum(rec));
1853 
1854 	dtuple_field = dtuple_get_nth_field(dtuple, 0);
1855 	dtuple_f_len = dfield_get_len(dtuple_field);
1856 
1857 	rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1858 	ret = rtree_area_increase(
1859 		rec_b_ptr,
1860 		static_cast<const byte*>(dfield_get_data(dtuple_field)),
1861 		static_cast<int>(dtuple_f_len), area);
1862 
1863 	return(ret);
1864 }
1865 
/** Estimates the number of rows in a given area.
Scans only the root page of the R-tree: for each node pointer record it
accumulates a fractional row count based on how the record's MBR relates
to the search MBR under the given search mode, then scales the fraction
by the table's total row count.
@param[in]	index	index
@param[in]	tuple	range tuple containing mbr, may also be empty tuple
@param[in]	mode	search mode
@return estimated number of rows */
int64_t
rtr_estimate_n_rows_in_range(
	dict_index_t*	index,
	const dtuple_t*	tuple,
	page_cur_mode_t	mode)
{
	/* Check tuple & mode: an empty tuple carries no MBR to compare
	against, so no estimate is possible. */
	if (tuple->n_fields == 0) {
		return(HA_POS_ERROR);
	}

	/* Only the spatial search modes are supported; anything else
	(e.g. ordinary B-tree modes) yields no estimate. */
	switch (mode) {
	case PAGE_CUR_DISJOINT:
	case PAGE_CUR_CONTAIN:
	case PAGE_CUR_INTERSECT:
	case PAGE_CUR_WITHIN:
	case PAGE_CUR_MBR_EQUAL:
		break;
	default:
		return(HA_POS_ERROR);
	}

	/* Debug hook: force a fixed estimate in tests. */
	DBUG_EXECUTE_IF("rtr_pcur_move_to_next_return",
		return(2);
	);

	/* Read mbr from tuple. */
	const dfield_t*	dtuple_field;
	ulint		dtuple_f_len MY_ATTRIBUTE((unused));
	rtr_mbr_t	range_mbr;
	double		range_area;
	byte*		range_mbr_ptr;

	dtuple_field = dtuple_get_nth_field(tuple, 0);
	/* dtuple_f_len is only used by the debug assertion below. */
	dtuple_f_len = dfield_get_len(dtuple_field);
	range_mbr_ptr = reinterpret_cast<byte*>(dfield_get_data(dtuple_field));

	ut_ad(dtuple_f_len >= DATA_MBR_LEN);
	rtr_read_mbr(range_mbr_ptr, &range_mbr);
	/* Area of the search rectangle; used for the WITHIN/MBR_EQUAL
	fraction below. */
	range_area = (range_mbr.xmax - range_mbr.xmin)
		 * (range_mbr.ymax - range_mbr.ymin);

	/* Get index root page. */
	page_size_t	page_size(dict_table_page_size(index->table));
	page_id_t	page_id(dict_index_get_space(index),
				dict_index_get_page(index));
	mtr_t		mtr;
	buf_block_t*	block;
	page_t*		page;
	ulint		n_recs;

	/* Latch order: S-lock the index before fetching the root block;
	the mtr releases both at commit. */
	mtr_start(&mtr);
	mtr.set_named_space(dict_index_get_space(index));
	mtr_s_lock(dict_index_get_lock(index), &mtr);

	block = btr_block_get(page_id, page_size, RW_S_LATCH, index, &mtr);
	page = buf_block_get_frame(block);
	n_recs = page_header_get_field(page, PAGE_N_RECS);

	if (n_recs == 0) {
		/* Empty root: nothing to base an estimate on. */
		mtr_commit(&mtr);
		return(HA_POS_ERROR);
	}

	rec_t*		rec;
	byte*		field;
	ulint		len;
	ulint*		offsets = NULL;
	mem_heap_t*	heap;

	heap = mem_heap_create(512);
	rec = page_rec_get_next(page_get_infimum_rec(page));
	/* NOTE(review): offsets are computed once and reused for every
	record in the loop below — this assumes all records on the root
	page share the same format/offsets; confirm against the record
	layout of spatial node pointers. */
	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

	/* Scan records in root page and calculate area.  Each record
	contributes a value in [0, 1] approximating the probability that
	a row under it matches the search MBR. */
	double	area = 0;
	while (!page_rec_is_supremum(rec)) {
		rtr_mbr_t	mbr;
		double		rec_area;

		/* Field 0 of a node pointer record is its MBR. */
		field = rec_get_nth_field(rec, offsets, 0, &len);
		ut_ad(len == DATA_MBR_LEN);

		rtr_read_mbr(field, &mbr);

		rec_area = (mbr.xmax - mbr.xmin) * (mbr.ymax - mbr.ymin);

		if (rec_area == 0) {
			/* Degenerate MBR (point or line): area ratios are
			undefined, so count whole records instead. */
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				area += 1;
				break;

			case PAGE_CUR_DISJOINT:
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				/* Count it only if the search MBR lies
				within the record's MBR. */
				if (rtree_key_cmp(
					PAGE_CUR_WITHIN, range_mbr_ptr,
					DATA_MBR_LEN, field, DATA_MBR_LEN)
				    == 0) {
					area += 1;
				}

				break;

			default:
				ut_error;
			}
		} else {
			switch (mode) {
			case PAGE_CUR_CONTAIN:
			case PAGE_CUR_INTERSECT:
				/* Fraction of the record's MBR that
				overlaps the search MBR. */
				area += rtree_area_overlapping(range_mbr_ptr,
						field, DATA_MBR_LEN) / rec_area;
				break;

			case PAGE_CUR_DISJOINT:
				/* Complement of the overlap fraction. */
				area += 1;
				area -= rtree_area_overlapping(range_mbr_ptr,
						field, DATA_MBR_LEN) / rec_area;
				break;

			case PAGE_CUR_WITHIN:
			case PAGE_CUR_MBR_EQUAL:
				/* If the search MBR is inside this record's
				MBR, estimate by the ratio of their areas. */
				if (rtree_key_cmp(
					PAGE_CUR_WITHIN, range_mbr_ptr,
					DATA_MBR_LEN, field, DATA_MBR_LEN)
				    == 0) {
					area += range_area / rec_area;
				}

				break;
			default:
				ut_error;
			}
		}

		rec = page_rec_get_next(rec);
	}

	/* Release the root page latch and index lock, then the heap that
	backed the offsets array. */
	mtr_commit(&mtr);
	mem_heap_free(heap);

	/* Floating-point blowup (e.g. division by a tiny rec_area) makes
	the estimate meaningless. */
	if (my_isinf(area) || my_isnan(area)) {
		return(HA_POS_ERROR);
	}

	/* Scale the per-record average match fraction by the table's
	total row count. */
	return(static_cast<int64_t>(dict_table_get_n_rows(index->table)
				    * area / n_recs));
}
2024