1 /*****************************************************************************
2
3 Copyright (c) 2016, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file gis/gis0rtree.cc
29 InnoDB R-tree interfaces
30
31 Created 2013/03/27 Allen Lai and Jimmy Yang
32 ***********************************************************************/
33
34 #include "fsp0fsp.h"
35 #include "page0page.h"
36 #include "page0cur.h"
37 #include "page0zip.h"
38 #include "gis0rtree.h"
39
40 #ifndef UNIV_HOTBACKUP
41 #include "btr0cur.h"
42 #include "btr0sea.h"
43 #include "btr0pcur.h"
44 #include "rem0cmp.h"
45 #include "lock0lock.h"
46 #include "ibuf0ibuf.h"
47 #include "trx0trx.h"
48 #include "srv0mon.h"
49 #include "gis0geo.h"
50
51 #endif /* UNIV_HOTBACKUP */
52
53 /*************************************************************//**
54 Initial split nodes info for R-tree split.
55 @return initialized split nodes array */
static
rtr_split_node_t*
rtr_page_split_initialize_nodes(
/*============================*/
	mem_heap_t*	heap,	/*!< in: pointer to memory heap, or NULL */
	btr_cur_t*	cursor,	/*!< in: cursor at which to insert; when the
				function returns, the cursor is positioned
				on the predecessor of the inserted record */
	ulint**		offsets,/*!< in: offsets on inserted record */
	const dtuple_t*	tuple,	/*!< in: tuple to insert */
	double**	buf_pos)/*!< in/out: current buffer position */
{
	rtr_split_node_t*	split_node_array;
	double*			buf;
	ulint			n_recs;
	rtr_split_node_t*	task;
	rtr_split_node_t*	stop;
	rtr_split_node_t*	cur;
	rec_t*			rec;
	buf_block_t*		block;
	page_t*			page;
	ulint			n_uniq;
	ulint			len;
	byte*			source_cur;

	block = btr_cur_get_block(cursor);
	page = buf_block_get_frame(block);
	n_uniq = dict_index_get_n_unique_in_tree(cursor->index);

	/* All records currently on the page, plus the tuple being
	inserted. */
	n_recs = page_get_n_recs(page) + 1;

	/*We reserve 2 MBRs memory space for temp result of split
	algrithm. And plus the new mbr that need to insert, we
	need (n_recs + 3)*MBR size for storing all MBRs.*/
	/* A single allocation holds both the MBR coordinate buffer and
	the split-node array; the node array starts right after the
	(n_recs + 3) MBR slots. */
	buf = static_cast<double*>(mem_heap_alloc(
		heap, DATA_MBR_LEN * (n_recs + 3)
		+ sizeof(rtr_split_node_t) * (n_recs + 1)));

	/* SPDIMS * 2 doubles per MBR (low/high per dimension). */
	split_node_array = (rtr_split_node_t*)(buf + SPDIMS * 2 * (n_recs + 3));
	task = split_node_array;
	*buf_pos = buf;
	stop = task + n_recs;

	rec = page_rec_get_next(page_get_infimum_rec(page));
	*offsets = rec_get_offsets(rec, cursor->index, *offsets,
				   n_uniq, &heap);

	/* Field 0 of every R-tree record is its MBR. */
	source_cur = rec_get_nth_field(rec, *offsets, 0, &len);

	/* Copy the MBR of each existing record on the page into its
	split node; iterate up to stop - 1 so the last slot is left
	for the record being inserted. */
	for (cur = task; cur < stop - 1; ++cur) {
		cur->coords = reserve_coords(buf_pos, SPDIMS);
		cur->key = rec;

		memcpy(cur->coords, source_cur, DATA_MBR_LEN);

		rec = page_rec_get_next(rec);
		*offsets = rec_get_offsets(rec, cursor->index, *offsets,
					   n_uniq, &heap);
		source_cur = rec_get_nth_field(rec, *offsets, 0, &len);
	}

	/* Put the insert key to node list */
	/* The insert tuple is converted to a physical record allocated
	on the heap so it can be treated uniformly with the page records
	by the split algorithm. */
	source_cur = static_cast<byte*>(dfield_get_data(
		dtuple_get_nth_field(tuple, 0)));
	cur->coords = reserve_coords(buf_pos, SPDIMS);
	rec = (byte*) mem_heap_alloc(
		heap, rec_get_converted_size(cursor->index, tuple, 0));

	rec = rec_convert_dtuple_to_rec(rec, cursor->index, tuple, 0);
	cur->key = rec;

	memcpy(cur->coords, source_cur, DATA_MBR_LEN);

	return split_node_array;
}
131
132 /**********************************************************************//**
133 Builds a Rtree node pointer out of a physical record and a page number.
134 Note: For Rtree, we just keep the mbr and page no field in non-leaf level
135 page. It's different with Btree, Btree still keeps PK fields so far.
136 @return own: node pointer */
137 dtuple_t*
rtr_index_build_node_ptr(const dict_index_t * index,const rtr_mbr_t * mbr,const rec_t * rec,ulint page_no,mem_heap_t * heap,ulint level)138 rtr_index_build_node_ptr(
139 /*=====================*/
140 const dict_index_t* index, /*!< in: index */
141 const rtr_mbr_t* mbr, /*!< in: mbr of lower page */
142 const rec_t* rec, /*!< in: record for which to build node
143 pointer */
144 ulint page_no,/*!< in: page number to put in node
145 pointer */
146 mem_heap_t* heap, /*!< in: memory heap where pointer
147 created */
148 ulint level) /*!< in: level of rec in tree:
149 0 means leaf level */
150 {
151 dtuple_t* tuple;
152 dfield_t* field;
153 byte* buf;
154 ulint n_unique;
155 ulint info_bits;
156
157 ut_ad(dict_index_is_spatial(index));
158
159 n_unique = DICT_INDEX_SPATIAL_NODEPTR_SIZE;
160
161 tuple = dtuple_create(heap, n_unique + 1);
162
163 /* For rtree internal node, we need to compare page number
164 fields. */
165 dtuple_set_n_fields_cmp(tuple, n_unique + 1);
166
167 dict_index_copy_types(tuple, index, n_unique);
168
169 /* Write page no field */
170 buf = static_cast<byte*>(mem_heap_alloc(heap, 4));
171
172 mach_write_to_4(buf, page_no);
173
174 field = dtuple_get_nth_field(tuple, n_unique);
175 dfield_set_data(field, buf, 4);
176
177 dtype_set(dfield_get_type(field), DATA_SYS_CHILD, DATA_NOT_NULL, 4);
178
179 /* Set info bits. */
180 info_bits = rec_get_info_bits(rec, dict_table_is_comp(index->table));
181 dtuple_set_info_bits(tuple, info_bits | REC_STATUS_NODE_PTR);
182
183 /* Set mbr as index entry data */
184 field = dtuple_get_nth_field(tuple, 0);
185
186 buf = static_cast<byte*>(mem_heap_alloc(heap, DATA_MBR_LEN));
187
188 rtr_write_mbr(buf, mbr);
189
190 dfield_set_data(field, buf, DATA_MBR_LEN);
191
192 ut_ad(dtuple_check_typed(tuple));
193
194 return(tuple);
195 }
196
197 /**************************************************************//**
198 In-place update the mbr field of a spatial index row.
199 @return true if update is successful */
static
bool
rtr_update_mbr_field_in_place(
/*==========================*/
	dict_index_t*	index,		/*!< in: spatial index. */
	rec_t*		rec,		/*!< in/out: rec to be modified.*/
	ulint*		offsets,	/*!< in/out: offsets on rec. */
	rtr_mbr_t*	mbr,		/*!< in: the new mbr. */
	mtr_t*		mtr)		/*!< in: mtr */
{
	void*		new_mbr_ptr;
	double		new_mbr[SPDIMS * 2];
	byte*		log_ptr;
	page_t*		page = page_align(rec);
	ulint		len = DATA_MBR_LEN;
	ulint		flags = BTR_NO_UNDO_LOG_FLAG
				| BTR_NO_LOCKING_FLAG
				| BTR_KEEP_SYS_FLAG;
	ulint		rec_info;

	/* Serialize the new MBR and overwrite field 0 of the record
	in place (MBR length is fixed, so the record does not move). */
	rtr_write_mbr(reinterpret_cast<byte*>(&new_mbr), mbr);
	new_mbr_ptr = static_cast<void*>(new_mbr);
	/* Otherwise, set the mbr to the new_mbr. */
	rec_set_nth_field(rec, offsets, 0, new_mbr_ptr, len);

	rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));

	/* Write redo log. */
	/* For now, we use LOG_REC_UPDATE_IN_PLACE to log this enlarge.
	In the future, we may need to add a new log type for this. */
	log_ptr = mlog_open_and_write_index(mtr, rec, index, page_is_comp(page)
					    ? MLOG_COMP_REC_UPDATE_IN_PLACE
					    : MLOG_REC_UPDATE_IN_PLACE,
					    1 + DATA_ROLL_PTR_LEN + 14 + 2
					    + MLOG_BUF_MARGIN);

	if (!log_ptr) {
		/* Logging in mtr is switched off during
		crash recovery */
		return(false);
	}

	/* The layout below must mirror what the MLOG_REC_UPDATE_IN_PLACE
	recovery parser expects; the sys fields (TRX_ID position, ROLL_PTR,
	TRX_ID) are all logged as zero since node pointers have none. */

	/* Flags */
	mach_write_to_1(log_ptr, flags);
	log_ptr++;
	/* TRX_ID Position */
	log_ptr += mach_write_compressed(log_ptr, 0);
	/* ROLL_PTR */
	trx_write_roll_ptr(log_ptr, 0);
	log_ptr += DATA_ROLL_PTR_LEN;
	/* TRX_ID */
	log_ptr += mach_u64_write_compressed(log_ptr, 0);

	/* Offset */
	mach_write_to_2(log_ptr, page_offset(rec));
	log_ptr += 2;
	/* Info bits */
	mach_write_to_1(log_ptr, rec_info);
	log_ptr++;
	/* N fields */
	log_ptr += mach_write_compressed(log_ptr, 1);
	/* Field no, len */
	log_ptr += mach_write_compressed(log_ptr, 0);
	log_ptr += mach_write_compressed(log_ptr, len);
	/* Data */
	memcpy(log_ptr, new_mbr_ptr, len);
	log_ptr += len;

	mlog_close(mtr, log_ptr);

	return(true);
}
272
273 /**************************************************************//**
274 Update the mbr field of a spatial index row.
275 @return true if update is successful */
276 bool
rtr_update_mbr_field(btr_cur_t * cursor,ulint * offsets,btr_cur_t * cursor2,page_t * child_page,rtr_mbr_t * mbr,rec_t * new_rec,mtr_t * mtr)277 rtr_update_mbr_field(
278 /*=================*/
279 btr_cur_t* cursor, /*!< in/out: cursor pointed to rec.*/
280 ulint* offsets, /*!< in/out: offsets on rec. */
281 btr_cur_t* cursor2, /*!< in/out: cursor pointed to rec
282 that should be deleted.
283 this cursor is for btr_compress to
284 delete the merged page's father rec.*/
285 page_t* child_page, /*!< in: child page. */
286 rtr_mbr_t* mbr, /*!< in: the new mbr. */
287 rec_t* new_rec, /*!< in: rec to use */
288 mtr_t* mtr) /*!< in: mtr */
289 {
290 dict_index_t* index = cursor->index;
291 mem_heap_t* heap;
292 page_t* page;
293 rec_t* rec;
294 ulint flags = BTR_NO_UNDO_LOG_FLAG
295 | BTR_NO_LOCKING_FLAG
296 | BTR_KEEP_SYS_FLAG;
297 dberr_t err;
298 big_rec_t* dummy_big_rec;
299 buf_block_t* block;
300 rec_t* child_rec;
301 ulint up_match = 0;
302 ulint low_match = 0;
303 ulint child;
304 ulint level;
305 ulint rec_info;
306 page_zip_des_t* page_zip;
307 bool ins_suc = true;
308 ulint cur2_pos = 0;
309 ulint del_page_no = 0;
310 ulint* offsets2;
311
312 rec = btr_cur_get_rec(cursor);
313 page = page_align(rec);
314
315 rec_info = rec_get_info_bits(rec, rec_offs_comp(offsets));
316
317 heap = mem_heap_create(100);
318 block = btr_cur_get_block(cursor);
319 ut_ad(page == buf_block_get_frame(block));
320 page_zip = buf_block_get_page_zip(block);
321
322 child = btr_node_ptr_get_child_page_no(rec, offsets);
323 level = btr_page_get_level(buf_block_get_frame(block), mtr);
324
325 if (new_rec) {
326 child_rec = new_rec;
327 } else {
328 child_rec = page_rec_get_next(page_get_infimum_rec(child_page));
329 }
330
331 dtuple_t* node_ptr = rtr_index_build_node_ptr(
332 index, mbr, child_rec, child, heap, level);
333
334 /* We need to remember the child page no of cursor2, since page could be
335 reorganized or insert a new rec before it. */
336 if (cursor2) {
337 rec_t* del_rec = btr_cur_get_rec(cursor2);
338 offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
339 index, NULL,
340 ULINT_UNDEFINED, &heap);
341 del_page_no = btr_node_ptr_get_child_page_no(del_rec, offsets2);
342 cur2_pos = page_rec_get_n_recs_before(btr_cur_get_rec(cursor2));
343 }
344
345 if (rec_info & REC_INFO_MIN_REC_FLAG) {
346 /* When the rec is minimal rec in this level, we do
347 in-place update for avoiding it move to other place. */
348
349 if (page_zip) {
350 /* Check if there's enough space for in-place
351 update the zip page. */
352 if (!btr_cur_update_alloc_zip(
353 page_zip,
354 btr_cur_get_page_cur(cursor),
355 index, offsets,
356 rec_offs_size(offsets),
357 false, mtr)) {
358
359 /* If there's not enought space for
360 inplace update zip page, we do delete
361 insert. */
362 ins_suc = false;
363
364 /* Since btr_cur_update_alloc_zip could
365 reorganize the page, we need to repositon
366 cursor2. */
367 if (cursor2) {
368 cursor2->page_cur.rec =
369 page_rec_get_nth(page,
370 cur2_pos);
371 }
372
373 goto update_mbr;
374 }
375
376 /* Record could be repositioned */
377 rec = btr_cur_get_rec(cursor);
378
379 #ifdef UNIV_DEBUG
380 /* Make sure it is still the first record */
381 rec_info = rec_get_info_bits(
382 rec, rec_offs_comp(offsets));
383 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
384 #endif /* UNIV_DEBUG */
385 }
386
387 if (!rtr_update_mbr_field_in_place(index, rec,
388 offsets, mbr, mtr)) {
389 return(false);
390 }
391
392 if (page_zip) {
393 page_zip_write_rec(page_zip, rec, index, offsets, 0);
394 }
395
396 if (cursor2) {
397 ulint* offsets2;
398
399 if (page_zip) {
400 cursor2->page_cur.rec
401 = page_rec_get_nth(page, cur2_pos);
402 }
403 offsets2 = rec_get_offsets(btr_cur_get_rec(cursor2),
404 index, NULL,
405 ULINT_UNDEFINED, &heap);
406 ut_ad(del_page_no == btr_node_ptr_get_child_page_no(
407 cursor2->page_cur.rec,
408 offsets2));
409
410 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
411 index, offsets2, mtr);
412 }
413 } else if (page_get_n_recs(page) == 1) {
414 /* When there's only one rec in the page, we do insert/delete to
415 avoid page merge. */
416
417 page_cur_t page_cur;
418 rec_t* insert_rec;
419 ulint* insert_offsets = NULL;
420 ulint old_pos;
421 rec_t* old_rec;
422
423 ut_ad(cursor2 == NULL);
424
425 /* Insert the new mbr rec. */
426 old_pos = page_rec_get_n_recs_before(rec);
427
428 err = btr_cur_optimistic_insert(
429 flags,
430 cursor, &insert_offsets, &heap,
431 node_ptr, &insert_rec, &dummy_big_rec, 0, NULL, mtr);
432
433 ut_ad(err == DB_SUCCESS);
434
435 btr_cur_position(index, insert_rec, block, cursor);
436
437 /* Delete the old mbr rec. */
438 old_rec = page_rec_get_nth(page, old_pos);
439 ut_ad(old_rec != insert_rec);
440
441 page_cur_position(old_rec, block, &page_cur);
442 offsets2 = rec_get_offsets(old_rec,
443 index, NULL,
444 ULINT_UNDEFINED, &heap);
445 page_cur_delete_rec(&page_cur, index, offsets2, mtr);
446
447 } else {
448 update_mbr:
449 /* When there're not only 1 rec in the page, we do delete/insert
450 to avoid page split. */
451 rec_t* insert_rec;
452 ulint* insert_offsets = NULL;
453 rec_t* next_rec;
454
455 /* Delete the rec which cursor point to. */
456 next_rec = page_rec_get_next(rec);
457 page_cur_delete_rec(btr_cur_get_page_cur(cursor),
458 index, offsets, mtr);
459 if (!ins_suc) {
460 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
461
462 btr_set_min_rec_mark(next_rec, mtr);
463 }
464
465 /* If there's more than 1 rec left in the page, delete
466 the rec which cursor2 point to. Otherwise, delete it later.*/
467 if (cursor2 && page_get_n_recs(page) > 1) {
468 ulint cur2_rec_info;
469 rec_t* cur2_rec;
470
471 cur2_rec = cursor2->page_cur.rec;
472 offsets2 = rec_get_offsets(cur2_rec, index, NULL,
473 ULINT_UNDEFINED, &heap);
474
475 cur2_rec_info = rec_get_info_bits(cur2_rec,
476 rec_offs_comp(offsets2));
477 if (cur2_rec_info & REC_INFO_MIN_REC_FLAG) {
478 /* If we delete the leftmost node
479 pointer on a non-leaf level, we must
480 mark the new leftmost node pointer as
481 the predefined minimum record */
482 rec_t* next_rec = page_rec_get_next(cur2_rec);
483 btr_set_min_rec_mark(next_rec, mtr);
484 }
485
486 ut_ad(del_page_no
487 == btr_node_ptr_get_child_page_no(cur2_rec,
488 offsets2));
489 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
490 index, offsets2, mtr);
491 cursor2 = NULL;
492 }
493
494 /* Insert the new rec. */
495 page_cur_search_with_match(block, index, node_ptr,
496 PAGE_CUR_LE , &up_match, &low_match,
497 btr_cur_get_page_cur(cursor), NULL);
498
499 err = btr_cur_optimistic_insert(flags, cursor, &insert_offsets,
500 &heap, node_ptr, &insert_rec,
501 &dummy_big_rec, 0, NULL, mtr);
502
503 if (!ins_suc && err == DB_SUCCESS) {
504 ins_suc = true;
505 }
506
507 /* If optimistic insert fail, try reorganize the page
508 and insert again. */
509 if (err != DB_SUCCESS && ins_suc) {
510 btr_page_reorganize(btr_cur_get_page_cur(cursor),
511 index, mtr);
512
513 err = btr_cur_optimistic_insert(flags,
514 cursor,
515 &insert_offsets,
516 &heap,
517 node_ptr,
518 &insert_rec,
519 &dummy_big_rec,
520 0, NULL, mtr);
521
522 /* Will do pessimistic insert */
523 if (err != DB_SUCCESS) {
524 ins_suc = false;
525 }
526 }
527
528 /* Insert succeed, position cursor the inserted rec.*/
529 if (ins_suc) {
530 btr_cur_position(index, insert_rec, block, cursor);
531 offsets = rec_get_offsets(insert_rec,
532 index, offsets,
533 ULINT_UNDEFINED, &heap);
534 }
535
536 /* Delete the rec which cursor2 point to. */
537 if (cursor2) {
538 ulint cur2_pno;
539 rec_t* cur2_rec;
540
541 cursor2->page_cur.rec = page_rec_get_nth(page,
542 cur2_pos);
543
544 cur2_rec = btr_cur_get_rec(cursor2);
545
546 offsets2 = rec_get_offsets(cur2_rec, index, NULL,
547 ULINT_UNDEFINED, &heap);
548
549 /* If the cursor2 position is on a wrong rec, we
550 need to reposition it. */
551 cur2_pno = btr_node_ptr_get_child_page_no(cur2_rec, offsets2);
552 if ((del_page_no != cur2_pno)
553 || (cur2_rec == insert_rec)) {
554 cur2_rec = page_rec_get_next(
555 page_get_infimum_rec(page));
556
557 while (!page_rec_is_supremum(cur2_rec)) {
558 offsets2 = rec_get_offsets(cur2_rec, index,
559 NULL,
560 ULINT_UNDEFINED,
561 &heap);
562 cur2_pno = btr_node_ptr_get_child_page_no(
563 cur2_rec, offsets2);
564 if (cur2_pno == del_page_no) {
565 if (insert_rec != cur2_rec) {
566 cursor2->page_cur.rec =
567 cur2_rec;
568 break;
569 }
570 }
571 cur2_rec = page_rec_get_next(cur2_rec);
572 }
573
574 ut_ad(!page_rec_is_supremum(cur2_rec));
575 }
576
577 rec_info = rec_get_info_bits(cur2_rec,
578 rec_offs_comp(offsets2));
579 if (rec_info & REC_INFO_MIN_REC_FLAG) {
580 /* If we delete the leftmost node
581 pointer on a non-leaf level, we must
582 mark the new leftmost node pointer as
583 the predefined minimum record */
584 rec_t* next_rec = page_rec_get_next(cur2_rec);
585 btr_set_min_rec_mark(next_rec, mtr);
586 }
587
588 ut_ad(cur2_pno == del_page_no && cur2_rec != insert_rec);
589
590 page_cur_delete_rec(btr_cur_get_page_cur(cursor2),
591 index, offsets2, mtr);
592 }
593
594 if (!ins_suc) {
595 mem_heap_t* new_heap = NULL;
596
597 err = btr_cur_pessimistic_insert(
598 flags,
599 cursor, &insert_offsets, &new_heap,
600 node_ptr, &insert_rec, &dummy_big_rec,
601 0, NULL, mtr);
602
603 ut_ad(err == DB_SUCCESS);
604
605 if (new_heap) {
606 mem_heap_free(new_heap);
607 }
608
609 }
610
611 if (cursor2) {
612 btr_cur_compress_if_useful(cursor, FALSE, mtr);
613 }
614 }
615
616 #ifdef UNIV_DEBUG
617 ulint left_page_no = btr_page_get_prev(page, mtr);
618
619 if (left_page_no == FIL_NULL) {
620
621 ut_a(REC_INFO_MIN_REC_FLAG & rec_get_info_bits(
622 page_rec_get_next(page_get_infimum_rec(page)),
623 page_is_comp(page)));
624 }
625 #endif /* UNIV_DEBUG */
626
627 mem_heap_free(heap);
628
629 return(true);
630 }
631
632 /**************************************************************//**
633 Update parent page's MBR and Predicate lock information during a split */
static MY_ATTRIBUTE((nonnull))
void
rtr_adjust_upper_level(
/*===================*/
	btr_cur_t*	sea_cur,	/*!< in: search cursor */
	ulint		flags,		/*!< in: undo logging and
					locking flags */
	buf_block_t*	block,		/*!< in/out: page to be split */
	buf_block_t*	new_block,	/*!< in/out: the new half page */
	rtr_mbr_t*	mbr,		/*!< in: MBR on the old page */
	rtr_mbr_t*	new_mbr,	/*!< in: MBR on the new page */
	ulint		direction,	/*!< in: FSP_UP or FSP_DOWN */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_t*		page;
	page_t*		new_page;
	ulint		page_no;
	ulint		new_page_no;
	page_zip_des_t*	page_zip;
	page_zip_des_t*	new_page_zip;
	dict_index_t*	index = sea_cur->index;
	btr_cur_t	cursor;
	ulint*		offsets;
	mem_heap_t*	heap;
	ulint		level;
	dtuple_t*	node_ptr_upper;
	ulint		prev_page_no;
	ulint		next_page_no;
	ulint		space;
	page_cur_t*	page_cursor;
	rtr_mbr_t	parent_mbr;
	lock_prdt_t	prdt;
	lock_prdt_t	new_prdt;
	lock_prdt_t	parent_prdt;
	dberr_t		err;
	big_rec_t*	dummy_big_rec;
	rec_t*		rec;

	/* Create a memory heap where the data tuple is stored */
	heap = mem_heap_create(1024);

	cursor.thr = sea_cur->thr;

	/* Get the level of the split pages */
	level = btr_page_get_level(buf_block_get_frame(block), mtr);
	ut_ad(level
	      == btr_page_get_level(buf_block_get_frame(new_block), mtr));

	page = buf_block_get_frame(block);
	page_no = block->page.id.page_no();
	page_zip = buf_block_get_page_zip(block);

	new_page = buf_block_get_frame(new_block);
	new_page_no = new_block->page.id.page_no();
	new_page_zip = buf_block_get_page_zip(new_block);

	/* Set new mbr for the old page on the upper level. */
	/* Look up the index for the node pointer to page */
	offsets = rtr_page_get_father_block(
		NULL, heap, index, block, mtr, sea_cur, &cursor);

	page_cursor = btr_cur_get_page_cur(&cursor);

	/* Remember the parent's old MBR for the predicate-lock
	update below. */
	rtr_get_mbr_from_rec(page_cursor->rec, offsets, &parent_mbr);

	rtr_update_mbr_field(&cursor, offsets, NULL, page, mbr, NULL, mtr);

	/* Already updated parent MBR, reset in our path */
	if (sea_cur->rtr_info) {
		node_visit_t*	node_visit = rtr_get_parent_node(
						sea_cur, level + 1, true);
		if (node_visit) {
			node_visit->mbr_inc = 0;
		}
	}

	/* Insert the node for the new page. */
	node_ptr_upper = rtr_index_build_node_ptr(
		index, new_mbr,
		page_rec_get_next(page_get_infimum_rec(new_page)),
		new_page_no, heap, level);

	ulint	up_match = 0;
	ulint	low_match = 0;

	buf_block_t*	father_block = btr_cur_get_block(&cursor);

	page_cur_search_with_match(
		father_block, index, node_ptr_upper,
		PAGE_CUR_LE , &up_match, &low_match,
		btr_cur_get_page_cur(&cursor), NULL);

	err = btr_cur_optimistic_insert(
		flags
		| BTR_NO_LOCKING_FLAG
		| BTR_KEEP_SYS_FLAG
		| BTR_NO_UNDO_LOG_FLAG,
		&cursor, &offsets, &heap,
		node_ptr_upper, &rec, &dummy_big_rec, 0, NULL, mtr);

	if (err == DB_FAIL) {
		/* Parent page is full: fall back to a pessimistic
		insert, which may split the parent recursively.
		Carry over the search path info for that split. */
		cursor.rtr_info = sea_cur->rtr_info;
		cursor.tree_height = sea_cur->tree_height;

		mem_heap_t *new_heap = NULL;

		DBUG_EXECUTE_IF("rtr_page_need_first_split", {
			DBUG_SET("+d,rtr_page_need_second_split");
		});

		err = btr_cur_pessimistic_insert(flags
						 | BTR_NO_LOCKING_FLAG
						 | BTR_KEEP_SYS_FLAG
						 | BTR_NO_UNDO_LOG_FLAG,
						 &cursor, &offsets, &new_heap,
						 node_ptr_upper, &rec,
						 &dummy_big_rec, 0, NULL, mtr);

		DBUG_EXECUTE_IF("rtr_page_need_first_split", {
			DBUG_SET("-d,rtr_page_need_second_split");
		});

		cursor.rtr_info = NULL;
		ut_a(err == DB_SUCCESS);

		if (new_heap) {
			mem_heap_free(new_heap);
		}
	}

	/* Propagate predicate locks from the old page/parent to the
	new page. */
	prdt.data = static_cast<void*>(mbr);
	prdt.op = 0;
	new_prdt.data = static_cast<void*>(new_mbr);
	new_prdt.op = 0;
	parent_prdt.data = static_cast<void*>(&parent_mbr);
	parent_prdt.op = 0;

	lock_prdt_update_parent(block, new_block, &prdt, &new_prdt,
				&parent_prdt, dict_index_get_space(index),
				page_cursor->block->page.id.page_no());

	mem_heap_free(heap);

	/* Get the previous and next pages of page */
	prev_page_no = btr_page_get_prev(page, mtr);
	next_page_no = btr_page_get_next(page, mtr);
	space = block->page.id.space();
	const page_size_t&	page_size = dict_table_page_size(index->table);

	/* Update page links of the level */
	/* The new page is linked in after the old page:
	prev <-> page <-> new_page <-> next. */
	if (prev_page_no != FIL_NULL) {
		page_id_t	prev_page_id(space, prev_page_no);

		buf_block_t*	prev_block = btr_block_get(
			prev_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(prev_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_next(prev_block->frame, mtr)
		     == block->page.id.page_no());
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_next(buf_block_get_frame(prev_block),
				  buf_block_get_page_zip(prev_block),
				  page_no, mtr);
	}

	if (next_page_no != FIL_NULL) {
		page_id_t	next_page_id(space, next_page_no);

		buf_block_t*	next_block = btr_block_get(
			next_page_id, page_size, RW_X_LATCH, index, mtr);
#ifdef UNIV_BTR_DEBUG
		ut_a(page_is_comp(next_block->frame) == page_is_comp(page));
		ut_a(btr_page_get_prev(next_block->frame, mtr)
		     == page_get_page_no(page));
#endif /* UNIV_BTR_DEBUG */

		btr_page_set_prev(buf_block_get_frame(next_block),
				  buf_block_get_page_zip(next_block),
				  new_page_no, mtr);
	}

	btr_page_set_prev(page, page_zip, prev_page_no, mtr);
	btr_page_set_next(page, page_zip, new_page_no, mtr);

	btr_page_set_prev(new_page, new_page_zip, page_no, mtr);
	btr_page_set_next(new_page, new_page_zip, next_page_no, mtr);
}
822
823 /*************************************************************//**
824 Moves record list to another page for rtree splitting.
825
826 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
827 if new_block is a compressed leaf page in a secondary index.
828 This has to be done either within the same mini-transaction,
829 or by invoking ibuf_reset_free_bits() before mtr_commit().
830
831 @return TRUE on success; FALSE on compression failure */
ibool
rtr_split_page_move_rec_list(
/*=========================*/
	rtr_split_node_t*	node_array,	/*!< in: split node array. */
	int			first_rec_group,/*!< in: group number of the
						first rec. */
	buf_block_t*		new_block,	/*!< in/out: index page
						where to move */
	buf_block_t*		block,		/*!< in/out: page containing
						split_rec */
	rec_t*			first_rec,	/*!< in: first record not to
						move */
	dict_index_t*		index,		/*!< in: record descriptor */
	mem_heap_t*		heap,		/*!< in: pointer to memory
						heap, or NULL */
	mtr_t*			mtr)		/*!< in: mtr */
{
	rtr_split_node_t*	cur_split_node;
	rtr_split_node_t*	end_split_node;
	page_cur_t		page_cursor;
	page_cur_t		new_page_cursor;
	page_t*			page;
	page_t*			new_page;
	ulint			offsets_[REC_OFFS_NORMAL_SIZE];
	ulint*			offsets		= offsets_;
	page_zip_des_t*		new_page_zip
		= buf_block_get_page_zip(new_block);
	rec_t*			rec;
	rec_t*			ret;
	ulint			moved		= 0;
	ulint			max_to_move	= 0;
	rtr_rec_move_t*		rec_move	= NULL;

	rec_offs_init(offsets_);

	page_cur_set_before_first(block, &page_cursor);
	page_cur_set_before_first(new_block, &new_page_cursor);

	page = buf_block_get_frame(block);
	new_page = buf_block_get_frame(new_block);
	/* Remember the last user record on the new page (or its infimum
	if empty) so that we can re-locate it if the page is reorganized
	during compression below. */
	ret = page_rec_get_prev(page_get_supremum_rec(new_page));

	end_split_node = node_array + page_get_n_recs(page);

	mtr_log_t	log_mode = MTR_LOG_NONE;

	if (new_page_zip) {
		/* Suppress redo logging while copying; for a compressed
		page the copy is logged implicitly by page_zip_compress()
		or page_zip_reorganize() below. */
		log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
	}

	max_to_move = page_get_n_recs(
				buf_block_get_frame(block));
	rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
			heap,
			sizeof (*rec_move) * max_to_move));

	/* Insert the recs in group 2 to new page. */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			/* Park the record's explicit locks on the
			infimum while the record moves pages. */
			lock_rec_store_on_page_infimum(
				block, cur_split_node->key);

			offsets = rec_get_offsets(cur_split_node->key,
						  index, offsets,
						  ULINT_UNDEFINED, &heap);

			ut_ad (cur_split_node->key != first_rec
			       || !page_is_leaf(page));

			rec = page_cur_insert_rec_low(
					page_cur_get_rec(&new_page_cursor),
					index,
					cur_split_node->key,
					offsets,
					mtr);

			ut_a(rec);

			lock_rec_restore_from_page_infimum(
				new_block, rec, block);

			page_cur_move_to_next(&new_page_cursor);

			/* Record the old/new positions so the lock
			table can be updated in one pass afterwards. */
			rec_move[moved].new_rec = rec;
			rec_move[moved].old_rec = cur_split_node->key;
			rec_move[moved].moved = false;
			moved++;

			if (moved > max_to_move) {
				ut_ad(0);
				break;
			}
		}
	}

	/* Update PAGE_MAX_TRX_ID on the uncompressed page.
	Modifications will be redo logged and copied to the compressed
	page in page_zip_compress() or page_zip_reorganize() below.
	Multiple transactions cannot simultaneously operate on the
	same temp-table in parallel.
	max_trx_id is ignored for temp tables because it not required
	for MVCC. */
	if (dict_index_is_sec_or_ibuf(index)
	    && page_is_leaf(page)
	    && !dict_table_is_temporary(index->table)) {
		page_update_max_trx_id(new_block, NULL,
				       page_get_max_trx_id(page),
				       mtr);
	}

	if (new_page_zip) {
		mtr_set_log_mode(mtr, log_mode);

		if (!page_zip_compress(new_page_zip, new_page, index,
				       page_zip_level, NULL, mtr)) {
			ulint	ret_pos;

			/* Before trying to reorganize the page,
			store the number of preceding records on the page. */
			ret_pos = page_rec_get_n_recs_before(ret);
			/* Before copying, "ret" was the predecessor
			of the predefined supremum record.  If it was
			the predefined infimum record, then it would
			still be the infimum, and we would have
			ret_pos == 0. */

			if (UNIV_UNLIKELY
			    (!page_zip_reorganize(new_block, index, mtr))) {

				/* Reorganize failed too: restore the
				uncompressed page from the zip image and
				report failure to the caller. */
				if (UNIV_UNLIKELY
				    (!page_zip_decompress(new_page_zip,
							  new_page, FALSE))) {
					ut_error;
				}
#ifdef UNIV_GIS_DEBUG
				ut_ad(page_validate(new_page, index));
#endif

				return(false);
			}

			/* The page was reorganized: Seek to ret_pos. */
			ret = page_rec_get_nth(new_page, ret_pos);
		}
	}

	/* Update the lock table */
	lock_rtr_move_rec_list(new_block, block, rec_move, moved);

	/* Delete recs in second group from the old page. */
	for (cur_split_node = node_array;
	     cur_split_node < end_split_node; ++cur_split_node) {
		if (cur_split_node->n_node != first_rec_group) {
			page_cur_position(cur_split_node->key,
					  block, &page_cursor);
			offsets = rec_get_offsets(
				page_cur_get_rec(&page_cursor), index,
				offsets, ULINT_UNDEFINED,
				&heap);
			page_cur_delete_rec(&page_cursor,
				index, offsets, mtr);
		}
	}

	return(true);
}
999
1000 /*************************************************************//**
1001 Splits an R-tree index page to halves and inserts the tuple. It is assumed
1002 that mtr holds an x-latch to the index tree. NOTE: the tree x-latch is
1003 released within this function! NOTE that the operation of this
1004 function must always succeed, we cannot reverse it: therefore enough
1005 free disk space (2 pages) must be guaranteed to be available before
1006 this function is called.
1007 @return inserted record */
1008 rec_t*
1009 rtr_page_split_and_insert(ulint flags,btr_cur_t * cursor,ulint ** offsets,mem_heap_t ** heap,const dtuple_t * tuple,ulint n_ext,mtr_t * mtr)1009 rtr_page_split_and_insert(
1010 /*======================*/
1011 	ulint		flags,	/*!< in: undo logging and locking flags */
1012 	btr_cur_t*	cursor,	/*!< in/out: cursor at which to insert; when the
1013 				function returns, the cursor is positioned
1014 				on the predecessor of the inserted record */
1015 	ulint**		offsets,/*!< out: offsets on inserted record */
1016 	mem_heap_t**	heap,	/*!< in/out: pointer to memory heap, or NULL */
1017 	const dtuple_t*	tuple,	/*!< in: tuple to insert */
1018 	ulint		n_ext,	/*!< in: number of externally stored columns */
1019 	mtr_t*		mtr)	/*!< in: mtr */
1020 {
1021 	buf_block_t*		block;
1022 	page_t*			page;
1023 	page_t*			new_page;
1024 	ulint			page_no;
1025 	byte			direction;
1026 	ulint			hint_page_no;
1027 	buf_block_t*		new_block;
1028 	page_zip_des_t*		page_zip;
1029 	page_zip_des_t*		new_page_zip;
1030 	buf_block_t*		insert_block;
1031 	page_cur_t*		page_cursor;
1032 	rec_t*			rec = 0;
1033 	ulint			n_recs;
1034 	ulint			total_data;
1035 	ulint			insert_size;
1036 	rtr_split_node_t*	rtr_split_node_array;
1037 	rtr_split_node_t*	cur_split_node;
1038 	rtr_split_node_t*	end_split_node;
1039 	double*			buf_pos;
1040 	ulint			page_level;
1041 	node_seq_t		current_ssn;
1042 	node_seq_t		next_ssn;
1043 	buf_block_t*		root_block;
1044 	rtr_mbr_t		mbr;
1045 	rtr_mbr_t		new_mbr;
1046 	lock_prdt_t		prdt;
1047 	lock_prdt_t		new_prdt;
1048 	rec_t*			first_rec = NULL;
1049 	int			first_rec_group = 1;
1050 	ulint			n_iterations = 0;
1051 
1052 	if (!*heap) {
1053 		*heap = mem_heap_create(1024);
1054 	}
1055 
	/* Retry target: if the tuple still does not fit after this split,
	the cursor and rtr_info are reset at the bottom of the function
	and control jumps back here to split once more. */
1056 func_start:
1057 	ut_ad(tuple->m_heap != *heap);
1058 	mem_heap_empty(*heap);
1059 	*offsets = NULL;
1060 
1061 	ut_ad(mtr_memo_contains_flagged(mtr, dict_index_get_lock(cursor->index),
1062 					MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK));
1063 	ut_ad(!dict_index_is_online_ddl(cursor->index)
1064 	      || (flags & BTR_CREATE_FLAG)
1065 	      || dict_index_is_clust(cursor->index));
1066 	ut_ad(rw_lock_own_flagged(dict_index_get_lock(cursor->index),
1067 				  RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));
1068 
1069 	block = btr_cur_get_block(cursor);
1070 	page = buf_block_get_frame(block);
1071 	page_zip = buf_block_get_page_zip(block);
1072 	page_level = btr_page_get_level(page, mtr);
1073 	current_ssn = page_get_ssn_id(page);
1074 
1075 	ut_ad(mtr_memo_contains(mtr, block, MTR_MEMO_PAGE_X_FIX));
1076 	ut_ad(page_get_n_recs(page) >= 1);
1077 
1078 	page_no = block->page.id.page_no();
1079 
	/* On the leftmost page (no previous page) of a non-leaf level,
	remember the first user record; it is passed to split_rtree_node()
	below so the grouping can take it into account. */
1080 	if (btr_page_get_prev(page, mtr) == FIL_NULL && !page_is_leaf(page)) {
1081 		first_rec = page_rec_get_next(
1082 			page_get_infimum_rec(buf_block_get_frame(block)));
1083 	}
1084 
1085 	/* Initial split nodes array. */
1086 	rtr_split_node_array = rtr_page_split_initialize_nodes(
1087 		*heap, cursor, offsets, tuple, &buf_pos);
1088 
1089 	/* Divide all mbrs to two groups. */
1090 	n_recs = page_get_n_recs(page) + 1;
1091 
	/* The split-node array holds one entry per existing record plus a
	final entry for the tuple being inserted. */
1092 	end_split_node = rtr_split_node_array + n_recs;
1093 
1094 #ifdef UNIV_GIS_DEBUG
1095 	fprintf(stderr, "Before split a page:\n");
1096 	for (cur_split_node = rtr_split_node_array;
1097 		cur_split_node < end_split_node; ++cur_split_node) {
1098 		for (int i = 0; i < SPDIMS * 2; i++) {
1099 			fprintf(stderr, "%.2lf ",
1100 				*(cur_split_node->coords + i));
1101 		}
1102 		fprintf(stderr, "\n");
1103 	}
1104 #endif
1105 
	/* Partition all entries (including the tuple to insert) into two
	groups; the return value identifies the group that stays on the
	original page.  NOTE(review): the literal arguments 0, 2, 2 look
	like split-tuning parameters of split_rtree_node() — confirm
	against its definition in gis0geo. */
1106 	insert_size = rec_get_converted_size(cursor->index, tuple, n_ext);
1107 	total_data = page_get_data_size(page) + insert_size;
1108 	first_rec_group = split_rtree_node(rtr_split_node_array,
1109 					   static_cast<int>(n_recs),
1110 					   static_cast<int>(total_data),
1111 					   static_cast<int>(insert_size),
1112 					   0, 2, 2, &buf_pos, SPDIMS,
1113 					   static_cast<uchar*>(first_rec));
1114 
1115 	/* Allocate a new page to the index */
1116 	direction = FSP_UP;
1117 	hint_page_no = page_no + 1;
1118 	new_block = btr_page_alloc(cursor->index, hint_page_no, direction,
1119 				   page_level, mtr, mtr);
1120 	new_page_zip = buf_block_get_page_zip(new_block);
1121 	btr_page_create(new_block, new_page_zip, cursor->index,
1122 			page_level, mtr);
1123 
1124 	new_page = buf_block_get_frame(new_block);
1125 	ut_ad(page_get_ssn_id(new_page) == 0);
1126 
1127 	/* Set new ssn to the new page and page: the new page inherits
	the current split sequence number, while the original page gets
	a freshly generated (larger) one. */
1128 	page_set_ssn_id(new_block, new_page_zip, current_ssn, mtr);
1129 	next_ssn = rtr_get_new_ssn_id(cursor->index);
1130 
1131 	page_set_ssn_id(block, page_zip, next_ssn, mtr);
1132 
1133 	/* Keep recs in first group to the old page, move recs in second
1134 	groups to the new page. */
1135 	if (0
1136 #ifdef UNIV_ZIP_COPY
1137 	    || page_zip
1138 #endif
1139 	    || !rtr_split_page_move_rec_list(rtr_split_node_array,
1140 					     first_rec_group,
1141 					     new_block, block, first_rec,
1142 					     cursor->index, *heap, mtr)) {
1143 		ulint			n		= 0;
1144 		rec_t*			rec;
1145 		ulint			moved		= 0;
1146 		ulint			max_to_move	= 0;
1147 		rtr_rec_move_t*		rec_move	= NULL;
1148 		ulint			pos;
1149 
1150 		/* For some reason, compressing new_page failed,
1151 		even though it should contain fewer records than
1152 		the original page.  Copy the page byte for byte
1153 		and then delete the records from both pages
1154 		as appropriate.  Deleting will always succeed. */
1155 		ut_a(new_page_zip);
1156 
1157 		page_zip_copy_recs(new_page_zip, new_page,
1158 				   page_zip, page, cursor->index, mtr);
1159 
1160 		page_cursor = btr_cur_get_page_cur(cursor);
1161 
1162 		/* Move locks on recs. */
1163 		max_to_move = page_get_n_recs(page);
1164 		rec_move = static_cast<rtr_rec_move_t*>(mem_heap_alloc(
1165 				*heap,
1166 				sizeof (*rec_move) * max_to_move));
1167 
1168 		/* Init the rec_move array for moving lock on recs.
		The last split node (end_split_node - 1) is the tuple to
		be inserted, which is not yet on any page, hence the
		"- 1" bound on all three loops below. */
1169 		for (cur_split_node = rtr_split_node_array;
1170 		     cur_split_node < end_split_node - 1; ++cur_split_node) {
1171 			if (cur_split_node->n_node != first_rec_group) {
1172 				pos = page_rec_get_n_recs_before(
1173 					cur_split_node->key);
1174 				rec = page_rec_get_nth(new_page, pos);
1175 				ut_a(rec);
1176 
1177 				rec_move[moved].new_rec = rec;
1178 				rec_move[moved].old_rec = cur_split_node->key;
1179 				rec_move[moved].moved = false;
1180 				moved++;
1181 
1182 				if (moved > max_to_move) {
1183 					ut_ad(0);
1184 					break;
1185 				}
1186 			}
1187 		}
1188 
1189 		/* Update the lock table */
1190 		lock_rtr_move_rec_list(new_block, block, rec_move, moved);
1191 
1192 		/* Delete recs in first group from the new page. */
1193 		for (cur_split_node = rtr_split_node_array;
1194 		     cur_split_node < end_split_node - 1; ++cur_split_node) {
1195 			if (cur_split_node->n_node == first_rec_group) {
1196 				ulint	pos;
1197 
1198 				pos = page_rec_get_n_recs_before(
1199 					cur_split_node->key);
1200 				ut_a(pos > 0);
				/* Each prior deletion (counted in n) shifted
				the following records one position to the
				left on new_page, hence "pos - n". */
1201 				rec_t* new_rec = page_rec_get_nth(new_page,
1202 								  pos - n);
1203 
1204 				ut_a(new_rec && page_rec_is_user_rec(new_rec));
1205 				page_cur_position(new_rec, new_block,
1206 						  page_cursor);
1207 
1208 				*offsets = rec_get_offsets(
1209 					page_cur_get_rec(page_cursor),
1210 					cursor->index,
1211 					*offsets, ULINT_UNDEFINED,
1212 					heap);
1213 
1214 				page_cur_delete_rec(page_cursor,
1215 					cursor->index, *offsets, mtr);
1216 				n++;
1217 			}
1218 		}
1219 
1220 		/* Delete recs in second group from the old page. */
1221 		for (cur_split_node = rtr_split_node_array;
1222 		     cur_split_node < end_split_node - 1; ++cur_split_node) {
1223 			if (cur_split_node->n_node != first_rec_group) {
1224 				page_cur_position(cur_split_node->key,
1225 						  block, page_cursor);
1226 				*offsets = rec_get_offsets(
1227 					page_cur_get_rec(page_cursor),
1228 					cursor->index,
1229 					*offsets, ULINT_UNDEFINED,
1230 					heap);
1231 				page_cur_delete_rec(page_cursor,
1232 					cursor->index, *offsets, mtr);
1233 			}
1234 		}
1235 
1236 #ifdef UNIV_GIS_DEBUG
1237 		ut_ad(page_validate(new_page, cursor->index));
1238 		ut_ad(page_validate(page, cursor->index));
1239 #endif
1240 	}
1241 
1242 	/* Insert the new rec to the proper page.  The last split node
	corresponds to the tuple itself; its group decides which page
	receives the insert. */
1243 	cur_split_node = end_split_node - 1;
1244 	if (cur_split_node->n_node != first_rec_group) {
1245 		insert_block = new_block;
1246 	} else {
1247 		insert_block = block;
1248 	}
1249 
1250 	/* Reposition the cursor for insert and try insertion */
1251 	page_cursor = btr_cur_get_page_cur(cursor);
1252 
1253 	page_cur_search(insert_block, cursor->index, tuple,
1254 			PAGE_CUR_LE, page_cursor);
1255 
1256 	/* It's possible that the new record is too big to be inserted into
1257 	the page, and it'll need the second round split in this case.
1258 	We test this scenario here*/
1259 	DBUG_EXECUTE_IF("rtr_page_need_second_split",
1260 			if (n_iterations == 0) {
1261 				rec = NULL;
1262 				goto after_insert; }
1263 	);
1264 
1265 	rec = page_cur_tuple_insert(page_cursor, tuple, cursor->index,
1266 				    offsets, heap, n_ext, mtr);
1267 
1268 	/* If insert did not fit, try page reorganization.
1269 	For compressed pages, page_cur_tuple_insert() will have
1270 	attempted this already. */
1271 	if (rec == NULL) {
1272 		if (!page_cur_get_page_zip(page_cursor)
1273 		    && btr_page_reorganize(page_cursor, cursor->index, mtr)) {
1274 			rec = page_cur_tuple_insert(page_cursor, tuple,
1275 						    cursor->index, offsets,
1276 						    heap, n_ext, mtr);
1277 
1278 		}
1279 		/* If insert fail, we will try to split the insert_block
1280 		again. */
1281 	}
1282 
	/* This label is only jumped to by the DBUG fault-injection
	above, hence it exists only in debug builds. */
1283 #ifdef UNIV_DEBUG
1284 after_insert:
1285 #endif
1286 	/* Calculate the mbr on the upper half-page, and the mbr on
1287 	original page. */
1288 	rtr_page_cal_mbr(cursor->index, block, &mbr, *heap);
1289 	rtr_page_cal_mbr(cursor->index, new_block, &new_mbr, *heap);
1290 	prdt.data = &mbr;
1291 	new_prdt.data = &new_mbr;
1292 
1293 	/* Check any predicate locks need to be moved/copied to the
1294 	new page */
1295 	lock_prdt_update_split(block, new_block, &prdt, &new_prdt,
1296 			       dict_index_get_space(cursor->index), page_no);
1297 
1298 	/* Adjust the upper level. */
1299 	rtr_adjust_upper_level(cursor, flags, block, new_block,
1300 			       &mbr, &new_mbr, direction, mtr);
1301 
1302 	/* Save the new ssn to the root page, since we need to reinit
1303 	the first ssn value from it after restart server. */
1304 
1305 	root_block = btr_root_block_get(cursor->index, RW_SX_LATCH, mtr);
1306 
1307 	page_zip = buf_block_get_page_zip(root_block);
1308 	page_set_ssn_id(root_block, page_zip, next_ssn, mtr);
1309 
1310 	/* Insert fit on the page: update the free bits for the
1311 	left and right pages in the same mtr */
1312 
1313 	if (page_is_leaf(page)) {
1314 		ibuf_update_free_bits_for_two_pages_low(
1315 			block, new_block, mtr);
1316 	}
1317 
1318 
1319 	/* If the new res insert fail, we need to do another split
1320 	again. */
1321 	if (!rec) {
1322 		/* We play safe and reset the free bits for new_page */
1323 		if (!dict_index_is_clust(cursor->index)
1324 		    && !dict_table_is_temporary(cursor->index->table)) {
1325 			ibuf_reset_free_bits(new_block);
1326 			ibuf_reset_free_bits(block);
1327 		}
1328 
1329 		/* We need to clean the parent path here and search father
1330 		node later, otherwise, it's possible that find a wrong
1331 		parent. */
1332 		rtr_clean_rtr_info(cursor->rtr_info, true);
1333 		cursor->rtr_info = NULL;
1334 		n_iterations++;
1335 
		/* Reposition the cursor on the first user record of the
		old page before retrying the split. */
1336 		rec_t* i_rec = page_rec_get_next(page_get_infimum_rec(
1337 			buf_block_get_frame(block)));
1338 		btr_cur_position(cursor->index, i_rec, block, cursor);
1339 
1340 		goto func_start;
1341 	}
1342 
1343 #ifdef UNIV_GIS_DEBUG
1344 	ut_ad(page_validate(buf_block_get_frame(block), cursor->index));
1345 	ut_ad(page_validate(buf_block_get_frame(new_block), cursor->index));
1346 
1347 	ut_ad(!rec || rec_offs_validate(rec, cursor->index, *offsets));
1348 #endif
1349 	MONITOR_INC(MONITOR_INDEX_SPLIT);
1350 
1351 	return(rec);
1352 }
1353
1354 /****************************************************************//**
1355 Walks from the leaf level up towards the root after an insert and,
for every parent level whose search path recorded an MBR enlargement
(node_visit->mbr_inc != 0), recomputes the child page's MBR and writes
it into the parent node pointer record.  (The previous header comment,
"Following the right link to find the proper block for insert", was a
copy-paste error: this function does not follow right links.)
1356 @return DB_SUCCESS on success, DB_ERROR if an MBR update fails */
1357 dberr_t
1358 rtr_ins_enlarge_mbr(btr_cur_t * btr_cur,que_thr_t * thr,mtr_t * mtr)1358 rtr_ins_enlarge_mbr(
1359 /*================*/
1360 	btr_cur_t*		btr_cur,	/*!< in: btr cursor */
1361 	que_thr_t*		thr,		/*!< in: query thread */
1362 	mtr_t*			mtr)		/*!< in: mtr */
1363 {
1364 	dberr_t			err = DB_SUCCESS;
1365 	rtr_mbr_t		new_mbr;
1366 	buf_block_t*		block;
1367 	mem_heap_t*		heap;
1368 	dict_index_t*		index = btr_cur->index;
1369 	page_cur_t*		page_cursor;
1370 	ulint*			offsets;
1371 	node_visit_t*		node_visit;
1372 	page_t*			page;
1373 
1374 	ut_ad(dict_index_is_spatial(index));
1375 
1376 	/* If no rtr_info or rtree is one level tree, return. */
1377 	if (!btr_cur->rtr_info || btr_cur->tree_height == 1) {
1378 		return(err);
1379 	}
1380 
1381 	/* Check path info is not empty. */
1382 	ut_ad(!btr_cur->rtr_info->parent_path->empty());
1383 
1384 	/* Create a memory heap. */
1385 	heap = mem_heap_create(1024);
1386 
1387 	/* Leaf level page is stored in cursor */
1388 	page_cursor = btr_cur_get_page_cur(btr_cur);
1389 	block = page_cur_get_block(page_cursor);
1390 
	/* Level 0 is the leaf; iterate over the recorded parent path
	one level at a time up to (but excluding) the root. */
1391 	for (ulint i = 1; i < btr_cur->tree_height; i++) {
1392 		node_visit = rtr_get_parent_node(btr_cur, i, true);
1393 		ut_ad(node_visit != NULL);
1394 
1395 		/* If there's no mbr enlarge, skip this level but keep
		climbing: a higher level may still need enlargement. */
1396 		if (node_visit->mbr_inc == 0) {
1397 			block = btr_pcur_get_block(node_visit->cursor);
1398 			continue;
1399 		}
1400 
1401 		/* Calculate the mbr of the child page. */
1402 		rtr_page_cal_mbr(index, block, &new_mbr, heap);
1403 
1404 		/* Get father block. */
1405 		btr_cur_t	cursor;
1406 		offsets = rtr_page_get_father_block(
1407 			NULL, heap, index, block, mtr, btr_cur, &cursor);
1408 
1409 		page = buf_block_get_frame(block);
1410 
1411 		/* Update the mbr field of the rec. */
1412 		if (!rtr_update_mbr_field(&cursor, offsets, NULL, page,
1413 					  &new_mbr, NULL, mtr)) {
1414 			err = DB_ERROR;
1415 			break;
1416 		}
1417 
		/* The parent page becomes the child for the next round. */
1418 		page_cursor = btr_cur_get_page_cur(&cursor);
1419 		block = page_cur_get_block(page_cursor);
1420 	}
1421 
1422 	mem_heap_free(heap);
1423 
1424 	return(err);
1425 }
1426
1427 /*************************************************************//**
1428 Copy recs from a page to new_block of rtree.
1429 Differs from page_copy_rec_list_end, because this function does not
1430 touch the lock table and max trx id on page or compress the page.
Records are merged into new_block in key order: for each source record
the target page is scanned for the proper insert position.  Identical
leaf records are not duplicated; instead the delete-mark on the copy
already present in new_block is cleared.
1431 
1432 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1433 if new_block is a compressed leaf page in a secondary index.
1434 This has to be done either within the same mini-transaction,
1435 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
1436 void
1437 rtr_page_copy_rec_list_end_no_locks(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mem_heap_t * heap,rtr_rec_move_t * rec_move,ulint max_move,ulint * num_moved,mtr_t * mtr)1437 rtr_page_copy_rec_list_end_no_locks(
1438 /*================================*/
1439 	buf_block_t*	new_block,	/*!< in: index page to copy to */
1440 	buf_block_t*	block,		/*!< in: index page of rec */
1441 	rec_t*		rec,		/*!< in: record on page */
1442 	dict_index_t*	index,		/*!< in: record descriptor */
1443 	mem_heap_t*	heap,		/*!< in/out: heap memory */
1444 	rtr_rec_move_t*	rec_move,	/*!< in: recording records moved */
1445 	ulint		max_move,	/*!< in: num of rec to move */
1446 	ulint*		num_moved,	/*!< out: num of rec to move */
1447 	mtr_t*		mtr)		/*!< in: mtr */
1448 {
1449 	page_t*		new_page	= buf_block_get_frame(new_block);
1450 	page_cur_t	page_cur;
1451 	page_cur_t	cur1;
1452 	rec_t*		cur_rec;
1453 	ulint		offsets_1[REC_OFFS_NORMAL_SIZE];
1454 	ulint*		offsets1 = offsets_1;
1455 	ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
1456 	ulint*		offsets2 = offsets_2;
1457 	ulint		moved = 0;
1458 	bool		is_leaf = page_is_leaf(new_page);
1459 
1460 	rec_offs_init(offsets_1);
1461 	rec_offs_init(offsets_2);
1462 
	/* cur1 iterates over the source records starting at rec (or the
	first user record, if rec is the infimum). */
1463 	page_cur_position(rec, block, &cur1);
1464 
1465 	if (page_cur_is_before_first(&cur1)) {
1466 		page_cur_move_to_next(&cur1);
1467 	}
1468 
1469 	btr_assert_not_corrupted(new_block, index);
1470 	ut_a(page_is_comp(new_page) == page_rec_is_comp(rec));
	/* NOTE(review): this reads the last page directory slot at the
	end of the page, which should point at the infimum record —
	confirm the constant 10 against the page directory layout. */
1471 	ut_a(mach_read_from_2(new_page + UNIV_PAGE_SIZE - 10) == (ulint)
1472 	     (page_is_comp(new_page) ? PAGE_NEW_INFIMUM : PAGE_OLD_INFIMUM));
1473 
	/* page_cur/cur_rec track the current scan position in the
	target page, starting from its first record. */
1474 	cur_rec = page_rec_get_next(
1475 		page_get_infimum_rec(buf_block_get_frame(new_block)));
1476 	page_cur_position(cur_rec, new_block, &page_cur);
1477 
1478 	/* Copy records from the original page to the new page */
1479 	while (!page_cur_is_after_last(&cur1)) {
1480 		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
1481 		rec_t*	ins_rec;
1482 
1483 		if (page_rec_is_infimum(cur_rec)) {
1484 			cur_rec = page_rec_get_next(cur_rec);
1485 		}
1486 
1487 		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
1488 					   ULINT_UNDEFINED, &heap);
		/* Advance through the target page until a record not
		smaller than cur1_rec is found, so that the insert
		below keeps the target page ordered. */
1489 		while (!page_rec_is_supremum(cur_rec)) {
1490 			ulint		cur_matched_fields = 0;
1491 			int		cmp;
1492 
1493 			offsets2 = rec_get_offsets(cur_rec, index, offsets2,
1494 						   ULINT_UNDEFINED, &heap);
1495 			cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
1496 						     offsets1, offsets2,
1497 						     index,
1498 						     page_is_spatial_non_leaf(cur1_rec, index),
1499 						     false,
1500 						     &cur_matched_fields);
1501 			if (cmp < 0) {
				/* Insert position found: step back so we
				insert after the predecessor of cur_rec. */
1502 				page_cur_move_to_prev(&page_cur);
1503 				break;
1504 			} else if (cmp > 0) {
1505 				/* Skip small recs. */
1506 				page_cur_move_to_next(&page_cur);
1507 				cur_rec = page_cur_get_rec(&page_cur);
1508 			} else if (is_leaf) {
1509 				if (rec_get_deleted_flag(cur1_rec,
1510 					dict_table_is_comp(index->table))) {
					/* Source copy is delete-marked:
					nothing to transfer. */
1511 					goto next;
1512 				} else {
1513 					/* We have two identical leaf records,
1514 					skip copying the undeleted one, and
1515 					unmark deleted on the current page */
1516 					btr_rec_set_deleted_flag(
1517 						cur_rec, NULL, FALSE);
1518 					goto next;
1519 				}
1520 			}
1521 		}
1522 
1523 		/* If position is on suprenum rec, need to move to
1524 		previous rec. */
1525 		if (page_rec_is_supremum(cur_rec)) {
1526 			page_cur_move_to_prev(&page_cur);
1527 		}
1528 
1529 		cur_rec = page_cur_get_rec(&page_cur);
1530 
1531 		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
1532 					   ULINT_UNDEFINED, &heap);
1533 
1534 		ins_rec = page_cur_insert_rec_low(cur_rec, index,
1535 						  cur1_rec, offsets1, mtr);
1536 		if (UNIV_UNLIKELY(!ins_rec)) {
1537 			fprintf(stderr, "page number %ld and %ld\n",
1538 				(long)new_block->page.id.page_no(),
1539 				(long)block->page.id.page_no());
1540 
1541 			ib::fatal() << "rec offset " << page_offset(rec)
1542 				<< ", cur1 offset "
1543 				<<  page_offset(page_cur_get_rec(&cur1))
1544 				<< ", cur_rec offset "
1545 				<< page_offset(cur_rec);
1546 		}
1547 
		/* Record the old/new record pair so the caller can
		relocate locks. */
