1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2018, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /********************************************************************//**
22 @file page/page0cur.cc
23 The page cursor
24
25 Created 10/4/1994 Heikki Tuuri
26 *************************************************************************/
27
28 #include "page0cur.h"
29 #include "page0zip.h"
30 #include "btr0btr.h"
31 #include "mtr0log.h"
32 #include "log0recv.h"
33 #include "rem0cmp.h"
34 #include "gis0rtree.h"
35
36 #include <algorithm>
37
38 #ifdef BTR_CUR_HASH_ADAPT
39 # ifdef UNIV_SEARCH_PERF_STAT
40 static ulint page_cur_short_succ;
41 # endif /* UNIV_SEARCH_PERF_STAT */
42
/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			ilow_matched_fields,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		up_match;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	/* Candidate position: the most recently inserted record.  The
	caller only invokes this when inserts have been trending to the
	right (PAGE_RIGHT), so the key is likely to land right after it. */
	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));

	/* Fields already matched against both the lower and the upper
	limit record must also match the candidate; resume comparison
	from the smaller of the two prefixes. */
	low_match = up_match = std::min(*ilow_matched_fields,
					*iup_matched_fields);

	/* The shortcut holds only if tuple >= rec ... */
	if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
		goto exit_func;
	}

	/* ... and tuple < successor of rec (trivially true when rec is
	the last user record, i.e. the successor is the supremum). */
	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
					      &up_match) >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		/* rec_get_offsets() allocates a heap only when the
		stack-based offsets array is too small. */
		mem_heap_free(heap);
	}
	return(success);
}
119
/** Try a search shortcut based on the last insert.
@param[in]	block			index page
@param[in]	index			index tree
@param[in]	tuple			search key
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor
@return true on success */
UNIV_INLINE
bool
page_cur_try_search_shortcut_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	const rec_t*	rec;
	const rec_t*	next_rec;
	ulint		low_match;
	ulint		low_bytes;
	ulint		up_match;
	ulint		up_bytes;
	ibool		success		= FALSE;
	const page_t*	page		= buf_block_get_frame(block);
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_check_typed(tuple));
	ut_ad(page_is_leaf(page));

	/* Candidate position: the most recently inserted record (see
	page_cur_try_search_shortcut(); this variant also tracks the
	number of matched bytes within the first partial field). */
	rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
	offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
				  dtuple_get_n_fields(tuple), &heap);

	ut_ad(rec);
	ut_ad(page_rec_is_user_rec(rec));
	/* Resume comparison from the smaller of the two already-matched
	(fields, bytes) prefixes; ut_pair_cmp() orders the pairs
	lexicographically. */
	if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
			*iup_matched_fields, *iup_matched_bytes) < 0) {
		up_match = low_match = *ilow_matched_fields;
		up_bytes = low_bytes = *ilow_matched_bytes;
	} else {
		up_match = low_match = *iup_matched_fields;
		up_bytes = low_bytes = *iup_matched_bytes;
	}

	/* The shortcut holds only if tuple >= rec ... */
	if (cmp_dtuple_rec_with_match_bytes(
		    tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
		goto exit_func;
	}

	/* ... and tuple < successor of rec (trivially true when the
	successor is the supremum). */
	next_rec = page_rec_get_next_const(rec);
	if (!page_rec_is_supremum(next_rec)) {
		offsets = rec_get_offsets(next_rec, index, offsets,
					  index->n_core_fields,
					  dtuple_get_n_fields(tuple), &heap);

		if (cmp_dtuple_rec_with_match_bytes(
			    tuple, next_rec, index, offsets,
			    &up_match, &up_bytes)
		    >= 0) {
			goto exit_func;
		}

		*iup_matched_fields = up_match;
		*iup_matched_bytes = up_bytes;
	}

	page_cur_position(rec, block, cursor);

	*ilow_matched_fields = low_match;
	*ilow_matched_bytes = low_bytes;

#ifdef UNIV_SEARCH_PERF_STAT
	page_cur_short_succ++;
#endif
	success = TRUE;
exit_func:
	if (UNIV_LIKELY_NULL(heap)) {
		/* rec_get_offsets() allocates a heap only when the
		stack-based offsets array is too small. */
		mem_heap_free(heap);
	}
	return(success);
}
214 #endif /* BTR_CUR_HASH_ADAPT */
215
#ifdef PAGE_CUR_LE_OR_EXTENDS
/****************************************************************//**
Checks if the nth field in a record is a character type field which extends
the nth field in tuple, i.e., the field is longer or equal in length and has
common first characters.
@return TRUE if rec field extends tuple field */
static
ibool
page_cur_rec_field_extends(
/*=======================*/
	const dtuple_t*	tuple,	/*!< in: data tuple */
	const rec_t*	rec,	/*!< in: record */
	const rec_offs*	offsets,/*!< in: array returned by rec_get_offsets() */
	ulint		n)	/*!< in: compare nth field */
{
	ut_ad(rec_offs_validate(rec, NULL, offsets));

	const dfield_t*	dfield = dtuple_get_nth_field(tuple, n);
	const dtype_t*	type = dfield_get_type(dfield);
	ulint		rec_f_len;
	const byte*	rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);

	/* Only string-like or blob-like main types can "extend". */
	const bool	extendable_type
		= type->mtype == DATA_VARCHAR
		|| type->mtype == DATA_CHAR
		|| type->mtype == DATA_FIXBINARY
		|| type->mtype == DATA_BINARY
		|| type->mtype == DATA_BLOB
		|| DATA_GEOMETRY_MTYPE(type->mtype)
		|| type->mtype == DATA_VARMYSQL
		|| type->mtype == DATA_MYSQL;

	if (!extendable_type) {
		return(FALSE);
	}

	const ulint	tuple_f_len = dfield_get_len(dfield);

	/* Neither side may be SQL NULL, and the record field must be at
	least as long as the tuple field. */
	if (tuple_f_len == UNIV_SQL_NULL
	    || rec_f_len == UNIV_SQL_NULL
	    || rec_f_len < tuple_f_len) {
		return(FALSE);
	}

	/* The record field extends the tuple field when its prefix of
	tuple_f_len bytes compares equal to the tuple field. */
	return(cmp_data_data(type->mtype, type->prtype,
			     dfield_get_data(dfield), tuple_f_len,
			     rec_f, tuple_f_len) == 0);
}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
267
268 /****************************************************************//**
269 Searches the right position for a page cursor. */
270 void
page_cur_search_with_match(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor,rtr_info_t * rtr_info)271 page_cur_search_with_match(
272 /*=======================*/
273 const buf_block_t* block, /*!< in: buffer block */
274 const dict_index_t* index, /*!< in/out: record descriptor */
275 const dtuple_t* tuple, /*!< in: data tuple */
276 page_cur_mode_t mode, /*!< in: PAGE_CUR_L,
277 PAGE_CUR_LE, PAGE_CUR_G, or
278 PAGE_CUR_GE */
279 ulint* iup_matched_fields,
280 /*!< in/out: already matched
281 fields in upper limit record */
282 ulint* ilow_matched_fields,
283 /*!< in/out: already matched
284 fields in lower limit record */
285 page_cur_t* cursor, /*!< out: page cursor */
286 rtr_info_t* rtr_info)/*!< in/out: rtree search stack */
287 {
288 ulint up;
289 ulint low;
290 ulint mid;
291 const page_t* page;
292 const page_dir_slot_t* slot;
293 const rec_t* up_rec;
294 const rec_t* low_rec;
295 const rec_t* mid_rec;
296 ulint up_matched_fields;
297 ulint low_matched_fields;
298 ulint cur_matched_fields;
299 int cmp;
300 #ifdef UNIV_ZIP_DEBUG
301 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
302 #endif /* UNIV_ZIP_DEBUG */
303 mem_heap_t* heap = NULL;
304 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
305 rec_offs* offsets = offsets_;
306 rec_offs_init(offsets_);
307
308 ut_ad(dtuple_validate(tuple));
309 #ifdef UNIV_DEBUG
310 # ifdef PAGE_CUR_DBG
311 if (mode != PAGE_CUR_DBG)
312 # endif /* PAGE_CUR_DBG */
313 # ifdef PAGE_CUR_LE_OR_EXTENDS
314 if (mode != PAGE_CUR_LE_OR_EXTENDS)
315 # endif /* PAGE_CUR_LE_OR_EXTENDS */
316 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
317 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
318 || dict_index_is_spatial(index));
319 #endif /* UNIV_DEBUG */
320 page = buf_block_get_frame(block);
321 #ifdef UNIV_ZIP_DEBUG
322 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
323 #endif /* UNIV_ZIP_DEBUG */
324
325 ut_d(page_check_dir(page));
326 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
327
328 #ifdef BTR_CUR_HASH_ADAPT
329 if (n_core
330 && page_get_direction(page) == PAGE_RIGHT
331 && page_header_get_offs(page, PAGE_LAST_INSERT)
332 && mode == PAGE_CUR_LE
333 && !index->is_spatial()
334 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
335 && page_cur_try_search_shortcut(
336 block, index, tuple,
337 iup_matched_fields, ilow_matched_fields, cursor)) {
338 return;
339 }
340 # ifdef PAGE_CUR_DBG
341 if (mode == PAGE_CUR_DBG) {
342 mode = PAGE_CUR_LE;
343 }
344 # endif
345 #endif /* BTR_CUR_HASH_ADAPT */
346
347 /* If the mode is for R-tree indexes, use the special MBR
348 related compare functions */
349 if (index->is_spatial() && mode > PAGE_CUR_LE) {
350 /* For leaf level insert, we still use the traditional
351 compare function for now */
352 if (mode == PAGE_CUR_RTREE_INSERT && n_core) {
353 mode = PAGE_CUR_LE;
354 } else {
355 rtr_cur_search_with_match(
356 block, (dict_index_t*)index, tuple, mode,
357 cursor, rtr_info);
358 return;
359 }
360 }
361
362 /* The following flag does not work for non-latin1 char sets because
363 cmp_full_field does not tell how many bytes matched */
364 #ifdef PAGE_CUR_LE_OR_EXTENDS
365 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
366 #endif /* PAGE_CUR_LE_OR_EXTENDS */
367
368 /* If mode PAGE_CUR_G is specified, we are trying to position the
369 cursor to answer a query of the form "tuple < X", where tuple is
370 the input parameter, and X denotes an arbitrary physical record on
371 the page. We want to position the cursor on the first X which
372 satisfies the condition. */
373
374 up_matched_fields = *iup_matched_fields;
375 low_matched_fields = *ilow_matched_fields;
376
377 /* Perform binary search. First the search is done through the page
378 directory, after that as a linear search in the list of records
379 owned by the upper limit directory slot. */
380
381 low = 0;
382 up = ulint(page_dir_get_n_slots(page)) - 1;
383
384 /* Perform binary search until the lower and upper limit directory
385 slots come to the distance 1 of each other */
386
387 while (up - low > 1) {
388 mid = (low + up) / 2;
389 slot = page_dir_get_nth_slot(page, mid);
390 mid_rec = page_dir_slot_get_rec(slot);
391
392 cur_matched_fields = std::min(low_matched_fields,
393 up_matched_fields);
394
395 offsets = offsets_;
396 offsets = rec_get_offsets(
397 mid_rec, index, offsets, n_core,
398 dtuple_get_n_fields_cmp(tuple), &heap);
399
400 cmp = cmp_dtuple_rec_with_match(
401 tuple, mid_rec, offsets, &cur_matched_fields);
402
403 if (cmp > 0) {
404 low_slot_match:
405 low = mid;
406 low_matched_fields = cur_matched_fields;
407
408 } else if (cmp) {
409 #ifdef PAGE_CUR_LE_OR_EXTENDS
410 if (mode == PAGE_CUR_LE_OR_EXTENDS
411 && page_cur_rec_field_extends(
412 tuple, mid_rec, offsets,
413 cur_matched_fields)) {
414
415 goto low_slot_match;
416 }
417 #endif /* PAGE_CUR_LE_OR_EXTENDS */
418 up_slot_match:
419 up = mid;
420 up_matched_fields = cur_matched_fields;
421
422 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
423 #ifdef PAGE_CUR_LE_OR_EXTENDS
424 || mode == PAGE_CUR_LE_OR_EXTENDS
425 #endif /* PAGE_CUR_LE_OR_EXTENDS */
426 ) {
427 goto low_slot_match;
428 } else {
429
430 goto up_slot_match;
431 }
432 }
433
434 slot = page_dir_get_nth_slot(page, low);
435 low_rec = page_dir_slot_get_rec(slot);
436 slot = page_dir_get_nth_slot(page, up);
437 up_rec = page_dir_slot_get_rec(slot);
438
439 /* Perform linear search until the upper and lower records come to
440 distance 1 of each other. */
441
442 while (page_rec_get_next_const(low_rec) != up_rec) {
443
444 mid_rec = page_rec_get_next_const(low_rec);
445
446 cur_matched_fields = std::min(low_matched_fields,
447 up_matched_fields);
448
449 offsets = offsets_;
450 offsets = rec_get_offsets(
451 mid_rec, index, offsets, n_core,
452 dtuple_get_n_fields_cmp(tuple), &heap);
453
454 cmp = cmp_dtuple_rec_with_match(
455 tuple, mid_rec, offsets, &cur_matched_fields);
456
457 if (cmp > 0) {
458 low_rec_match:
459 low_rec = mid_rec;
460 low_matched_fields = cur_matched_fields;
461
462 } else if (cmp) {
463 #ifdef PAGE_CUR_LE_OR_EXTENDS
464 if (mode == PAGE_CUR_LE_OR_EXTENDS
465 && page_cur_rec_field_extends(
466 tuple, mid_rec, offsets,
467 cur_matched_fields)) {
468
469 goto low_rec_match;
470 }
471 #endif /* PAGE_CUR_LE_OR_EXTENDS */
472 up_rec_match:
473 up_rec = mid_rec;
474 up_matched_fields = cur_matched_fields;
475 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
476 #ifdef PAGE_CUR_LE_OR_EXTENDS
477 || mode == PAGE_CUR_LE_OR_EXTENDS
478 #endif /* PAGE_CUR_LE_OR_EXTENDS */
479 ) {
480 if (!cmp && !cur_matched_fields) {
481 #ifdef UNIV_DEBUG
482 mtr_t mtr;
483 mtr_start(&mtr);
484
485 /* We got a match, but cur_matched_fields is
486 0, it must have REC_INFO_MIN_REC_FLAG */
487 ulint rec_info = rec_get_info_bits(mid_rec,
488 rec_offs_comp(offsets));
489 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
490 ut_ad(!page_has_prev(page));
491 mtr_commit(&mtr);
492 #endif
493
494 cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
495 }
496
497 goto low_rec_match;
498 } else {
499
500 goto up_rec_match;
501 }
502 }
503
504 if (mode <= PAGE_CUR_GE) {
505 page_cur_position(up_rec, block, cursor);
506 } else {
507 page_cur_position(low_rec, block, cursor);
508 }
509
510 *iup_matched_fields = up_matched_fields;
511 *ilow_matched_fields = low_matched_fields;
512 if (UNIV_LIKELY_NULL(heap)) {
513 mem_heap_free(heap);
514 }
515 }
516
#ifdef BTR_CUR_HASH_ADAPT
/** Search the right position for a page cursor.
Like page_cur_search_with_match(), but additionally tracks the number of
matched bytes within the first partially matched field, as needed by the
adaptive hash index.
@param[in]	block			buffer block
@param[in]	index			index tree
@param[in]	tuple			key to be searched for
@param[in]	mode			search mode
@param[in,out]	iup_matched_fields	already matched fields in the
upper limit record
@param[in,out]	iup_matched_bytes	already matched bytes in the
first partially matched field in the upper limit record
@param[in,out]	ilow_matched_fields	already matched fields in the
lower limit record
@param[in,out]	ilow_matched_bytes	already matched bytes in the
first partially matched field in the lower limit record
@param[out]	cursor			page cursor */
void
page_cur_search_with_match_bytes(
	const buf_block_t*	block,
	const dict_index_t*	index,
	const dtuple_t*		tuple,
	page_cur_mode_t		mode,
	ulint*			iup_matched_fields,
	ulint*			iup_matched_bytes,
	ulint*			ilow_matched_fields,
	ulint*			ilow_matched_bytes,
	page_cur_t*		cursor)
{
	ulint		up;
	ulint		low;
	ulint		mid;
	const page_t*	page;
	const page_dir_slot_t*	slot;
	const rec_t*	up_rec;
	const rec_t*	low_rec;
	const rec_t*	mid_rec;
	ulint		up_matched_fields;
	ulint		up_matched_bytes;
	ulint		low_matched_fields;
	ulint		low_matched_bytes;
	ulint		cur_matched_fields;
	ulint		cur_matched_bytes;
	int		cmp;
#ifdef UNIV_ZIP_DEBUG
	const page_zip_des_t*	page_zip = buf_block_get_page_zip(block);
#endif /* UNIV_ZIP_DEBUG */
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	ut_ad(dtuple_validate(tuple));
	ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
#ifdef UNIV_DEBUG
# ifdef PAGE_CUR_DBG
	if (mode != PAGE_CUR_DBG)
# endif /* PAGE_CUR_DBG */
# ifdef PAGE_CUR_LE_OR_EXTENDS
		if (mode != PAGE_CUR_LE_OR_EXTENDS)
# endif /* PAGE_CUR_LE_OR_EXTENDS */
			ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
			      || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
#endif /* UNIV_DEBUG */
	page = buf_block_get_frame(block);
#ifdef UNIV_ZIP_DEBUG
	ut_a(!page_zip || page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */

	ut_d(page_check_dir(page));

#ifdef BTR_CUR_HASH_ADAPT
	/* Try the last-insert shortcut first when inserts have been
	trending to the right (see page_cur_search_with_match()). */
	if (page_is_leaf(page)
	    && page_get_direction(page) == PAGE_RIGHT
	    && page_header_get_offs(page, PAGE_LAST_INSERT)
	    && mode == PAGE_CUR_LE
	    && page_header_get_field(page, PAGE_N_DIRECTION) > 3
	    && page_cur_try_search_shortcut_bytes(
		    block, index, tuple,
		    iup_matched_fields, iup_matched_bytes,
		    ilow_matched_fields, ilow_matched_bytes,
		    cursor)) {
		return;
	}
# ifdef PAGE_CUR_DBG
	if (mode == PAGE_CUR_DBG) {
		mode = PAGE_CUR_LE;
	}
# endif
#endif /* BTR_CUR_HASH_ADAPT */

	/* The following flag does not work for non-latin1 char sets because
	cmp_full_field does not tell how many bytes matched */
#ifdef PAGE_CUR_LE_OR_EXTENDS
	ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
#endif /* PAGE_CUR_LE_OR_EXTENDS */

	/* If mode PAGE_CUR_G is specified, we are trying to position the
	cursor to answer a query of the form "tuple < X", where tuple is
	the input parameter, and X denotes an arbitrary physical record on
	the page. We want to position the cursor on the first X which
	satisfies the condition. */

	up_matched_fields  = *iup_matched_fields;
	up_matched_bytes  = *iup_matched_bytes;
	low_matched_fields = *ilow_matched_fields;
	low_matched_bytes  = *ilow_matched_bytes;

	/* Perform binary search. First the search is done through the page
	directory, after that as a linear search in the list of records
	owned by the upper limit directory slot. */

	low = 0;
	up = ulint(page_dir_get_n_slots(page)) - 1;

	/* Perform binary search until the lower and upper limit directory
	slots come to the distance 1 of each other */
	const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;

	while (up - low > 1) {
		mid = (low + up) / 2;
		slot = page_dir_get_nth_slot(page, mid);
		mid_rec = page_dir_slot_get_rec(slot);

		/* Resume comparison from the smaller of the two
		already-matched (fields, bytes) prefixes. */
		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_slot_match:
			/* tuple > mid_rec: search the upper half. */
			low = mid;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_slot_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_slot_match:
			/* tuple < mid_rec: search the lower half. */
			up = mid;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;

		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_slot_match;
		} else {

			goto up_slot_match;
		}
	}

	slot = page_dir_get_nth_slot(page, low);
	low_rec = page_dir_slot_get_rec(slot);
	slot = page_dir_get_nth_slot(page, up);
	up_rec = page_dir_slot_get_rec(slot);

	/* Perform linear search until the upper and lower records come to
	distance 1 of each other. */

	while (page_rec_get_next_const(low_rec) != up_rec) {

		mid_rec = page_rec_get_next_const(low_rec);

		ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
			    low_matched_fields, low_matched_bytes,
			    up_matched_fields, up_matched_bytes);

		/* The metadata/minimum record sorts before everything
		(the caller asserted that tuple itself does not carry
		REC_INFO_MIN_REC_FLAG); skip it without comparing. */
		if (UNIV_UNLIKELY(rec_get_info_bits(
					  mid_rec,
					  dict_table_is_comp(index->table))
				  & REC_INFO_MIN_REC_FLAG)) {
			ut_ad(!page_has_prev(page_align(mid_rec)));
			ut_ad(!page_rec_is_leaf(mid_rec)
			      || rec_is_metadata(mid_rec, *index));
			cmp = 1;
			goto low_rec_match;
		}

		offsets = rec_get_offsets(
			mid_rec, index, offsets_, n_core,
			dtuple_get_n_fields_cmp(tuple), &heap);

		cmp = cmp_dtuple_rec_with_match_bytes(
			tuple, mid_rec, index, offsets,
			&cur_matched_fields, &cur_matched_bytes);

		if (cmp > 0) {
low_rec_match:
			low_rec = mid_rec;
			low_matched_fields = cur_matched_fields;
			low_matched_bytes = cur_matched_bytes;

		} else if (cmp) {
#ifdef PAGE_CUR_LE_OR_EXTENDS
			if (mode == PAGE_CUR_LE_OR_EXTENDS
			    && page_cur_rec_field_extends(
				    tuple, mid_rec, offsets,
				    cur_matched_fields)) {

				goto low_rec_match;
			}
#endif /* PAGE_CUR_LE_OR_EXTENDS */
up_rec_match:
			up_rec = mid_rec;
			up_matched_fields = cur_matched_fields;
			up_matched_bytes = cur_matched_bytes;
		} else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
#ifdef PAGE_CUR_LE_OR_EXTENDS
			   || mode == PAGE_CUR_LE_OR_EXTENDS
#endif /* PAGE_CUR_LE_OR_EXTENDS */
			   ) {
			goto low_rec_match;
		} else {

			goto up_rec_match;
		}
	}

	/* Greater-than modes position on the upper record; L/LE on the
	lower record. */
	if (mode <= PAGE_CUR_GE) {
		page_cur_position(up_rec, block, cursor);
	} else {
		page_cur_position(low_rec, block, cursor);
	}

	*iup_matched_fields  = up_matched_fields;
	*iup_matched_bytes  = up_matched_bytes;
	*ilow_matched_fields = low_matched_fields;
	*ilow_matched_bytes  = low_matched_bytes;
	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}
}
#endif /* BTR_CUR_HASH_ADAPT */
765 #endif /* BTR_CUR_HASH_ADAPT */
766
767 /***********************************************************//**
768 Positions a page cursor on a randomly chosen user record on a page. If there
769 are no user records, sets the cursor on the infimum record. */
770 void
page_cur_open_on_rnd_user_rec(buf_block_t * block,page_cur_t * cursor)771 page_cur_open_on_rnd_user_rec(
772 /*==========================*/
773 buf_block_t* block, /*!< in: page */
774 page_cur_t* cursor) /*!< out: page cursor */
775 {
776 const ulint n_recs = page_get_n_recs(block->frame);
777
778 page_cur_set_before_first(block, cursor);
779
780 if (UNIV_UNLIKELY(n_recs == 0)) {
781
782 return;
783 }
784
785 cursor->rec = page_rec_get_nth(block->frame,
786 ut_rnd_interval(n_recs) + 1);
787 }
788
/** Write a redo log record of inserting a record into an index page.
The record is logged as a delta against its predecessor: only the byte
suffix that differs from cursor_rec is written, together with enough
bookkeeping (info bits, origin offset, mismatch index) for recovery to
reconstruct the full record.
@param[in]	insert_rec	inserted record
@param[in]	rec_size	rec_get_size(insert_rec)
@param[in]	cursor_rec	predecessor of insert_rec
@param[in,out]	index		index tree
@param[in,out]	mtr		mini-transaction */
void
page_cur_insert_rec_write_log(
	const rec_t*	insert_rec,
	ulint		rec_size,
	const rec_t*	cursor_rec,
	dict_index_t*	index,
	mtr_t*		mtr)
{
	ulint	cur_rec_size;
	ulint	extra_size;
	ulint	cur_extra_size;
	const byte* ins_ptr;
	const byte* log_end;
	ulint	i;

	/* Temporary tables are not redo-logged. */
	if (index->table->is_temporary()) {
		mtr->set_modified();
		ut_ad(mtr->get_log_mode() == MTR_LOG_NO_REDO);
		return;
	}

	ut_a(rec_size < srv_page_size);
	ut_ad(mtr->is_named_space(index->table->space));
	ut_ad(page_align(insert_rec) == page_align(cursor_rec));
	ut_ad(!page_rec_is_comp(insert_rec)
	      == !dict_table_is_comp(index->table));

	const ulint n_core = page_rec_is_leaf(cursor_rec)
		? index->n_core_fields : 0;

	/* Compute the extra (header) and total sizes of both records;
	the scope limits the lifetime of the offsets arrays and heap. */
	{
		mem_heap_t*	heap		= NULL;
		rec_offs	cur_offs_[REC_OFFS_NORMAL_SIZE];
		rec_offs	ins_offs_[REC_OFFS_NORMAL_SIZE];

		rec_offs*	cur_offs;
		rec_offs*	ins_offs;

		rec_offs_init(cur_offs_);
		rec_offs_init(ins_offs_);

		cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
					   n_core, ULINT_UNDEFINED, &heap);
		ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
					   n_core, ULINT_UNDEFINED, &heap);

		extra_size = rec_offs_extra_size(ins_offs);
		cur_extra_size = rec_offs_extra_size(cur_offs);
		ut_ad(rec_size == rec_offs_size(ins_offs));
		cur_rec_size = rec_offs_size(cur_offs);

		if (UNIV_LIKELY_NULL(heap)) {
			mem_heap_free(heap);
		}
	}

	/* ins_ptr points at the physical start (extra bytes) of the
	inserted record. */
	ins_ptr = insert_rec - extra_size;

	i = 0;

	if (cur_extra_size == extra_size) {
		ulint		min_rec_size = ut_min(cur_rec_size, rec_size);

		const byte*	cur_ptr = cursor_rec - cur_extra_size;

		/* Find out the first byte in insert_rec which differs from
		cursor_rec; skip the bytes in the record info */

		do {
			if (*ins_ptr == *cur_ptr) {
				i++;
				ins_ptr++;
				cur_ptr++;
			} else if ((i < extra_size)
				   && (i >= extra_size
				       - page_rec_get_base_extra_size
				       (insert_rec))) {
				/* The mismatch is inside the fixed-size
				part of the record header; restart the
				common prefix at the record origin so the
				whole header is logged. */
				i = extra_size;
				ins_ptr = insert_rec;
				cur_ptr = cursor_rec;
			} else {
				break;
			}
		} while (i < min_rec_size);
	}

	byte*	log_ptr;

	if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {

		if (page_rec_is_comp(insert_rec)) {
			log_ptr = mlog_open_and_write_index(
				mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
				2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}
		} else {
			log_ptr = mlog_open(mtr, 11
					    + 2 + 5 + 1 + 5 + 5
					    + MLOG_BUF_MARGIN);
			if (UNIV_UNLIKELY(!log_ptr)) {
				/* Logging in mtr is switched off
				during crash recovery: in that case
				mlog_open returns NULL */
				return;
			}

			log_ptr = mlog_write_initial_log_record_fast(
				insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
		}

		log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
		/* Write the cursor rec offset as a 2-byte ulint */
		mach_write_to_2(log_ptr, page_offset(cursor_rec));
		log_ptr += 2;
	} else {
		/* MLOG_SHORT_INSERTS: the initial log record and cursor
		offset are implied by context and omitted. */
		log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
		if (!log_ptr) {
			/* Logging in mtr is switched off during crash
			recovery: in that case mlog_open returns NULL */
			return;
		}
		log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
	}

	/* Extra info (info bits, origin offset, mismatch index) must be
	logged when the records' status bits or sizes differ; otherwise
	recovery copies them from the predecessor. */
	if (page_rec_is_comp(insert_rec)) {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, TRUE)
		     != rec_get_info_and_status_bits(cursor_rec, TRUE))) {

			goto need_extra_info;
		}
	} else {
		if (UNIV_UNLIKELY
		    (rec_get_info_and_status_bits(insert_rec, FALSE)
		     != rec_get_info_and_status_bits(cursor_rec, FALSE))) {

			goto need_extra_info;
		}
	}

	if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
need_extra_info:
		/* Write the record end segment length
		and the extra info storage flag; the low bit of the
		doubled length signals that extra info follows. */
		log_ptr += mach_write_compressed(log_ptr,
						 2 * (rec_size - i) + 1);

		/* Write the info bits */
		mach_write_to_1(log_ptr,
				rec_get_info_and_status_bits(
					insert_rec,
					page_rec_is_comp(insert_rec)));
		log_ptr++;

		/* Write the record origin offset */
		log_ptr += mach_write_compressed(log_ptr, extra_size);

		/* Write the mismatch index */
		log_ptr += mach_write_compressed(log_ptr, i);

		ut_a(i < srv_page_size);
		ut_a(extra_size < srv_page_size);
	} else {
		/* Write the record end segment length
		and the extra info storage flag */
		log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
	}

	/* Write to the log the inserted index record end segment which
	differs from the cursor record */

	rec_size -= i;

	if (log_ptr + rec_size <= log_end) {
		memcpy(log_ptr, ins_ptr, rec_size);
		mlog_close(mtr, log_ptr + rec_size);
	} else {
		/* The suffix does not fit in the opened log buffer;
		close it and append the bytes as a catenated string. */
		mlog_close(mtr, log_ptr);
		ut_a(rec_size < srv_page_size);
		mlog_catenate_string(mtr, ins_ptr, rec_size);
	}
}
982
983 /***********************************************************//**
984 Parses a log record of a record insert on a page.
985 @return end of log record or NULL */
986 byte*
page_cur_parse_insert_rec(ibool is_short,const byte * ptr,const byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)987 page_cur_parse_insert_rec(
988 /*======================*/
989 ibool is_short,/*!< in: TRUE if short inserts */
990 const byte* ptr, /*!< in: buffer */
991 const byte* end_ptr,/*!< in: buffer end */
992 buf_block_t* block, /*!< in: page or NULL */
993 dict_index_t* index, /*!< in: record descriptor */
994 mtr_t* mtr) /*!< in: mtr or NULL */
995 {
996 ulint origin_offset = 0; /* remove warning */
997 ulint end_seg_len;
998 ulint mismatch_index = 0; /* remove warning */
999 page_t* page;
1000 rec_t* cursor_rec;
1001 byte buf1[1024];
1002 byte* buf;
1003 const byte* ptr2 = ptr;
1004 ulint info_and_status_bits = 0; /* remove warning */
1005 page_cur_t cursor;
1006 mem_heap_t* heap = NULL;
1007 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
1008 rec_offs* offsets = offsets_;
1009 rec_offs_init(offsets_);
1010
1011 page = block ? buf_block_get_frame(block) : NULL;
1012
1013 if (is_short) {
1014 cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
1015 } else {
1016 ulint offset;
1017
1018 /* Read the cursor rec offset as a 2-byte ulint */
1019
1020 if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
1021
1022 return(NULL);
1023 }
1024
1025 offset = mach_read_from_2(ptr);
1026 ptr += 2;
1027
1028 cursor_rec = page + offset;
1029
1030 if (offset >= srv_page_size) {
1031
1032 recv_sys.found_corrupt_log = TRUE;
1033
1034 return(NULL);
1035 }
1036 }
1037
1038 end_seg_len = mach_parse_compressed(&ptr, end_ptr);
1039
1040 if (ptr == NULL) {
1041
1042 return(NULL);
1043 }
1044
1045 if (end_seg_len >= srv_page_size << 1) {
1046 recv_sys.found_corrupt_log = TRUE;
1047
1048 return(NULL);
1049 }
1050
1051 if (end_seg_len & 0x1UL) {
1052 /* Read the info bits */
1053
1054 if (end_ptr < ptr + 1) {
1055
1056 return(NULL);
1057 }
1058
1059 info_and_status_bits = mach_read_from_1(ptr);
1060 ptr++;
1061
1062 origin_offset = mach_parse_compressed(&ptr, end_ptr);
1063
1064 if (ptr == NULL) {
1065
1066 return(NULL);
1067 }
1068
1069 ut_a(origin_offset < srv_page_size);
1070
1071 mismatch_index = mach_parse_compressed(&ptr, end_ptr);
1072
1073 if (ptr == NULL) {
1074
1075 return(NULL);
1076 }
1077
1078 ut_a(mismatch_index < srv_page_size);
1079 }
1080
1081 if (end_ptr < ptr + (end_seg_len >> 1)) {
1082
1083 return(NULL);
1084 }
1085
1086 if (!block) {
1087
1088 return(const_cast<byte*>(ptr + (end_seg_len >> 1)));
1089 }
1090
1091 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1092 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
1093
1094 /* Read from the log the inserted index record end segment which
1095 differs from the cursor record */
1096
1097 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
1098
1099 offsets = rec_get_offsets(cursor_rec, index, offsets, n_core,
1100 ULINT_UNDEFINED, &heap);
1101
1102 if (!(end_seg_len & 0x1UL)) {
1103 info_and_status_bits = rec_get_info_and_status_bits(
1104 cursor_rec, page_is_comp(page));
1105 origin_offset = rec_offs_extra_size(offsets);
1106 mismatch_index = rec_offs_size(offsets) - (end_seg_len >> 1);
1107 }
1108
1109 end_seg_len >>= 1;
1110
1111 if (mismatch_index + end_seg_len < sizeof buf1) {
1112 buf = buf1;
1113 } else {
1114 buf = static_cast<byte*>(
1115 ut_malloc_nokey(mismatch_index + end_seg_len));
1116 }
1117
1118 /* Build the inserted record to buf */
1119
1120 if (UNIV_UNLIKELY(mismatch_index >= srv_page_size)) {
1121
1122 ib::fatal() << "is_short " << is_short << ", "
1123 << "info_and_status_bits " << info_and_status_bits
1124 << ", offset " << page_offset(cursor_rec) << ","
1125 " o_offset " << origin_offset << ", mismatch index "
1126 << mismatch_index << ", end_seg_len " << end_seg_len
1127 << " parsed len " << (ptr - ptr2);
1128 }
1129
1130 ut_memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
1131 ut_memcpy(buf + mismatch_index, ptr, end_seg_len);
1132
1133 if (page_is_comp(page)) {
1134 rec_set_heap_no_new(buf + origin_offset,
1135 PAGE_HEAP_NO_USER_LOW);
1136 rec_set_info_and_status_bits(buf + origin_offset,
1137 info_and_status_bits);
1138 } else {
1139 rec_set_heap_no_old(buf + origin_offset,
1140 PAGE_HEAP_NO_USER_LOW);
1141 rec_set_info_bits_old(buf + origin_offset,
1142 info_and_status_bits);
1143 }
1144
1145 page_cur_position(cursor_rec, block, &cursor);
1146
1147 offsets = rec_get_offsets(buf + origin_offset, index, offsets,
1148 n_core, ULINT_UNDEFINED, &heap);
1149 if (UNIV_UNLIKELY(!page_cur_rec_insert(&cursor,
1150 buf + origin_offset,
1151 index, offsets, mtr))) {
1152 /* The redo log record should only have been written
1153 after the write was successful. */
1154 ut_error;
1155 }
1156
1157 if (buf != buf1) {
1158
1159 ut_free(buf);
1160 }
1161
1162 if (UNIV_LIKELY_NULL(heap)) {
1163 mem_heap_free(heap);
1164 }
1165
1166 return(const_cast<byte*>(ptr + end_seg_len));
1167 }
1168
1169 /** Reset PAGE_DIRECTION and PAGE_N_DIRECTION.
1170 @param[in,out] ptr the PAGE_DIRECTION_B field
1171 @param[in,out] page index tree page frame
1172 @param[in] page_zip compressed page descriptor, or NULL */
1173 static inline
1174 void
page_direction_reset(byte * ptr,page_t * page,page_zip_des_t * page_zip)1175 page_direction_reset(byte* ptr, page_t* page, page_zip_des_t* page_zip)
1176 {
1177 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1178 page_ptr_set_direction(ptr, PAGE_NO_DIRECTION);
1179 if (page_zip) {
1180 page_zip_write_header(page_zip, ptr, 1, NULL);
1181 }
1182 ptr = PAGE_HEADER + PAGE_N_DIRECTION + page;
1183 *reinterpret_cast<uint16_t*>(ptr) = 0;
1184 if (page_zip) {
1185 page_zip_write_header(page_zip, ptr, 2, NULL);
1186 }
1187 }
1188
1189 /** Increment PAGE_N_DIRECTION.
1190 @param[in,out] ptr the PAGE_DIRECTION_B field
1191 @param[in,out] page index tree page frame
1192 @param[in] page_zip compressed page descriptor, or NULL
1193 @param[in] dir PAGE_RIGHT or PAGE_LEFT */
1194 static inline
1195 void
page_direction_increment(byte * ptr,page_t * page,page_zip_des_t * page_zip,uint dir)1196 page_direction_increment(
1197 byte* ptr,
1198 page_t* page,
1199 page_zip_des_t* page_zip,
1200 uint dir)
1201 {
1202 ut_ad(ptr == PAGE_HEADER + PAGE_DIRECTION_B + page);
1203 ut_ad(dir == PAGE_RIGHT || dir == PAGE_LEFT);
1204 page_ptr_set_direction(ptr, dir);
1205 if (page_zip) {
1206 page_zip_write_header(page_zip, ptr, 1, NULL);
1207 }
1208 page_header_set_field(
1209 page, page_zip, PAGE_N_DIRECTION,
1210 1U + page_header_get_field(page, PAGE_N_DIRECTION));
1211 }
1212
1213 /************************************************************//**
1214 Allocates a block of memory from the heap of an index page.
1215 @return pointer to start of allocated buffer, or NULL if allocation fails */
1216 static
1217 byte*
page_mem_alloc_heap(page_t * page,page_zip_des_t * page_zip,ulint need,ulint * heap_no)1218 page_mem_alloc_heap(
1219 /*================*/
1220 page_t* page, /*!< in/out: index page */
1221 page_zip_des_t* page_zip,/*!< in/out: compressed page with enough
1222 space available for inserting the record,
1223 or NULL */
1224 ulint need, /*!< in: total number of bytes needed */
1225 ulint* heap_no)/*!< out: this contains the heap number
1226 of the allocated record
1227 if allocation succeeds */
1228 {
1229 byte* block;
1230 ulint avl_space;
1231
1232 ut_ad(page && heap_no);
1233
1234 avl_space = page_get_max_insert_size(page, 1);
1235
1236 if (avl_space >= need) {
1237 const ulint h = page_dir_get_n_heap(page);
1238 if (UNIV_UNLIKELY(h >= 8191)) {
1239 /* At the minimum record size of 5+2 bytes,
1240 we can only reach this condition when using
1241 innodb_page_size=64k. */
1242 ut_ad(srv_page_size == 65536);
1243 return(NULL);
1244 }
1245 *heap_no = h;
1246
1247 block = page_header_get_ptr(page, PAGE_HEAP_TOP);
1248
1249 page_header_set_ptr(page, page_zip, PAGE_HEAP_TOP,
1250 block + need);
1251 page_dir_set_n_heap(page, page_zip, 1 + *heap_no);
1252
1253 return(block);
1254 }
1255
1256 return(NULL);
1257 }
1258
1259 /***********************************************************//**
1260 Inserts a record next to page cursor on an uncompressed page.
1261 Returns pointer to inserted record if succeed, i.e., enough
1262 space available, NULL otherwise. The cursor stays at the same position.
1263 @return pointer to record if succeed, NULL otherwise */
1264 rec_t*
page_cur_insert_rec_low(rec_t * current_rec,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1265 page_cur_insert_rec_low(
1266 /*====================*/
1267 rec_t* current_rec,/*!< in: pointer to current record after
1268 which the new record is inserted */
1269 dict_index_t* index, /*!< in: record descriptor */
1270 const rec_t* rec, /*!< in: pointer to a physical record */
1271 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1272 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1273 {
1274 byte* insert_buf;
1275 ulint rec_size;
1276 page_t* page; /*!< the relevant page */
1277 rec_t* last_insert; /*!< cursor position at previous
1278 insert */
1279 rec_t* free_rec; /*!< a free record that was reused,
1280 or NULL */
1281 rec_t* insert_rec; /*!< inserted record */
1282 ulint heap_no; /*!< heap number of the inserted
1283 record */
1284
1285 ut_ad(rec_offs_validate(rec, index, offsets));
1286
1287 page = page_align(current_rec);
1288 ut_ad(dict_table_is_comp(index->table)
1289 == (ibool) !!page_is_comp(page));
1290 ut_ad(fil_page_index_page_check(page));
1291 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1292 || index->is_dummy
1293 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1294
1295 ut_ad(!page_rec_is_supremum(current_rec));
1296
1297 /* 1. Get the size of the physical record in the page */
1298 rec_size = rec_offs_size(offsets);
1299
1300 #ifdef HAVE_valgrind
1301 {
1302 const void* rec_start __attribute__((unused))
1303 = rec - rec_offs_extra_size(offsets);
1304 ulint extra_size __attribute__((unused))
1305 = rec_offs_extra_size(offsets)
1306 - (rec_offs_comp(offsets)
1307 ? REC_N_NEW_EXTRA_BYTES
1308 : REC_N_OLD_EXTRA_BYTES);
1309
1310 /* All data bytes of the record must be valid. */
1311 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1312 /* The variable-length header must be valid. */
1313 MEM_CHECK_DEFINED(rec_start, extra_size);
1314 }
1315 #endif /* HAVE_valgrind */
1316
1317 /* 2. Try to find suitable space from page memory management */
1318
1319 free_rec = page_header_get_ptr(page, PAGE_FREE);
1320 if (UNIV_LIKELY_NULL(free_rec)) {
1321 /* Try to allocate from the head of the free list. */
1322 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1323 rec_offs* foffsets = foffsets_;
1324 mem_heap_t* heap = NULL;
1325
1326 rec_offs_init(foffsets_);
1327
1328 foffsets = rec_get_offsets(
1329 free_rec, index, foffsets,
1330 page_is_leaf(page) ? index->n_core_fields : 0,
1331 ULINT_UNDEFINED, &heap);
1332 if (rec_offs_size(foffsets) < rec_size) {
1333 if (UNIV_LIKELY_NULL(heap)) {
1334 mem_heap_free(heap);
1335 }
1336
1337 goto use_heap;
1338 }
1339
1340 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1341
1342 if (page_is_comp(page)) {
1343 heap_no = rec_get_heap_no_new(free_rec);
1344 page_mem_alloc_free(page, NULL,
1345 rec_get_next_ptr(free_rec, TRUE),
1346 rec_size);
1347 } else {
1348 heap_no = rec_get_heap_no_old(free_rec);
1349 page_mem_alloc_free(page, NULL,
1350 rec_get_next_ptr(free_rec, FALSE),
1351 rec_size);
1352 }
1353
1354 if (UNIV_LIKELY_NULL(heap)) {
1355 mem_heap_free(heap);
1356 }
1357 } else {
1358 use_heap:
1359 free_rec = NULL;
1360 insert_buf = page_mem_alloc_heap(page, NULL,
1361 rec_size, &heap_no);
1362
1363 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1364 return(NULL);
1365 }
1366 }
1367
1368 /* 3. Create the record */
1369 insert_rec = rec_copy(insert_buf, rec, offsets);
1370 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1371
1372 /* 4. Insert the record in the linked list of records */
1373 ut_ad(current_rec != insert_rec);
1374
1375 {
1376 /* next record after current before the insertion */
1377 rec_t* next_rec = page_rec_get_next(current_rec);
1378 #ifdef UNIV_DEBUG
1379 if (page_is_comp(page)) {
1380 switch (rec_get_status(current_rec)) {
1381 case REC_STATUS_ORDINARY:
1382 case REC_STATUS_NODE_PTR:
1383 case REC_STATUS_INSTANT:
1384 case REC_STATUS_INFIMUM:
1385 break;
1386 case REC_STATUS_SUPREMUM:
1387 ut_ad(!"wrong status on current_rec");
1388 }
1389 switch (rec_get_status(insert_rec)) {
1390 case REC_STATUS_ORDINARY:
1391 case REC_STATUS_NODE_PTR:
1392 case REC_STATUS_INSTANT:
1393 break;
1394 case REC_STATUS_INFIMUM:
1395 case REC_STATUS_SUPREMUM:
1396 ut_ad(!"wrong status on insert_rec");
1397 }
1398 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1399 }
1400 #endif
1401 page_rec_set_next(insert_rec, next_rec);
1402 page_rec_set_next(current_rec, insert_rec);
1403 }
1404
1405 page_header_set_field(page, NULL, PAGE_N_RECS,
1406 1U + page_get_n_recs(page));
1407
1408 /* 5. Set the n_owned field in the inserted record to zero,
1409 and set the heap_no field */
1410 if (page_is_comp(page)) {
1411 rec_set_n_owned_new(insert_rec, NULL, 0);
1412 rec_set_heap_no_new(insert_rec, heap_no);
1413 } else {
1414 rec_set_n_owned_old(insert_rec, 0);
1415 rec_set_heap_no_old(insert_rec, heap_no);
1416 }
1417
1418 MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1419 rec_offs_size(offsets));
1420 /* 6. Update the last insertion info in page header */
1421
1422 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1423 ut_ad(!last_insert || !page_is_comp(page)
1424 || rec_get_node_ptr_flag(last_insert)
1425 == rec_get_node_ptr_flag(insert_rec));
1426
1427 if (!dict_index_is_spatial(index)) {
1428 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1429 if (UNIV_UNLIKELY(last_insert == NULL)) {
1430 no_direction:
1431 page_direction_reset(ptr, page, NULL);
1432 } else if (last_insert == current_rec
1433 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1434 page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
1435 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1436 && page_rec_get_next(insert_rec) == last_insert) {
1437 page_direction_increment(ptr, page, NULL, PAGE_LEFT);
1438 } else {
1439 goto no_direction;
1440 }
1441 }
1442
1443 page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
1444
1445 /* 7. It remains to update the owner record. */
1446 {
1447 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1448 ulint n_owned;
1449 if (page_is_comp(page)) {
1450 n_owned = rec_get_n_owned_new(owner_rec);
1451 rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
1452 } else {
1453 n_owned = rec_get_n_owned_old(owner_rec);
1454 rec_set_n_owned_old(owner_rec, n_owned + 1);
1455 }
1456
1457 /* 8. Now we have incremented the n_owned field of the owner
1458 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1459 we have to split the corresponding directory slot in two. */
1460
1461 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1462 page_dir_split_slot(
1463 page, NULL,
1464 page_dir_find_owner_slot(owner_rec));
1465 }
1466 }
1467
1468 /* 9. Write log record of the insert */
1469 if (UNIV_LIKELY(mtr != NULL)) {
1470 page_cur_insert_rec_write_log(insert_rec, rec_size,
1471 current_rec, index, mtr);
1472 }
1473
1474 return(insert_rec);
1475 }
1476
1477 /***********************************************************//**
1478 Inserts a record next to page cursor on a compressed and uncompressed
1479 page. Returns pointer to inserted record if succeed, i.e.,
1480 enough space available, NULL otherwise.
1481 The cursor stays at the same position.
1482
1483 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1484 if this is a compressed leaf page in a secondary index.
1485 This has to be done either within the same mini-transaction,
1486 or by invoking ibuf_reset_free_bits() before mtr_commit().
1487
1488 @return pointer to record if succeed, NULL otherwise */
1489 rec_t*
page_cur_insert_rec_zip(page_cur_t * cursor,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1490 page_cur_insert_rec_zip(
1491 /*====================*/
1492 page_cur_t* cursor, /*!< in/out: page cursor */
1493 dict_index_t* index, /*!< in: record descriptor */
1494 const rec_t* rec, /*!< in: pointer to a physical record */
1495 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1496 mtr_t* mtr) /*!< in: mini-transaction handle, or NULL */
1497 {
1498 byte* insert_buf;
1499 ulint rec_size;
1500 page_t* page; /*!< the relevant page */
1501 rec_t* last_insert; /*!< cursor position at previous
1502 insert */
1503 rec_t* free_rec; /*!< a free record that was reused,
1504 or NULL */
1505 rec_t* insert_rec; /*!< inserted record */
1506 ulint heap_no; /*!< heap number of the inserted
1507 record */
1508 page_zip_des_t* page_zip;
1509
1510 page_zip = page_cur_get_page_zip(cursor);
1511 ut_ad(page_zip);
1512
1513 ut_ad(rec_offs_validate(rec, index, offsets));
1514
1515 page = page_cur_get_page(cursor);
1516 ut_ad(dict_table_is_comp(index->table));
1517 ut_ad(page_is_comp(page));
1518 ut_ad(fil_page_index_page_check(page));
1519 ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
1520 || index->is_dummy
1521 || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
1522 ut_ad(!page_get_instant(page));
1523 ut_ad(!page_cur_is_after_last(cursor));
1524 #ifdef UNIV_ZIP_DEBUG
1525 ut_a(page_zip_validate(page_zip, page, index));
1526 #endif /* UNIV_ZIP_DEBUG */
1527
1528 /* 1. Get the size of the physical record in the page */
1529 rec_size = rec_offs_size(offsets);
1530
1531 #ifdef HAVE_valgrind
1532 {
1533 const void* rec_start __attribute__((unused))
1534 = rec - rec_offs_extra_size(offsets);
1535 ulint extra_size __attribute__((unused))
1536 = rec_offs_extra_size(offsets)
1537 - (rec_offs_comp(offsets)
1538 ? REC_N_NEW_EXTRA_BYTES
1539 : REC_N_OLD_EXTRA_BYTES);
1540
1541 /* All data bytes of the record must be valid. */
1542 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1543 /* The variable-length header must be valid. */
1544 MEM_CHECK_DEFINED(rec_start, extra_size);
1545 }
1546 #endif /* HAVE_valgrind */
1547
1548 const bool reorg_before_insert = page_has_garbage(page)
1549 && rec_size > page_get_max_insert_size(page, 1)
1550 && rec_size <= page_get_max_insert_size_after_reorganize(
1551 page, 1);
1552
1553 /* 2. Try to find suitable space from page memory management */
1554 if (!page_zip_available(page_zip, dict_index_is_clust(index),
1555 rec_size, 1)
1556 || reorg_before_insert) {
1557 /* The values can change dynamically. */
1558 bool log_compressed = page_zip_log_pages;
1559 ulint level = page_zip_level;
1560 #ifdef UNIV_DEBUG
1561 rec_t* cursor_rec = page_cur_get_rec(cursor);
1562 #endif /* UNIV_DEBUG */
1563
1564 /* If we are not writing compressed page images, we
1565 must reorganize the page before attempting the
1566 insert. */
1567 if (recv_recovery_is_on()) {
1568 /* Insert into the uncompressed page only.
1569 The page reorganization or creation that we
1570 would attempt outside crash recovery would
1571 have been covered by a previous redo log record. */
1572 } else if (page_is_empty(page)) {
1573 ut_ad(page_cur_is_before_first(cursor));
1574
1575 /* This is an empty page. Recreate it to
1576 get rid of the modification log. */
1577 page_create_zip(page_cur_get_block(cursor), index,
1578 page_header_get_field(page, PAGE_LEVEL),
1579 0, mtr);
1580 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1581
1582 if (page_zip_available(
1583 page_zip, dict_index_is_clust(index),
1584 rec_size, 1)) {
1585 goto use_heap;
1586 }
1587
1588 /* The cursor should remain on the page infimum. */
1589 return(NULL);
1590 } else if (!page_zip->m_nonempty && !page_has_garbage(page)) {
1591 /* The page has been freshly compressed, so
1592 reorganizing it will not help. */
1593 } else if (log_compressed && !reorg_before_insert) {
1594 /* Insert into uncompressed page only, and
1595 try page_zip_reorganize() afterwards. */
1596 } else if (btr_page_reorganize_low(
1597 recv_recovery_is_on(), level,
1598 cursor, index, mtr)) {
1599 ut_ad(!page_header_get_ptr(page, PAGE_FREE));
1600
1601 if (page_zip_available(
1602 page_zip, dict_index_is_clust(index),
1603 rec_size, 1)) {
1604 /* After reorganizing, there is space
1605 available. */
1606 goto use_heap;
1607 }
1608 } else {
1609 ut_ad(cursor->rec == cursor_rec);
1610 return(NULL);
1611 }
1612
1613 /* Try compressing the whole page afterwards. */
1614 insert_rec = page_cur_insert_rec_low(
1615 cursor->rec, index, rec, offsets, NULL);
1616
1617 /* If recovery is on, this implies that the compression
1618 of the page was successful during runtime. Had that not
1619 been the case or had the redo logging of compressed
1620 pages been enabled during runtime then we'd have seen
1621 a MLOG_ZIP_PAGE_COMPRESS redo record. Therefore, we
1622 know that we don't need to reorganize the page. We,
1623 however, do need to recompress the page. That will
1624 happen when the next redo record is read which must
1625 be of type MLOG_ZIP_PAGE_COMPRESS_NO_DATA and it must
1626 contain a valid compression level value.
1627 This implies that during recovery from this point till
1628 the next redo is applied the uncompressed and
1629 compressed versions are not identical and
1630 page_zip_validate will fail but that is OK because
1631 we call page_zip_validate only after processing
1632 all changes to a page under a single mtr during
1633 recovery. */
1634 if (insert_rec == NULL) {
1635 /* Out of space.
1636 This should never occur during crash recovery,
1637 because the MLOG_COMP_REC_INSERT should only
1638 be logged after a successful operation. */
1639 ut_ad(!recv_recovery_is_on());
1640 ut_ad(!index->is_dummy);
1641 } else if (recv_recovery_is_on()) {
1642 /* This should be followed by
1643 MLOG_ZIP_PAGE_COMPRESS_NO_DATA,
1644 which should succeed. */
1645 rec_offs_make_valid(insert_rec, index,
1646 page_is_leaf(page), offsets);
1647 } else {
1648 ulint pos = page_rec_get_n_recs_before(insert_rec);
1649 ut_ad(pos > 0);
1650
1651 if (!log_compressed) {
1652 if (page_zip_compress(
1653 page_zip, page, index,
1654 level, NULL)) {
1655 page_cur_insert_rec_write_log(
1656 insert_rec, rec_size,
1657 cursor->rec, index, mtr);
1658 page_zip_compress_write_log_no_data(
1659 level, page, index, mtr);
1660
1661 rec_offs_make_valid(
1662 insert_rec, index,
1663 page_is_leaf(page), offsets);
1664 return(insert_rec);
1665 }
1666
1667 /* Page compress failed. If this happened on a
1668 leaf page, put the data size into the sample
1669 buffer. */
1670 if (page_is_leaf(page)) {
1671 ulint occupied = page_get_data_size(page)
1672 + page_dir_calc_reserved_space(
1673 page_get_n_recs(page));
1674 index->stat_defrag_data_size_sample[
1675 index->stat_defrag_sample_next_slot] =
1676 occupied;
1677 index->stat_defrag_sample_next_slot =
1678 (index->stat_defrag_sample_next_slot
1679 + 1) % STAT_DEFRAG_DATA_SIZE_N_SAMPLE;
1680 }
1681
1682 ut_ad(cursor->rec
1683 == (pos > 1
1684 ? page_rec_get_nth(
1685 page, pos - 1)
1686 : page + PAGE_NEW_INFIMUM));
1687 } else {
1688 /* We are writing entire page images
1689 to the log. Reduce the redo log volume
1690 by reorganizing the page at the same time. */
1691 if (page_zip_reorganize(
1692 cursor->block, index, mtr)) {
1693 /* The page was reorganized:
1694 Seek to pos. */
1695 if (pos > 1) {
1696 cursor->rec = page_rec_get_nth(
1697 page, pos - 1);
1698 } else {
1699 cursor->rec = page
1700 + PAGE_NEW_INFIMUM;
1701 }
1702
1703 insert_rec = page + rec_get_next_offs(
1704 cursor->rec, TRUE);
1705 rec_offs_make_valid(
1706 insert_rec, index,
1707 page_is_leaf(page), offsets);
1708 return(insert_rec);
1709 }
1710
1711 /* Theoretically, we could try one
1712 last resort of btr_page_reorganize_low()
1713 followed by page_zip_available(), but
1714 that would be very unlikely to
1715 succeed. (If the full reorganized page
1716 failed to compress, why would it
1717 succeed to compress the page, plus log
1718 the insert of this record? */
1719 }
1720
1721 /* Out of space: restore the page */
1722 if (!page_zip_decompress(page_zip, page, FALSE)) {
1723 ut_error; /* Memory corrupted? */
1724 }
1725 ut_ad(page_validate(page, index));
1726 insert_rec = NULL;
1727 }
1728
1729 return(insert_rec);
1730 }
1731
1732 free_rec = page_header_get_ptr(page, PAGE_FREE);
1733 if (UNIV_LIKELY_NULL(free_rec)) {
1734 /* Try to allocate from the head of the free list. */
1735 lint extra_size_diff;
1736 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1737 rec_offs* foffsets = foffsets_;
1738 mem_heap_t* heap = NULL;
1739
1740 rec_offs_init(foffsets_);
1741
1742 foffsets = rec_get_offsets(free_rec, index, foffsets,
1743 page_rec_is_leaf(free_rec)
1744 ? index->n_core_fields : 0,
1745 ULINT_UNDEFINED, &heap);
1746 if (rec_offs_size(foffsets) < rec_size) {
1747 too_small:
1748 if (UNIV_LIKELY_NULL(heap)) {
1749 mem_heap_free(heap);
1750 }
1751
1752 goto use_heap;
1753 }
1754
1755 insert_buf = free_rec - rec_offs_extra_size(foffsets);
1756
1757 /* On compressed pages, do not relocate records from
1758 the free list. If extra_size would grow, use the heap. */
1759 extra_size_diff = lint(rec_offs_extra_size(offsets)
1760 - rec_offs_extra_size(foffsets));
1761
1762 if (UNIV_UNLIKELY(extra_size_diff < 0)) {
1763 /* Add an offset to the extra_size. */
1764 if (rec_offs_size(foffsets)
1765 < rec_size - ulint(extra_size_diff)) {
1766
1767 goto too_small;
1768 }
1769
1770 insert_buf -= extra_size_diff;
1771 } else if (UNIV_UNLIKELY(extra_size_diff)) {
1772 /* Do not allow extra_size to grow */
1773
1774 goto too_small;
1775 }
1776
1777 heap_no = rec_get_heap_no_new(free_rec);
1778 page_mem_alloc_free(page, page_zip,
1779 rec_get_next_ptr(free_rec, TRUE),
1780 rec_size);
1781
1782 if (!page_is_leaf(page)) {
1783 /* Zero out the node pointer of free_rec,
1784 in case it will not be overwritten by
1785 insert_rec. */
1786
1787 ut_ad(rec_size > REC_NODE_PTR_SIZE);
1788
1789 if (rec_offs_extra_size(foffsets)
1790 + rec_offs_data_size(foffsets) > rec_size) {
1791
1792 memset(rec_get_end(free_rec, foffsets)
1793 - REC_NODE_PTR_SIZE, 0,
1794 REC_NODE_PTR_SIZE);
1795 }
1796 } else if (dict_index_is_clust(index)) {
1797 /* Zero out the DB_TRX_ID and DB_ROLL_PTR
1798 columns of free_rec, in case it will not be
1799 overwritten by insert_rec. */
1800
1801 ulint trx_id_offs;
1802 ulint len;
1803
1804 trx_id_offs = rec_get_nth_field_offs(
1805 foffsets, index->db_trx_id(), &len);
1806 ut_ad(len == DATA_TRX_ID_LEN);
1807
1808 if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs
1809 + rec_offs_extra_size(foffsets) > rec_size) {
1810 /* We will have to zero out the
1811 DB_TRX_ID and DB_ROLL_PTR, because
1812 they will not be fully overwritten by
1813 insert_rec. */
1814
1815 memset(free_rec + trx_id_offs, 0,
1816 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1817 }
1818
1819 ut_ad(free_rec + trx_id_offs + DATA_TRX_ID_LEN
1820 == rec_get_nth_field(free_rec, foffsets,
1821 index->db_roll_ptr(), &len));
1822 ut_ad(len == DATA_ROLL_PTR_LEN);
1823 }
1824
1825 if (UNIV_LIKELY_NULL(heap)) {
1826 mem_heap_free(heap);
1827 }
1828 } else {
1829 use_heap:
1830 free_rec = NULL;
1831 insert_buf = page_mem_alloc_heap(page, page_zip,
1832 rec_size, &heap_no);
1833
1834 if (UNIV_UNLIKELY(insert_buf == NULL)) {
1835 return(NULL);
1836 }
1837
1838 page_zip_dir_add_slot(page_zip, dict_index_is_clust(index));
1839 }
1840
1841 /* 3. Create the record */
1842 insert_rec = rec_copy(insert_buf, rec, offsets);
1843 rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
1844
1845 /* 4. Insert the record in the linked list of records */
1846 ut_ad(cursor->rec != insert_rec);
1847
1848 {
1849 /* next record after current before the insertion */
1850 const rec_t* next_rec = page_rec_get_next_low(
1851 cursor->rec, TRUE);
1852 ut_ad(rec_get_status(cursor->rec)
1853 <= REC_STATUS_INFIMUM);
1854 ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1855 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1856
1857 page_rec_set_next(insert_rec, next_rec);
1858 page_rec_set_next(cursor->rec, insert_rec);
1859 }
1860
1861 page_header_set_field(page, page_zip, PAGE_N_RECS,
1862 1U + page_get_n_recs(page));
1863
1864 /* 5. Set the n_owned field in the inserted record to zero,
1865 and set the heap_no field */
1866 rec_set_n_owned_new(insert_rec, NULL, 0);
1867 rec_set_heap_no_new(insert_rec, heap_no);
1868
1869 MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1870 rec_offs_size(offsets));
1871
1872 page_zip_dir_insert(page_zip, cursor->rec, free_rec, insert_rec);
1873
1874 /* 6. Update the last insertion info in page header */
1875
1876 last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
1877 ut_ad(!last_insert
1878 || rec_get_node_ptr_flag(last_insert)
1879 == rec_get_node_ptr_flag(insert_rec));
1880
1881 if (!dict_index_is_spatial(index)) {
1882 byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
1883 if (UNIV_UNLIKELY(last_insert == NULL)) {
1884 no_direction:
1885 page_direction_reset(ptr, page, page_zip);
1886 } else if (last_insert == cursor->rec
1887 && page_ptr_get_direction(ptr) != PAGE_LEFT) {
1888 page_direction_increment(ptr, page, page_zip,
1889 PAGE_RIGHT);
1890 } else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
1891 && page_rec_get_next(insert_rec) == last_insert) {
1892 page_direction_increment(ptr, page, page_zip,
1893 PAGE_LEFT);
1894 } else {
1895 goto no_direction;
1896 }
1897 }
1898
1899 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, insert_rec);
1900
1901 /* 7. It remains to update the owner record. */
1902 {
1903 rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
1904 ulint n_owned;
1905
1906 n_owned = rec_get_n_owned_new(owner_rec);
1907 rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
1908
1909 /* 8. Now we have incremented the n_owned field of the owner
1910 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
1911 we have to split the corresponding directory slot in two. */
1912
1913 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
1914 page_dir_split_slot(
1915 page, page_zip,
1916 page_dir_find_owner_slot(owner_rec));
1917 }
1918 }
1919
1920 page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
1921
1922 /* 9. Write log record of the insert */
1923 if (UNIV_LIKELY(mtr != NULL)) {
1924 page_cur_insert_rec_write_log(insert_rec, rec_size,
1925 cursor->rec, index, mtr);
1926 }
1927
1928 return(insert_rec);
1929 }
1930
1931 /**********************************************************//**
1932 Writes a log record of copying a record list end to a new created page.
1933 @return 4-byte field where to write the log data length, or NULL if
1934 logging is disabled */
1935 UNIV_INLINE
1936 byte*
page_copy_rec_list_to_created_page_write_log(page_t * page,dict_index_t * index,mtr_t * mtr)1937 page_copy_rec_list_to_created_page_write_log(
1938 /*=========================================*/
1939 page_t* page, /*!< in: index page */
1940 dict_index_t* index, /*!< in: record descriptor */
1941 mtr_t* mtr) /*!< in: mtr */
1942 {
1943 byte* log_ptr;
1944
1945 ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
1946 ut_ad(mtr->is_named_space(index->table->space));
1947
1948 log_ptr = mlog_open_and_write_index(mtr, page, index,
1949 page_is_comp(page)
1950 ? MLOG_COMP_LIST_END_COPY_CREATED
1951 : MLOG_LIST_END_COPY_CREATED, 4);
1952 if (UNIV_LIKELY(log_ptr != NULL)) {
1953 mlog_close(mtr, log_ptr + 4);
1954 }
1955
1956 return(log_ptr);
1957 }
1958
1959 /**********************************************************//**
1960 Parses a log record of copying a record list end to a new created page.
1961 @return end of log record or NULL */
1962 byte*
page_parse_copy_rec_list_to_created_page(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)1963 page_parse_copy_rec_list_to_created_page(
1964 /*=====================================*/
1965 byte* ptr, /*!< in: buffer */
1966 byte* end_ptr,/*!< in: buffer end */
1967 buf_block_t* block, /*!< in: page or NULL */
1968 dict_index_t* index, /*!< in: record descriptor */
1969 mtr_t* mtr) /*!< in: mtr or NULL */
1970 {
1971 byte* rec_end;
1972 ulint log_data_len;
1973 page_t* page;
1974 page_zip_des_t* page_zip;
1975
1976 ut_ad(index->is_dummy);
1977
1978 if (ptr + 4 > end_ptr) {
1979
1980 return(NULL);
1981 }
1982
1983 log_data_len = mach_read_from_4(ptr);
1984 ptr += 4;
1985
1986 rec_end = ptr + log_data_len;
1987
1988 if (rec_end > end_ptr) {
1989
1990 return(NULL);
1991 }
1992
1993 if (!block) {
1994
1995 return(rec_end);
1996 }
1997
1998 ut_ad(fil_page_index_page_check(block->frame));
1999 /* This function is never invoked on the clustered index root page,
2000 except in the redo log apply of
2001 page_copy_rec_list_end_to_created_page() which was logged by.
2002 page_copy_rec_list_to_created_page_write_log().
2003 For other pages, this field must be zero-initialized. */
2004 ut_ad(!page_get_instant(block->frame)
2005 || !page_has_siblings(block->frame));
2006
2007 while (ptr < rec_end) {
2008 ptr = page_cur_parse_insert_rec(TRUE, ptr, end_ptr,
2009 block, index, mtr);
2010 }
2011
2012 ut_a(ptr == rec_end);
2013
2014 page = buf_block_get_frame(block);
2015 page_zip = buf_block_get_page_zip(block);
2016
2017 page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
2018
2019 if (!dict_index_is_spatial(index)) {
2020 page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + page,
2021 page, page_zip);
2022 }
2023
2024 return(rec_end);
2025 }
2026
2027 /*************************************************************//**
2028 Copies records from page to a newly created page, from a given record onward,
2029 including that record. Infimum and supremum records are not copied.
2030
2031 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
2032 if this is a compressed leaf page in a secondary index.
2033 This has to be done either within the same mini-transaction,
2034 or by invoking ibuf_reset_free_bits() before mtr_commit(). */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
	page_t*		new_page,	/*!< in/out: index page to copy to */
	rec_t*		rec,		/*!< in: first record to copy */
	dict_index_t*	index,		/*!< in: record descriptor */
	mtr_t*		mtr)		/*!< in: mtr */
{
	page_dir_slot_t* slot = 0; /* remove warning */
	byte*	heap_top;
	rec_t*	insert_rec = 0; /* remove warning */
	rec_t*	prev_rec;
	ulint	count;
	ulint	n_recs;
	ulint	slot_index;
	ulint	rec_size;
	byte*	log_ptr;
	ulint	log_data_len;
	mem_heap_t*	heap		= NULL;
	rec_offs	offsets_[REC_OFFS_NORMAL_SIZE];
	rec_offs*	offsets		= offsets_;
	rec_offs_init(offsets_);

	/* The destination page must be freshly created: no user records yet. */
	ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
	ut_ad(page_align(rec) != new_page);
	ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
	ut_ad(fil_page_index_page_check(new_page));
	/* This function is never invoked on the clustered index root page,
	except in btr_lift_page_up(). */
	ut_ad(!page_get_instant(new_page) || !page_has_siblings(new_page));

	if (page_rec_is_infimum(rec)) {

		rec = page_rec_get_next(rec);
	}

	if (page_rec_is_supremum(rec)) {
		/* Nothing to copy. */
		return;
	}

#ifdef UNIV_DEBUG
	/* To pass the debug tests we have to set these dummy values
	in the debug version */
	page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
			    new_page + srv_page_size - 1);