1548 		rec_move[moved].new_rec = ins_rec;
1549 		rec_move[moved].old_rec = cur1_rec;
1550 		rec_move[moved].moved = false;
1551 		moved++;
1552 next:
		/* Defensive bound check: should never trigger. */
1553 		if (moved > max_move) {
1554 			ut_ad(0);
1555 			break;
1556 		}
1557 
1558 		page_cur_move_to_next(&cur1);
1559 	}
1560 
1561 	*num_moved = moved;
1562 }
1563
1564 /*************************************************************//**
1565 Copy recs till a specified rec from a page to new_block of rtree.
Mirror of rtr_page_copy_rec_list_end_no_locks(): copies the source
records from the first user record up to (excluding) rec, merging them
into new_block in key order without touching the lock table.  Identical
leaf records are not duplicated; the delete-mark on the copy already
present in new_block is cleared instead. */
1566 void
1567 rtr_page_copy_rec_list_start_no_locks(buf_block_t * new_block,buf_block_t * block,rec_t * rec,dict_index_t * index,mem_heap_t * heap,rtr_rec_move_t * rec_move,ulint max_move,ulint * num_moved,mtr_t * mtr)1567 rtr_page_copy_rec_list_start_no_locks(
1568 /*==================================*/
1569 	buf_block_t*	new_block,	/*!< in: index page to copy to */
1570 	buf_block_t*	block,		/*!< in: index page of rec */
1571 	rec_t*		rec,		/*!< in: record on page */
1572 	dict_index_t*	index,		/*!< in: record descriptor */
1573 	mem_heap_t*	heap,		/*!< in/out: heap memory */
1574 	rtr_rec_move_t*	rec_move,	/*!< in: recording records moved */
1575 	ulint		max_move,	/*!< in: num of rec to move */
1576 	ulint*		num_moved,	/*!< out: num of rec to move */
1577 	mtr_t*		mtr)		/*!< in: mtr */
1578 {
1579 	page_cur_t	cur1;
1580 	rec_t*		cur_rec;
1581 	ulint		offsets_1[REC_OFFS_NORMAL_SIZE];
1582 	ulint*		offsets1 = offsets_1;
1583 	ulint		offsets_2[REC_OFFS_NORMAL_SIZE];
1584 	ulint*		offsets2 = offsets_2;
1585 	page_cur_t	page_cur;
1586 	ulint		moved = 0;
1587 	bool		is_leaf = page_is_leaf(buf_block_get_frame(block));
1588 
1589 	rec_offs_init(offsets_1);
1590 	rec_offs_init(offsets_2);
1591 
	/* cur1 starts at the first user record of the source page. */
1592 	page_cur_set_before_first(block, &cur1);
1593 	page_cur_move_to_next(&cur1);
1594 
	/* page_cur/cur_rec track the current scan position in the
	target page, starting from its first record. */
1595 	cur_rec = page_rec_get_next(
1596 		page_get_infimum_rec(buf_block_get_frame(new_block)));
1597 	page_cur_position(cur_rec, new_block, &page_cur);
1598 
1599 	while (page_cur_get_rec(&cur1) != rec) {
1600 		rec_t*	cur1_rec = page_cur_get_rec(&cur1);
1601 		rec_t*	ins_rec;
1602 
1603 		if (page_rec_is_infimum(cur_rec)) {
1604 			cur_rec = page_rec_get_next(cur_rec);
1605 		}
1606 
1607 		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
1608 					   ULINT_UNDEFINED, &heap);
1609 
		/* Advance through the target page until a record not
		smaller than cur1_rec is found, keeping key order. */
1610 		while (!page_rec_is_supremum(cur_rec)) {
1611 			ulint		cur_matched_fields = 0;
1612 			int		cmp;
1613 
1614 			offsets2 = rec_get_offsets(cur_rec, index, offsets2,
1615 						   ULINT_UNDEFINED, &heap);
1616 			cmp = cmp_rec_rec_with_match(cur1_rec, cur_rec,
1617 						     offsets1, offsets2,
1618 						     index,
1619 						     page_is_spatial_non_leaf(cur1_rec, index),
1620 						     false,
1621 						     &cur_matched_fields);
1622 			if (cmp < 0) {
				/* Insert position found: step back so we
				insert after the predecessor of cur_rec. */
1623 				page_cur_move_to_prev(&page_cur);
1624 				cur_rec = page_cur_get_rec(&page_cur);
1625 				break;
1626 			} else if (cmp > 0) {
1627 				/* Skip small recs. */
1628 				page_cur_move_to_next(&page_cur);
1629 				cur_rec = page_cur_get_rec(&page_cur);
1630 			} else if (is_leaf) {
1631 				if (rec_get_deleted_flag(
1632 					cur1_rec,
1633 					dict_table_is_comp(index->table))) {
					/* Source copy is delete-marked:
					nothing to transfer. */
1634 					goto next;
1635 				} else {
1636 					/* We have two identical leaf records,
1637 					skip copying the undeleted one, and
1638 					unmark deleted on the current page */
1639 					btr_rec_set_deleted_flag(
1640 						cur_rec, NULL, FALSE);
1641 					goto next;
1642 				}
1643 			}
1644 		}
1645 
1646 		/* If position is on suprenum rec, need to move to
1647 		previous rec. */
1648 		if (page_rec_is_supremum(cur_rec)) {
1649 			page_cur_move_to_prev(&page_cur);
1650 		}
1651 
1652 		cur_rec = page_cur_get_rec(&page_cur);
1653 
1654 		offsets1 = rec_get_offsets(cur1_rec, index, offsets1,
1655 					   ULINT_UNDEFINED, &heap);
1656 
1657 		ins_rec = page_cur_insert_rec_low(cur_rec, index,
1658 						  cur1_rec, offsets1, mtr);
1659 		if (UNIV_UNLIKELY(!ins_rec)) {
1660 			fprintf(stderr, "page number %ld and %ld\n",
1661 				(long)new_block->page.id.page_no(),
1662 				(long)block->page.id.page_no());
1663 
1664 			ib::fatal() << "rec offset " << page_offset(rec)
1665 				<< ", cur1 offset "
1666 				<<  page_offset(page_cur_get_rec(&cur1))
1667 				<< ", cur_rec offset "
1668 				<< page_offset(cur_rec);
1669 		}
1670 
		/* Record the old/new record pair so the caller can
		relocate locks. */
1671 		rec_move[moved].new_rec = ins_rec;
1672 		rec_move[moved].old_rec = cur1_rec;
1673 		rec_move[moved].moved = false;
1674 		moved++;
1675 next:
		/* Defensive bound check: should never trigger. */
1676 		if (moved > max_move) {
1677 			ut_ad(0);
1678 			break;
1679 		}
1680 
1681 		page_cur_move_to_next(&cur1);
1682 	}
1683 
1684 	*num_moved = moved;
1685 }
1686
1687 /****************************************************************//**
1688 Check two MBRs are identical or need to be merged */
1689 bool
rtr_merge_mbr_changed(btr_cur_t * cursor,btr_cur_t * cursor2,ulint * offsets,ulint * offsets2,rtr_mbr_t * new_mbr,buf_block_t * merge_block,buf_block_t * block,dict_index_t * index)1690 rtr_merge_mbr_changed(
1691 /*==================*/
1692 btr_cur_t* cursor, /*!< in/out: cursor */
1693 btr_cur_t* cursor2, /*!< in: the other cursor */
1694 ulint* offsets, /*!< in: rec offsets */
1695 ulint* offsets2, /*!< in: rec offsets */
1696 rtr_mbr_t* new_mbr, /*!< out: MBR to update */
1697 buf_block_t* merge_block, /*!< in: page to merge */
1698 buf_block_t* block, /*!< in: page be merged */
1699 dict_index_t* index) /*!< in: index */
1700 {
1701 double* mbr;
1702 double mbr1[SPDIMS * 2];
1703 double mbr2[SPDIMS * 2];
1704 rec_t* rec;
1705 ulint len;
1706 bool changed = false;
1707
1708 ut_ad(dict_index_is_spatial(cursor->index));
1709
1710 rec = btr_cur_get_rec(cursor);
1711
1712 rtr_read_mbr(rec_get_nth_field(rec, offsets, 0, &len),
1713 reinterpret_cast<rtr_mbr_t*>(mbr1));
1714
1715 rec = btr_cur_get_rec(cursor2);
1716
1717 rtr_read_mbr(rec_get_nth_field(rec, offsets2, 0, &len),
1718 reinterpret_cast<rtr_mbr_t*>(mbr2));
1719
1720 mbr = reinterpret_cast<double*>(new_mbr);
1721
1722 for (int i = 0; i < SPDIMS * 2; i += 2) {
1723 changed = (changed || mbr1[i] != mbr2[i]);
1724 *mbr = mbr1[i] < mbr2[i] ? mbr1[i] : mbr2[i];
1725 mbr++;
1726 changed = (changed || mbr1[i + 1] != mbr2 [i + 1]);
1727 *mbr = mbr1[i + 1] > mbr2[i + 1] ? mbr1[i + 1] : mbr2[i + 1];
1728 mbr++;
1729 }
1730
1731 return(changed);
1732 }
1733
1734 /****************************************************************//**
1735 Merge 2 mbrs and update the the mbr that cursor is on. */
1736 dberr_t
rtr_merge_and_update_mbr(btr_cur_t * cursor,btr_cur_t * cursor2,ulint * offsets,ulint * offsets2,page_t * child_page,buf_block_t * merge_block,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1737 rtr_merge_and_update_mbr(
1738 /*=====================*/
1739 btr_cur_t* cursor, /*!< in/out: cursor */
1740 btr_cur_t* cursor2, /*!< in: the other cursor */
1741 ulint* offsets, /*!< in: rec offsets */
1742 ulint* offsets2, /*!< in: rec offsets */
1743 page_t* child_page, /*!< in: the page. */
1744 buf_block_t* merge_block, /*!< in: page to merge */
1745 buf_block_t* block, /*!< in: page be merged */
1746 dict_index_t* index, /*!< in: index */
1747 mtr_t* mtr) /*!< in: mtr */
1748 {
1749 dberr_t err = DB_SUCCESS;
1750 rtr_mbr_t new_mbr;
1751 bool changed = false;
1752
1753 ut_ad(dict_index_is_spatial(cursor->index));
1754
1755 changed = rtr_merge_mbr_changed(cursor, cursor2, offsets, offsets2,
1756 &new_mbr, merge_block,
1757 block, index);
1758
1759 /* Update the mbr field of the rec. And will delete the record
1760 pointed by cursor2 */
1761 if (changed) {
1762 if (!rtr_update_mbr_field(cursor, offsets, cursor2, child_page,
1763 &new_mbr, NULL, mtr)) {
1764 err = DB_ERROR;
1765 }
1766 } else {
1767 rtr_node_ptr_delete(cursor2->index, cursor2, block, mtr);
1768 }
1769
1770 return(err);
1771 }
1772
1773 /*************************************************************//**
1774 Deletes on the upper level the node pointer to a page. */
1775 void
rtr_node_ptr_delete(dict_index_t * index,btr_cur_t * cursor,buf_block_t * block,mtr_t * mtr)1776 rtr_node_ptr_delete(
1777 /*================*/
1778 dict_index_t* index, /*!< in: index tree */
1779 btr_cur_t* cursor, /*!< in: search cursor, contains information
1780 about parent nodes in search */
1781 buf_block_t* block, /*!< in: page whose node pointer is deleted */
1782 mtr_t* mtr) /*!< in: mtr */
1783 {
1784 ibool compressed;
1785 dberr_t err;
1786
1787 compressed = btr_cur_pessimistic_delete(&err, TRUE, cursor,
1788 BTR_CREATE_FLAG, false, mtr);
1789 ut_a(err == DB_SUCCESS);
1790
1791 if (!compressed) {
1792 btr_cur_compress_if_useful(cursor, FALSE, mtr);
1793 }
1794 }
1795
1796 /**************************************************************//**
1797 Check whether a Rtree page is child of a parent page
1798 @return true if there is child/parent relationship */
1799 bool
rtr_check_same_block(dict_index_t * index,btr_cur_t * cursor,buf_block_t * parentb,buf_block_t * childb,mem_heap_t * heap)1800 rtr_check_same_block(
1801 /*================*/
1802 dict_index_t* index, /*!< in: index tree */
1803 btr_cur_t* cursor, /*!< in/out: position at the parent entry
1804 pointing to the child if successful */
1805 buf_block_t* parentb,/*!< in: parent page to check */
1806 buf_block_t* childb, /*!< in: child Page */
1807 mem_heap_t* heap) /*!< in: memory heap */
1808
1809 {
1810 ulint page_no = childb->page.id.page_no();
1811 ulint* offsets;
1812 rec_t* rec = page_rec_get_next(page_get_infimum_rec(
1813 buf_block_get_frame(parentb)));
1814
1815 while (!page_rec_is_supremum(rec)) {
1816 offsets = rec_get_offsets(
1817 rec, index, NULL, ULINT_UNDEFINED, &heap);
1818
1819 if (btr_node_ptr_get_child_page_no(rec, offsets) == page_no) {
1820 btr_cur_position(index, rec, parentb, cursor);
1821 return(true);
1822 }
1823
1824 rec = page_rec_get_next(rec);
1825 }
1826
1827 return(false);
1828 }
1829
1830 /****************************************************************//**
1831 Calculate the area increased for a new record
1832 @return area increased */
1833 double
rtr_rec_cal_increase(const dtuple_t * dtuple,const rec_t * rec,const ulint * offsets,double * area)1834 rtr_rec_cal_increase(
1835 /*=================*/
1836 const dtuple_t* dtuple, /*!< in: data tuple to insert, which
1837 cause area increase */
1838 const rec_t* rec, /*!< in: physical record which differs from
1839 dtuple in some of the common fields, or which
1840 has an equal number or more fields than
1841 dtuple */
1842 const ulint* offsets,/*!< in: array returned by rec_get_offsets() */
1843 double* area) /*!< out: increased area */
1844 {
1845 const dfield_t* dtuple_field;
1846 ulint dtuple_f_len;
1847 ulint rec_f_len;
1848 const byte* rec_b_ptr;
1849 double ret = 0;
1850
1851 ut_ad(!page_rec_is_supremum(rec));
1852 ut_ad(!page_rec_is_infimum(rec));
1853
1854 dtuple_field = dtuple_get_nth_field(dtuple, 0);
1855 dtuple_f_len = dfield_get_len(dtuple_field);
1856
1857 rec_b_ptr = rec_get_nth_field(rec, offsets, 0, &rec_f_len);
1858 ret = rtree_area_increase(
1859 rec_b_ptr,
1860 static_cast<const byte*>(dfield_get_data(dtuple_field)),
1861 static_cast<int>(dtuple_f_len), area);
1862
1863 return(ret);
1864 }
1865
1866 /** Estimates the number of rows in a given area.
The estimate is made by scanning only the root page: each root entry's
MBR is compared with the search MBR, an overlap fraction is accumulated,
and the table row count is scaled by (accumulated area / n_recs).
1867 @param[in]	index	index
1868 @param[in]	tuple	range tuple containing mbr, may also be empty tuple
1869 @param[in]	mode	search mode
1870 @return estimated number of rows */
1871 int64_t
1872 rtr_estimate_n_rows_in_range(dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode)1872 rtr_estimate_n_rows_in_range(
1873 	dict_index_t*	index,
1874 	const dtuple_t*	tuple,
1875 	page_cur_mode_t	mode)
1876 {
1877 	/* Check tuple & mode */
1878 	if (tuple->n_fields == 0) {
1879 		return(HA_POS_ERROR);
1880 	}
1881 
	/* Only spatial search modes can be estimated here. */
1882 	switch (mode) {
1883 	case PAGE_CUR_DISJOINT:
1884 	case PAGE_CUR_CONTAIN:
1885 	case PAGE_CUR_INTERSECT:
1886 	case PAGE_CUR_WITHIN:
1887 	case PAGE_CUR_MBR_EQUAL:
1888 		break;
1889 	default:
1890 		return(HA_POS_ERROR);
1891 	}
1892 
1893 	DBUG_EXECUTE_IF("rtr_pcur_move_to_next_return",
1894 		return(2);
1895 	);
1896 
1897 	/* Read mbr from tuple. */
1898 	const dfield_t*	dtuple_field;
1899 	ulint		dtuple_f_len MY_ATTRIBUTE((unused));
1900 	rtr_mbr_t	range_mbr;
1901 	double		range_area;
1902 	byte*		range_mbr_ptr;
1903 
1904 	dtuple_field = dtuple_get_nth_field(tuple, 0);
1905 	dtuple_f_len = dfield_get_len(dtuple_field);
1906 	range_mbr_ptr = reinterpret_cast<byte*>(dfield_get_data(dtuple_field));
1907 
1908 	ut_ad(dtuple_f_len >= DATA_MBR_LEN);
1909 	rtr_read_mbr(range_mbr_ptr, &range_mbr);
1910 	range_area = (range_mbr.xmax - range_mbr.xmin)
1911 		 * (range_mbr.ymax - range_mbr.ymin);
1912 
1913 	/* Get index root page. */
1914 	page_size_t	page_size(dict_table_page_size(index->table));
1915 	page_id_t	page_id(dict_index_get_space(index),
1916 				dict_index_get_page(index));
1917 	mtr_t		mtr;
1918 	buf_block_t*	block;
1919 	page_t*		page;
1920 	ulint		n_recs;
1921 
1922 	mtr_start(&mtr);
1923 	mtr.set_named_space(dict_index_get_space(index));
1924 	mtr_s_lock(dict_index_get_lock(index), &mtr);
1925 
1926 	block = btr_block_get(page_id, page_size, RW_S_LATCH, index, &mtr);
1927 	page = buf_block_get_frame(block);
1928 	n_recs = page_header_get_field(page, PAGE_N_RECS);
1929 
1930 	if (n_recs == 0) {
		/* Empty root page: no basis for an estimate. */
1931 		mtr_commit(&mtr);
1932 		return(HA_POS_ERROR);
1933 	}
1934 
1935 	rec_t*		rec;
1936 	byte*		field;
1937 	ulint		len;
1938 	ulint*		offsets = NULL;
1939 	mem_heap_t*	heap;
1940 
1941 	heap = mem_heap_create(512);
1942 	rec = page_rec_get_next(page_get_infimum_rec(page));
	/* NOTE(review): offsets is computed once here and reused for
	every record in the loop below.  This relies on the MBR being
	field 0, located identically in every record of this page —
	confirm against the record format before changing. */
1943 	offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);
1944 
1945 	/* Scan records in root page and calculate area. */
1946 	double	area = 0;
1947 	while (!page_rec_is_supremum(rec)) {
1948 		rtr_mbr_t	mbr;
1949 		double		rec_area;
1950 
1951 		field = rec_get_nth_field(rec, offsets, 0, &len);
1952 		ut_ad(len == DATA_MBR_LEN);
1953 
1954 		rtr_read_mbr(field, &mbr);
1955 
1956 		rec_area = (mbr.xmax - mbr.xmin) * (mbr.ymax - mbr.ymin);
1957 
1958 		if (rec_area == 0) {
			/* Degenerate (zero-area) entry: count it as a
			whole match or not, per search mode. */
1959 			switch (mode) {
1960 			case PAGE_CUR_CONTAIN:
1961 			case PAGE_CUR_INTERSECT:
1962 				area += 1;
1963 				break;
1964 
1965 			case PAGE_CUR_DISJOINT:
1966 				break;
1967 
1968 			case PAGE_CUR_WITHIN:
1969 			case PAGE_CUR_MBR_EQUAL:
1970 				if (rtree_key_cmp(
1971 					PAGE_CUR_WITHIN, range_mbr_ptr,
1972 					DATA_MBR_LEN, field, DATA_MBR_LEN)
1973 				    == 0) {
1974 					area += 1;
1975 				}
1976 
1977 				break;
1978 
1979 			default:
1980 				ut_error;
1981 			}
1982 		} else {
			/* Accumulate the fraction of the entry's area
			that the mode deems relevant. */
1983 			switch (mode) {
1984 			case PAGE_CUR_CONTAIN:
1985 			case PAGE_CUR_INTERSECT:
1986 				area += rtree_area_overlapping(range_mbr_ptr,
1987 					field, DATA_MBR_LEN) / rec_area;
1988 				break;
1989 
1990 			case PAGE_CUR_DISJOINT:
1991 				area += 1;
1992 				area -= rtree_area_overlapping(range_mbr_ptr,
1993 					field, DATA_MBR_LEN) / rec_area;
1994 				break;
1995 
1996 			case PAGE_CUR_WITHIN:
1997 			case PAGE_CUR_MBR_EQUAL:
1998 				if (rtree_key_cmp(
1999 					PAGE_CUR_WITHIN, range_mbr_ptr,
2000 					DATA_MBR_LEN, field, DATA_MBR_LEN)
2001 				    == 0) {
2002 					area += range_area / rec_area;
2003 				}
2004 
2005 				break;
2006 			default:
2007 				ut_error;
2008 			}
2009 		}
2010 
2011 		rec = page_rec_get_next(rec);
2012 	}
2013 
2014 	mtr_commit(&mtr);
2015 	mem_heap_free(heap);
2016 
	/* Guard against NaN/Inf from degenerate MBR arithmetic. */
2017 	if (my_isinf(area) || my_isnan(area)) {
2018 		return(HA_POS_ERROR);
2019 	}
2020 
2021 	return(static_cast<int64_t>(dict_table_get_n_rows(index->table)
2022 				    * area / n_recs));
2023 }
2024