#endif
	/* Write the MLOG_LIST_END_COPY_CREATED header; its 4-byte length
	field (pointed to by log_ptr) is filled in at the end of this
	function, once the total size of the insert records is known. */
	log_ptr = page_copy_rec_list_to_created_page_write_log(new_page,
							       index, mtr);

	/* Remember the current mtr log size, so the length of the
	short-form insert records appended below can be computed. */
	log_data_len = mtr->get_log()->size();

	/* Individual inserts are logged in a shorter form */

	const mtr_log_t	log_mode = index->table->is_temporary()
		|| !index->is_readable() /* IMPORT TABLESPACE */
		? mtr_get_log_mode(mtr)
		: mtr_set_log_mode(mtr, MTR_LOG_SHORT_INSERTS);

	prev_rec = page_get_infimum_rec(new_page);
	/* Records are placed directly after the supremum,
	at the start of the page heap. */
	if (page_is_comp(new_page)) {
		heap_top = new_page + PAGE_NEW_SUPREMUM_END;
	} else {
		heap_top = new_page + PAGE_OLD_SUPREMUM_END;
	}
	count = 0;
	slot_index = 0;
	n_recs = 0;

	const ulint n_core = page_is_leaf(new_page)
		? index->n_core_fields : 0;

	do {
		offsets = rec_get_offsets(rec, index, offsets, n_core,
					  ULINT_UNDEFINED, &heap);
		/* Physically copy the record onto the new page heap. */
		insert_rec = rec_copy(heap_top, rec, offsets);

		/* Link the copied record after the previous one and
		assign it the next free heap number; n_owned stays 0
		until the record becomes a directory-slot owner. */
		if (page_is_comp(new_page)) {
			rec_set_next_offs_new(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_new(insert_rec, NULL, 0);
			rec_set_heap_no_new(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		} else {
			rec_set_next_offs_old(prev_rec,
					      page_offset(insert_rec));

			rec_set_n_owned_old(insert_rec, 0);
			rec_set_heap_no_old(insert_rec,
					    PAGE_HEAP_NO_USER_LOW + n_recs);
		}

		count++;
		n_recs++;

		/* Open a new directory slot whenever the current run
		reaches half of the maximum slot ownership. */
		if (UNIV_UNLIKELY
		    (count == (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2)) {

			slot_index++;

			slot = page_dir_get_nth_slot(new_page, slot_index);

			page_dir_slot_set_rec(slot, insert_rec);
			page_dir_slot_set_n_owned(slot, NULL, count);

			count = 0;
		}

		rec_size = rec_offs_size(offsets);

		ut_ad(heap_top < new_page + srv_page_size);

		heap_top += rec_size;

		rec_offs_make_valid(insert_rec, index, n_core != 0, offsets);
		/* Log the insert in the short form enabled above. */
		page_cur_insert_rec_write_log(insert_rec, rec_size, prev_rec,
					      index, mtr);
		prev_rec = insert_rec;
		rec = page_rec_get_next(rec);
	} while (!page_rec_is_supremum(rec));

	ut_ad(n_recs);

	if ((slot_index > 0) && (count + 1
				 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
				 <= PAGE_DIR_SLOT_MAX_N_OWNED)) {
		/* We can merge the two last dir slots. This operation is
		here to make this function imitate exactly the equivalent
		task made using page_cur_insert_rec, which we use in database
		recovery to reproduce the task performed by this function.
		To be able to check the correctness of recovery, it is good
		that it imitates exactly. */

		count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;

		page_dir_slot_set_n_owned(slot, NULL, 0);

		slot_index--;
	}

	if (UNIV_LIKELY_NULL(heap)) {
		mem_heap_free(heap);
	}

	/* Restore the log mode */

	mtr_set_log_mode(mtr, log_mode);

	/* Compute how many bytes of short-form insert log were appended,
	and patch that length into the header written earlier. */
	log_data_len = mtr->get_log()->size() - log_data_len;

	ut_a(log_data_len < 100U << srv_page_size_shift);

	if (log_ptr != NULL) {
		mach_write_to_4(log_ptr, log_data_len);
	}

	/* Terminate the record list at the supremum record. */
	if (page_is_comp(new_page)) {
		rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
	} else {
		rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
	}

	/* The supremum owns the trailing run of count records plus itself. */
	slot = page_dir_get_nth_slot(new_page, 1 + slot_index);

	page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
	page_dir_slot_set_n_owned(slot, NULL, count + 1);

	/* Finalize the page header fields (these overwrite the dummy
	debug values set above). */
	page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
	page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
	page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
	page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);

	*reinterpret_cast<uint16_t*>(PAGE_HEADER + PAGE_LAST_INSERT + new_page)
		= 0;
	page_direction_reset(PAGE_HEADER + PAGE_DIRECTION_B + new_page,
			     new_page, NULL);
}
2214
2215 /***********************************************************//**
2216 Writes log record of a record delete on a page. */
2217 UNIV_INLINE
2218 void
page_cur_delete_rec_write_log(rec_t * rec,const dict_index_t * index,mtr_t * mtr)2219 page_cur_delete_rec_write_log(
2220 /*==========================*/
2221 rec_t* rec, /*!< in: record to be deleted */
2222 const dict_index_t* index, /*!< in: record descriptor */
2223 mtr_t* mtr) /*!< in: mini-transaction handle */
2224 {
2225 byte* log_ptr;
2226
2227 ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
2228 ut_ad(mtr->is_named_space(index->table->space));
2229
2230 log_ptr = mlog_open_and_write_index(mtr, rec, index,
2231 page_rec_is_comp(rec)
2232 ? MLOG_COMP_REC_DELETE
2233 : MLOG_REC_DELETE, 2);
2234
2235 if (!log_ptr) {
2236 /* Logging in mtr is switched off during crash recovery:
2237 in that case mlog_open returns NULL */
2238 return;
2239 }
2240
2241 /* Write the cursor rec offset as a 2-byte ulint */
2242 mach_write_to_2(log_ptr, page_offset(rec));
2243
2244 mlog_close(mtr, log_ptr + 2);
2245 }
2246
2247 /***********************************************************//**
2248 Parses log record of a record delete on a page.
2249 @return pointer to record end or NULL */
2250 byte*
page_cur_parse_delete_rec(byte * ptr,byte * end_ptr,buf_block_t * block,dict_index_t * index,mtr_t * mtr)2251 page_cur_parse_delete_rec(
2252 /*======================*/
2253 byte* ptr, /*!< in: buffer */
2254 byte* end_ptr,/*!< in: buffer end */
2255 buf_block_t* block, /*!< in: page or NULL */
2256 dict_index_t* index, /*!< in: record descriptor */
2257 mtr_t* mtr) /*!< in: mtr or NULL */
2258 {
2259 ulint offset;
2260 page_cur_t cursor;
2261
2262 if (end_ptr < ptr + 2) {
2263
2264 return(NULL);
2265 }
2266
2267 /* Read the cursor rec offset as a 2-byte ulint */
2268 offset = mach_read_from_2(ptr);
2269 ptr += 2;
2270
2271 if (UNIV_UNLIKELY(offset >= srv_page_size)) {
2272 recv_sys.found_corrupt_log = true;
2273 return NULL;
2274 }
2275
2276 if (block) {
2277 page_t* page = buf_block_get_frame(block);
2278 mem_heap_t* heap = NULL;
2279 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
2280 rec_t* rec = page + offset;
2281 rec_offs_init(offsets_);
2282
2283 page_cur_position(rec, block, &cursor);
2284 ut_ad(!buf_block_get_page_zip(block) || page_is_comp(page));
2285
2286 page_cur_delete_rec(&cursor, index,
2287 rec_get_offsets(rec, index, offsets_,
2288 page_rec_is_leaf(rec)
2289 ? index->n_core_fields : 0,
2290 ULINT_UNDEFINED, &heap),
2291 mtr);
2292 if (UNIV_LIKELY_NULL(heap)) {
2293 mem_heap_free(heap);
2294 }
2295 }
2296
2297 return(ptr);
2298 }
2299
2300 /***********************************************************//**
2301 Deletes a record at the page cursor. The cursor is moved to the next
2302 record after the deleted one. */
void
page_cur_delete_rec(
/*================*/
	page_cur_t*		cursor,	/*!< in/out: a page cursor */
	const dict_index_t*	index,	/*!< in: record descriptor */
	const rec_offs*		offsets,/*!< in: rec_get_offsets(
					cursor->rec, index) */
	mtr_t*			mtr)	/*!< in: mini-transaction handle
					or NULL */
{
	page_dir_slot_t* cur_dir_slot;
	page_dir_slot_t* prev_slot;
	page_t*		page;
	page_zip_des_t*	page_zip;
	rec_t*		current_rec;
	rec_t*		prev_rec	= NULL;
	rec_t*		next_rec;
	ulint		cur_slot_no;
	ulint		cur_n_owned;
	rec_t*		rec;

	page = page_cur_get_page(cursor);
	page_zip = page_cur_get_page_zip(cursor);

	/* page_zip_validate() will fail here when
	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
	Then, both "page_zip" and "page" would have the min-rec-mark
	set on the smallest user record, but "page" would additionally
	have it set on the smallest-but-one record. Because sloppy
	page_zip_validate_low() only ignores min-rec-flag differences
	in the smallest user record, it cannot be used here either. */

	current_rec = cursor->rec;
	ut_ad(rec_offs_validate(current_rec, index, offsets));
	ut_ad(!!page_is_comp(page) == dict_table_is_comp(index->table));
	ut_ad(fil_page_index_page_check(page));
	ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
	      || index->is_dummy
	      || (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
	ut_ad(!mtr || mtr->is_named_space(index->table->space));

	/* The record must not be the supremum or infimum record. */
	ut_ad(page_rec_is_user_rec(current_rec));

	if (page_get_n_recs(page) == 1 && !recv_recovery_is_on()
	    && !rec_is_alter_metadata(current_rec, *index)) {
		/* Empty the page, unless we are applying the redo log
		during crash recovery. During normal operation, the
		page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
		MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
		ut_ad(page_is_leaf(page));
		/* Usually, this should be the root page,
		and the whole index tree should become empty.
		However, this could also be a call in
		btr_cur_pessimistic_update() to delete the only
		record in the page and to insert another one. */
		page_cur_move_to_next(cursor);
		ut_ad(page_cur_is_after_last(cursor));
		page_create_empty(page_cur_get_block(cursor),
				  const_cast<dict_index_t*>(index), mtr);
		return;
	}

	/* Save to local variables some data associated with current_rec */
	cur_slot_no = page_dir_find_owner_slot(current_rec);
	ut_ad(cur_slot_no > 0);
	cur_dir_slot = page_dir_get_nth_slot(page, cur_slot_no);
	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);

	/* 0. Write the log record */
	if (mtr != 0) {
		page_cur_delete_rec_write_log(current_rec, index, mtr);
	}

	/* 1. Reset the last insert info in the page header and increment
	the modify clock for the frame */

	page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);

	/* The page gets invalid for optimistic searches: increment the
	frame modify clock only if there is a mini-transaction covering
	the change. During IMPORT we allocate local blocks that are not
	part of the buffer pool. */

	if (mtr != 0) {
		buf_block_modify_clock_inc(page_cur_get_block(cursor));
	}

	/* 2. Find the next and the previous record. Note that the cursor is
	left at the next record. */

	ut_ad(cur_slot_no > 0);
	prev_slot = page_dir_get_nth_slot(page, cur_slot_no - 1);

	rec = (rec_t*) page_dir_slot_get_rec(prev_slot);

	/* rec now points to the record of the previous directory slot. Look
	for the immediate predecessor of current_rec in a loop. The scan is
	bounded by the slot ownership count, so it is short. */

	while (current_rec != rec) {
		prev_rec = rec;
		rec = page_rec_get_next(rec);
	}

	page_cur_move_to_next(cursor);
	next_rec = cursor->rec;

	/* 3. Remove the record from the linked list of records */

	page_rec_set_next(prev_rec, next_rec);

	/* 4. If the deleted record is pointed to by a dir slot, update the
	record pointer in slot. In the following if-clause we assume that
	prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
	>= 2. */

	compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
	ut_ad(cur_n_owned > 1);

	if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
		page_dir_slot_set_rec(cur_dir_slot, prev_rec);
	}

	/* 5. Update the number of owned records of the slot */

	page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);

	/* 6. Free the memory occupied by the record */
	page_mem_free(page, page_zip, current_rec, index, offsets);

	/* 7. Now we have decremented the number of owned records of the slot.
	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
	slots. */

	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
		page_dir_balance_slot(page, page_zip, cur_slot_no);
	}
}
2441
2442 #ifdef UNIV_COMPILE_TEST_FUNCS
2443
2444 /*******************************************************************//**
2445 Print the first n numbers, generated by ut_rnd_gen() to make sure
2446 (visually) that it works properly. */
2447 void
test_ut_rnd_gen(int n)2448 test_ut_rnd_gen(
2449 int n) /*!< in: print first n numbers */
2450 {
2451 int i;
2452 unsigned long long rnd;
2453
2454 for (i = 0; i < n; i++) {
2455 rnd = ut_rnd_gen();
2456 printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
2457 rnd,
2458 rnd % 2,
2459 rnd % 3,
2460 rnd % 5,
2461 rnd % 7,
2462 rnd % 11);
2463 }
2464 }
2465
2466 #endif /* UNIV_COMPILE_TEST_FUNCS */
2467