1 /*****************************************************************************
2
3 Copyright (c) 1994, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2012, Facebook Inc.
5 Copyright (c) 2018, 2021, MariaDB Corporation.
6
7 This program is free software; you can redistribute it and/or modify it under
8 the terms of the GNU General Public License as published by the Free Software
9 Foundation; version 2 of the License.
10
11 This program is distributed in the hope that it will be useful, but WITHOUT
12 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along with
16 this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
18
19 *****************************************************************************/
20
21 /********************************************************************//**
22 @file page/page0cur.cc
23 The page cursor
24
25 Created 10/4/1994 Heikki Tuuri
26 *************************************************************************/
27
28 #include "page0cur.h"
29 #include "page0zip.h"
30 #include "btr0btr.h"
31 #include "mtr0log.h"
32 #include "log0recv.h"
33 #include "rem0cmp.h"
34 #include "gis0rtree.h"
35
36 #include <algorithm>
37
38 #ifdef BTR_CUR_HASH_ADAPT
39 # ifdef UNIV_SEARCH_PERF_STAT
40 static ulint page_cur_short_succ;
41 # endif /* UNIV_SEARCH_PERF_STAT */
42
43 /** Try a search shortcut based on the last insert.
44 @param[in] block index page
45 @param[in] index index tree
46 @param[in] tuple search key
47 @param[in,out] iup_matched_fields already matched fields in the
48 upper limit record
49 @param[in,out] ilow_matched_fields already matched fields in the
50 lower limit record
51 @param[out] cursor page cursor
52 @return true on success */
53 UNIV_INLINE
54 bool
page_cur_try_search_shortcut(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor)55 page_cur_try_search_shortcut(
56 const buf_block_t* block,
57 const dict_index_t* index,
58 const dtuple_t* tuple,
59 ulint* iup_matched_fields,
60 ulint* ilow_matched_fields,
61 page_cur_t* cursor)
62 {
63 const rec_t* rec;
64 const rec_t* next_rec;
65 ulint low_match;
66 ulint up_match;
67 ibool success = FALSE;
68 const page_t* page = buf_block_get_frame(block);
69 mem_heap_t* heap = NULL;
70 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
71 rec_offs* offsets = offsets_;
72 rec_offs_init(offsets_);
73
74 ut_ad(dtuple_check_typed(tuple));
75 ut_ad(page_is_leaf(page));
76
77 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
78 offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
79 dtuple_get_n_fields(tuple), &heap);
80
81 ut_ad(rec);
82 ut_ad(page_rec_is_user_rec(rec));
83
84 low_match = up_match = std::min(*ilow_matched_fields,
85 *iup_matched_fields);
86
87 if (cmp_dtuple_rec_with_match(tuple, rec, offsets, &low_match) < 0) {
88 goto exit_func;
89 }
90
91 next_rec = page_rec_get_next_const(rec);
92 if (!page_rec_is_supremum(next_rec)) {
93 offsets = rec_get_offsets(next_rec, index, offsets,
94 index->n_core_fields,
95 dtuple_get_n_fields(tuple), &heap);
96
97 if (cmp_dtuple_rec_with_match(tuple, next_rec, offsets,
98 &up_match) >= 0) {
99 goto exit_func;
100 }
101
102 *iup_matched_fields = up_match;
103 }
104
105 page_cur_position(rec, block, cursor);
106
107 *ilow_matched_fields = low_match;
108
109 #ifdef UNIV_SEARCH_PERF_STAT
110 page_cur_short_succ++;
111 #endif
112 success = TRUE;
113 exit_func:
114 if (UNIV_LIKELY_NULL(heap)) {
115 mem_heap_free(heap);
116 }
117 return(success);
118 }
119
120 /** Try a search shortcut based on the last insert.
121 @param[in] block index page
122 @param[in] index index tree
123 @param[in] tuple search key
124 @param[in,out] iup_matched_fields already matched fields in the
125 upper limit record
126 @param[in,out] iup_matched_bytes already matched bytes in the
127 first partially matched field in the upper limit record
128 @param[in,out] ilow_matched_fields already matched fields in the
129 lower limit record
130 @param[in,out] ilow_matched_bytes already matched bytes in the
131 first partially matched field in the lower limit record
132 @param[out] cursor page cursor
133 @return true on success */
134 UNIV_INLINE
135 bool
page_cur_try_search_shortcut_bytes(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,ulint * iup_matched_fields,ulint * iup_matched_bytes,ulint * ilow_matched_fields,ulint * ilow_matched_bytes,page_cur_t * cursor)136 page_cur_try_search_shortcut_bytes(
137 const buf_block_t* block,
138 const dict_index_t* index,
139 const dtuple_t* tuple,
140 ulint* iup_matched_fields,
141 ulint* iup_matched_bytes,
142 ulint* ilow_matched_fields,
143 ulint* ilow_matched_bytes,
144 page_cur_t* cursor)
145 {
146 const rec_t* rec;
147 const rec_t* next_rec;
148 ulint low_match;
149 ulint low_bytes;
150 ulint up_match;
151 ulint up_bytes;
152 ibool success = FALSE;
153 const page_t* page = buf_block_get_frame(block);
154 mem_heap_t* heap = NULL;
155 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
156 rec_offs* offsets = offsets_;
157 rec_offs_init(offsets_);
158
159 ut_ad(dtuple_check_typed(tuple));
160 ut_ad(page_is_leaf(page));
161
162 rec = page_header_get_ptr(page, PAGE_LAST_INSERT);
163 offsets = rec_get_offsets(rec, index, offsets, index->n_core_fields,
164 dtuple_get_n_fields(tuple), &heap);
165
166 ut_ad(rec);
167 ut_ad(page_rec_is_user_rec(rec));
168 if (ut_pair_cmp(*ilow_matched_fields, *ilow_matched_bytes,
169 *iup_matched_fields, *iup_matched_bytes) < 0) {
170 up_match = low_match = *ilow_matched_fields;
171 up_bytes = low_bytes = *ilow_matched_bytes;
172 } else {
173 up_match = low_match = *iup_matched_fields;
174 up_bytes = low_bytes = *iup_matched_bytes;
175 }
176
177 if (cmp_dtuple_rec_with_match_bytes(
178 tuple, rec, index, offsets, &low_match, &low_bytes) < 0) {
179 goto exit_func;
180 }
181
182 next_rec = page_rec_get_next_const(rec);
183 if (!page_rec_is_supremum(next_rec)) {
184 offsets = rec_get_offsets(next_rec, index, offsets,
185 index->n_core_fields,
186 dtuple_get_n_fields(tuple), &heap);
187
188 if (cmp_dtuple_rec_with_match_bytes(
189 tuple, next_rec, index, offsets,
190 &up_match, &up_bytes)
191 >= 0) {
192 goto exit_func;
193 }
194
195 *iup_matched_fields = up_match;
196 *iup_matched_bytes = up_bytes;
197 }
198
199 page_cur_position(rec, block, cursor);
200
201 *ilow_matched_fields = low_match;
202 *ilow_matched_bytes = low_bytes;
203
204 #ifdef UNIV_SEARCH_PERF_STAT
205 page_cur_short_succ++;
206 #endif
207 success = TRUE;
208 exit_func:
209 if (UNIV_LIKELY_NULL(heap)) {
210 mem_heap_free(heap);
211 }
212 return(success);
213 }
214 #endif /* BTR_CUR_HASH_ADAPT */
215
216 #ifdef PAGE_CUR_LE_OR_EXTENDS
217 /****************************************************************//**
218 Checks if the nth field in a record is a character type field which extends
219 the nth field in tuple, i.e., the field is longer or equal in length and has
220 common first characters.
221 @return TRUE if rec field extends tuple field */
222 static
223 ibool
page_cur_rec_field_extends(const dtuple_t * tuple,const rec_t * rec,const rec_offs * offsets,ulint n)224 page_cur_rec_field_extends(
225 /*=======================*/
226 const dtuple_t* tuple, /*!< in: data tuple */
227 const rec_t* rec, /*!< in: record */
228 const rec_offs* offsets,/*!< in: array returned by rec_get_offsets() */
229 ulint n) /*!< in: compare nth field */
230 {
231 const dtype_t* type;
232 const dfield_t* dfield;
233 const byte* rec_f;
234 ulint rec_f_len;
235
236 ut_ad(rec_offs_validate(rec, NULL, offsets));
237 dfield = dtuple_get_nth_field(tuple, n);
238
239 type = dfield_get_type(dfield);
240
241 rec_f = rec_get_nth_field(rec, offsets, n, &rec_f_len);
242
243 if (type->mtype == DATA_VARCHAR
244 || type->mtype == DATA_CHAR
245 || type->mtype == DATA_FIXBINARY
246 || type->mtype == DATA_BINARY
247 || type->mtype == DATA_BLOB
248 || DATA_GEOMETRY_MTYPE(type->mtype)
249 || type->mtype == DATA_VARMYSQL
250 || type->mtype == DATA_MYSQL) {
251
252 if (dfield_get_len(dfield) != UNIV_SQL_NULL
253 && rec_f_len != UNIV_SQL_NULL
254 && rec_f_len >= dfield_get_len(dfield)
255 && !cmp_data_data(type->mtype, type->prtype,
256 dfield_get_data(dfield),
257 dfield_get_len(dfield),
258 rec_f, dfield_get_len(dfield))) {
259
260 return(TRUE);
261 }
262 }
263
264 return(FALSE);
265 }
266 #endif /* PAGE_CUR_LE_OR_EXTENDS */
267
268 /****************************************************************//**
269 Searches the right position for a page cursor. */
270 void
page_cur_search_with_match(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,ulint * iup_matched_fields,ulint * ilow_matched_fields,page_cur_t * cursor,rtr_info_t * rtr_info)271 page_cur_search_with_match(
272 /*=======================*/
273 const buf_block_t* block, /*!< in: buffer block */
274 const dict_index_t* index, /*!< in/out: record descriptor */
275 const dtuple_t* tuple, /*!< in: data tuple */
276 page_cur_mode_t mode, /*!< in: PAGE_CUR_L,
277 PAGE_CUR_LE, PAGE_CUR_G, or
278 PAGE_CUR_GE */
279 ulint* iup_matched_fields,
280 /*!< in/out: already matched
281 fields in upper limit record */
282 ulint* ilow_matched_fields,
283 /*!< in/out: already matched
284 fields in lower limit record */
285 page_cur_t* cursor, /*!< out: page cursor */
286 rtr_info_t* rtr_info)/*!< in/out: rtree search stack */
287 {
288 ulint up;
289 ulint low;
290 ulint mid;
291 const page_t* page;
292 const page_dir_slot_t* slot;
293 const rec_t* up_rec;
294 const rec_t* low_rec;
295 const rec_t* mid_rec;
296 ulint up_matched_fields;
297 ulint low_matched_fields;
298 ulint cur_matched_fields;
299 int cmp;
300 #ifdef UNIV_ZIP_DEBUG
301 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
302 #endif /* UNIV_ZIP_DEBUG */
303 mem_heap_t* heap = NULL;
304 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
305 rec_offs* offsets = offsets_;
306 rec_offs_init(offsets_);
307
308 ut_ad(dtuple_validate(tuple));
309 #ifdef UNIV_DEBUG
310 # ifdef PAGE_CUR_DBG
311 if (mode != PAGE_CUR_DBG)
312 # endif /* PAGE_CUR_DBG */
313 # ifdef PAGE_CUR_LE_OR_EXTENDS
314 if (mode != PAGE_CUR_LE_OR_EXTENDS)
315 # endif /* PAGE_CUR_LE_OR_EXTENDS */
316 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
317 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE
318 || dict_index_is_spatial(index));
319 #endif /* UNIV_DEBUG */
320 page = buf_block_get_frame(block);
321 #ifdef UNIV_ZIP_DEBUG
322 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
323 #endif /* UNIV_ZIP_DEBUG */
324
325 ut_d(page_check_dir(page));
326 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
327
328 #ifdef BTR_CUR_HASH_ADAPT
329 if (n_core
330 && page_get_direction(page) == PAGE_RIGHT
331 && page_header_get_offs(page, PAGE_LAST_INSERT)
332 && mode == PAGE_CUR_LE
333 && !index->is_spatial()
334 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
335 && page_cur_try_search_shortcut(
336 block, index, tuple,
337 iup_matched_fields, ilow_matched_fields, cursor)) {
338 return;
339 }
340 # ifdef PAGE_CUR_DBG
341 if (mode == PAGE_CUR_DBG) {
342 mode = PAGE_CUR_LE;
343 }
344 # endif
345 #endif /* BTR_CUR_HASH_ADAPT */
346
347 /* If the mode is for R-tree indexes, use the special MBR
348 related compare functions */
349 if (index->is_spatial() && mode > PAGE_CUR_LE) {
350 /* For leaf level insert, we still use the traditional
351 compare function for now */
352 if (mode == PAGE_CUR_RTREE_INSERT && n_core) {
353 mode = PAGE_CUR_LE;
354 } else {
355 rtr_cur_search_with_match(
356 block, (dict_index_t*)index, tuple, mode,
357 cursor, rtr_info);
358 return;
359 }
360 }
361
362 /* The following flag does not work for non-latin1 char sets because
363 cmp_full_field does not tell how many bytes matched */
364 #ifdef PAGE_CUR_LE_OR_EXTENDS
365 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
366 #endif /* PAGE_CUR_LE_OR_EXTENDS */
367
368 /* If mode PAGE_CUR_G is specified, we are trying to position the
369 cursor to answer a query of the form "tuple < X", where tuple is
370 the input parameter, and X denotes an arbitrary physical record on
371 the page. We want to position the cursor on the first X which
372 satisfies the condition. */
373
374 up_matched_fields = *iup_matched_fields;
375 low_matched_fields = *ilow_matched_fields;
376
377 /* Perform binary search. First the search is done through the page
378 directory, after that as a linear search in the list of records
379 owned by the upper limit directory slot. */
380
381 low = 0;
382 up = ulint(page_dir_get_n_slots(page)) - 1;
383
384 /* Perform binary search until the lower and upper limit directory
385 slots come to the distance 1 of each other */
386
387 while (up - low > 1) {
388 mid = (low + up) / 2;
389 slot = page_dir_get_nth_slot(page, mid);
390 mid_rec = page_dir_slot_get_rec(slot);
391
392 cur_matched_fields = std::min(low_matched_fields,
393 up_matched_fields);
394
395 offsets = offsets_;
396 offsets = rec_get_offsets(
397 mid_rec, index, offsets, n_core,
398 dtuple_get_n_fields_cmp(tuple), &heap);
399
400 cmp = cmp_dtuple_rec_with_match(
401 tuple, mid_rec, offsets, &cur_matched_fields);
402
403 if (cmp > 0) {
404 low_slot_match:
405 low = mid;
406 low_matched_fields = cur_matched_fields;
407
408 } else if (cmp) {
409 #ifdef PAGE_CUR_LE_OR_EXTENDS
410 if (mode == PAGE_CUR_LE_OR_EXTENDS
411 && page_cur_rec_field_extends(
412 tuple, mid_rec, offsets,
413 cur_matched_fields)) {
414
415 goto low_slot_match;
416 }
417 #endif /* PAGE_CUR_LE_OR_EXTENDS */
418 up_slot_match:
419 up = mid;
420 up_matched_fields = cur_matched_fields;
421
422 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
423 #ifdef PAGE_CUR_LE_OR_EXTENDS
424 || mode == PAGE_CUR_LE_OR_EXTENDS
425 #endif /* PAGE_CUR_LE_OR_EXTENDS */
426 ) {
427 goto low_slot_match;
428 } else {
429
430 goto up_slot_match;
431 }
432 }
433
434 slot = page_dir_get_nth_slot(page, low);
435 low_rec = page_dir_slot_get_rec(slot);
436 slot = page_dir_get_nth_slot(page, up);
437 up_rec = page_dir_slot_get_rec(slot);
438
439 /* Perform linear search until the upper and lower records come to
440 distance 1 of each other. */
441
442 while (page_rec_get_next_const(low_rec) != up_rec) {
443
444 mid_rec = page_rec_get_next_const(low_rec);
445
446 cur_matched_fields = std::min(low_matched_fields,
447 up_matched_fields);
448
449 offsets = offsets_;
450 offsets = rec_get_offsets(
451 mid_rec, index, offsets, n_core,
452 dtuple_get_n_fields_cmp(tuple), &heap);
453
454 cmp = cmp_dtuple_rec_with_match(
455 tuple, mid_rec, offsets, &cur_matched_fields);
456
457 if (cmp > 0) {
458 low_rec_match:
459 low_rec = mid_rec;
460 low_matched_fields = cur_matched_fields;
461
462 } else if (cmp) {
463 #ifdef PAGE_CUR_LE_OR_EXTENDS
464 if (mode == PAGE_CUR_LE_OR_EXTENDS
465 && page_cur_rec_field_extends(
466 tuple, mid_rec, offsets,
467 cur_matched_fields)) {
468
469 goto low_rec_match;
470 }
471 #endif /* PAGE_CUR_LE_OR_EXTENDS */
472 up_rec_match:
473 up_rec = mid_rec;
474 up_matched_fields = cur_matched_fields;
475 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
476 #ifdef PAGE_CUR_LE_OR_EXTENDS
477 || mode == PAGE_CUR_LE_OR_EXTENDS
478 #endif /* PAGE_CUR_LE_OR_EXTENDS */
479 ) {
480 if (!cmp && !cur_matched_fields) {
481 #ifdef UNIV_DEBUG
482 mtr_t mtr;
483 mtr_start(&mtr);
484
485 /* We got a match, but cur_matched_fields is
486 0, it must have REC_INFO_MIN_REC_FLAG */
487 ulint rec_info = rec_get_info_bits(mid_rec,
488 rec_offs_comp(offsets));
489 ut_ad(rec_info & REC_INFO_MIN_REC_FLAG);
490 ut_ad(!page_has_prev(page));
491 mtr_commit(&mtr);
492 #endif
493
494 cur_matched_fields = dtuple_get_n_fields_cmp(tuple);
495 }
496
497 goto low_rec_match;
498 } else {
499
500 goto up_rec_match;
501 }
502 }
503
504 if (mode <= PAGE_CUR_GE) {
505 page_cur_position(up_rec, block, cursor);
506 } else {
507 page_cur_position(low_rec, block, cursor);
508 }
509
510 *iup_matched_fields = up_matched_fields;
511 *ilow_matched_fields = low_matched_fields;
512 if (UNIV_LIKELY_NULL(heap)) {
513 mem_heap_free(heap);
514 }
515 }
516
517 #ifdef BTR_CUR_HASH_ADAPT
518 /** Search the right position for a page cursor.
519 @param[in] block buffer block
520 @param[in] index index tree
521 @param[in] tuple key to be searched for
522 @param[in] mode search mode
523 @param[in,out] iup_matched_fields already matched fields in the
524 upper limit record
525 @param[in,out] iup_matched_bytes already matched bytes in the
526 first partially matched field in the upper limit record
527 @param[in,out] ilow_matched_fields already matched fields in the
528 lower limit record
529 @param[in,out] ilow_matched_bytes already matched bytes in the
530 first partially matched field in the lower limit record
531 @param[out] cursor page cursor */
532 void
page_cur_search_with_match_bytes(const buf_block_t * block,const dict_index_t * index,const dtuple_t * tuple,page_cur_mode_t mode,ulint * iup_matched_fields,ulint * iup_matched_bytes,ulint * ilow_matched_fields,ulint * ilow_matched_bytes,page_cur_t * cursor)533 page_cur_search_with_match_bytes(
534 const buf_block_t* block,
535 const dict_index_t* index,
536 const dtuple_t* tuple,
537 page_cur_mode_t mode,
538 ulint* iup_matched_fields,
539 ulint* iup_matched_bytes,
540 ulint* ilow_matched_fields,
541 ulint* ilow_matched_bytes,
542 page_cur_t* cursor)
543 {
544 ulint up;
545 ulint low;
546 ulint mid;
547 const page_t* page;
548 const page_dir_slot_t* slot;
549 const rec_t* up_rec;
550 const rec_t* low_rec;
551 const rec_t* mid_rec;
552 ulint up_matched_fields;
553 ulint up_matched_bytes;
554 ulint low_matched_fields;
555 ulint low_matched_bytes;
556 ulint cur_matched_fields;
557 ulint cur_matched_bytes;
558 int cmp;
559 #ifdef UNIV_ZIP_DEBUG
560 const page_zip_des_t* page_zip = buf_block_get_page_zip(block);
561 #endif /* UNIV_ZIP_DEBUG */
562 mem_heap_t* heap = NULL;
563 rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
564 rec_offs* offsets = offsets_;
565 rec_offs_init(offsets_);
566
567 ut_ad(dtuple_validate(tuple));
568 ut_ad(!(tuple->info_bits & REC_INFO_MIN_REC_FLAG));
569 #ifdef UNIV_DEBUG
570 # ifdef PAGE_CUR_DBG
571 if (mode != PAGE_CUR_DBG)
572 # endif /* PAGE_CUR_DBG */
573 # ifdef PAGE_CUR_LE_OR_EXTENDS
574 if (mode != PAGE_CUR_LE_OR_EXTENDS)
575 # endif /* PAGE_CUR_LE_OR_EXTENDS */
576 ut_ad(mode == PAGE_CUR_L || mode == PAGE_CUR_LE
577 || mode == PAGE_CUR_G || mode == PAGE_CUR_GE);
578 #endif /* UNIV_DEBUG */
579 page = buf_block_get_frame(block);
580 #ifdef UNIV_ZIP_DEBUG
581 ut_a(!page_zip || page_zip_validate(page_zip, page, index));
582 #endif /* UNIV_ZIP_DEBUG */
583
584 ut_d(page_check_dir(page));
585
586 #ifdef BTR_CUR_HASH_ADAPT
587 if (page_is_leaf(page)
588 && page_get_direction(page) == PAGE_RIGHT
589 && page_header_get_offs(page, PAGE_LAST_INSERT)
590 && mode == PAGE_CUR_LE
591 && page_header_get_field(page, PAGE_N_DIRECTION) > 3
592 && page_cur_try_search_shortcut_bytes(
593 block, index, tuple,
594 iup_matched_fields, iup_matched_bytes,
595 ilow_matched_fields, ilow_matched_bytes,
596 cursor)) {
597 return;
598 }
599 # ifdef PAGE_CUR_DBG
600 if (mode == PAGE_CUR_DBG) {
601 mode = PAGE_CUR_LE;
602 }
603 # endif
604 #endif /* BTR_CUR_HASH_ADAPT */
605
606 /* The following flag does not work for non-latin1 char sets because
607 cmp_full_field does not tell how many bytes matched */
608 #ifdef PAGE_CUR_LE_OR_EXTENDS
609 ut_a(mode != PAGE_CUR_LE_OR_EXTENDS);
610 #endif /* PAGE_CUR_LE_OR_EXTENDS */
611
612 /* If mode PAGE_CUR_G is specified, we are trying to position the
613 cursor to answer a query of the form "tuple < X", where tuple is
614 the input parameter, and X denotes an arbitrary physical record on
615 the page. We want to position the cursor on the first X which
616 satisfies the condition. */
617
618 up_matched_fields = *iup_matched_fields;
619 up_matched_bytes = *iup_matched_bytes;
620 low_matched_fields = *ilow_matched_fields;
621 low_matched_bytes = *ilow_matched_bytes;
622
623 /* Perform binary search. First the search is done through the page
624 directory, after that as a linear search in the list of records
625 owned by the upper limit directory slot. */
626
627 low = 0;
628 up = ulint(page_dir_get_n_slots(page)) - 1;
629
630 /* Perform binary search until the lower and upper limit directory
631 slots come to the distance 1 of each other */
632 const ulint n_core = page_is_leaf(page) ? index->n_core_fields : 0;
633
634 while (up - low > 1) {
635 mid = (low + up) / 2;
636 slot = page_dir_get_nth_slot(page, mid);
637 mid_rec = page_dir_slot_get_rec(slot);
638
639 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
640 low_matched_fields, low_matched_bytes,
641 up_matched_fields, up_matched_bytes);
642
643 offsets = rec_get_offsets(
644 mid_rec, index, offsets_, n_core,
645 dtuple_get_n_fields_cmp(tuple), &heap);
646
647 cmp = cmp_dtuple_rec_with_match_bytes(
648 tuple, mid_rec, index, offsets,
649 &cur_matched_fields, &cur_matched_bytes);
650
651 if (cmp > 0) {
652 low_slot_match:
653 low = mid;
654 low_matched_fields = cur_matched_fields;
655 low_matched_bytes = cur_matched_bytes;
656
657 } else if (cmp) {
658 #ifdef PAGE_CUR_LE_OR_EXTENDS
659 if (mode == PAGE_CUR_LE_OR_EXTENDS
660 && page_cur_rec_field_extends(
661 tuple, mid_rec, offsets,
662 cur_matched_fields)) {
663
664 goto low_slot_match;
665 }
666 #endif /* PAGE_CUR_LE_OR_EXTENDS */
667 up_slot_match:
668 up = mid;
669 up_matched_fields = cur_matched_fields;
670 up_matched_bytes = cur_matched_bytes;
671
672 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
673 #ifdef PAGE_CUR_LE_OR_EXTENDS
674 || mode == PAGE_CUR_LE_OR_EXTENDS
675 #endif /* PAGE_CUR_LE_OR_EXTENDS */
676 ) {
677 goto low_slot_match;
678 } else {
679
680 goto up_slot_match;
681 }
682 }
683
684 slot = page_dir_get_nth_slot(page, low);
685 low_rec = page_dir_slot_get_rec(slot);
686 slot = page_dir_get_nth_slot(page, up);
687 up_rec = page_dir_slot_get_rec(slot);
688
689 /* Perform linear search until the upper and lower records come to
690 distance 1 of each other. */
691
692 while (page_rec_get_next_const(low_rec) != up_rec) {
693
694 mid_rec = page_rec_get_next_const(low_rec);
695
696 ut_pair_min(&cur_matched_fields, &cur_matched_bytes,
697 low_matched_fields, low_matched_bytes,
698 up_matched_fields, up_matched_bytes);
699
700 if (UNIV_UNLIKELY(rec_get_info_bits(
701 mid_rec,
702 dict_table_is_comp(index->table))
703 & REC_INFO_MIN_REC_FLAG)) {
704 ut_ad(!page_has_prev(page_align(mid_rec)));
705 ut_ad(!page_rec_is_leaf(mid_rec)
706 || rec_is_metadata(mid_rec, *index));
707 cmp = 1;
708 goto low_rec_match;
709 }
710
711 offsets = rec_get_offsets(
712 mid_rec, index, offsets_, n_core,
713 dtuple_get_n_fields_cmp(tuple), &heap);
714
715 cmp = cmp_dtuple_rec_with_match_bytes(
716 tuple, mid_rec, index, offsets,
717 &cur_matched_fields, &cur_matched_bytes);
718
719 if (cmp > 0) {
720 low_rec_match:
721 low_rec = mid_rec;
722 low_matched_fields = cur_matched_fields;
723 low_matched_bytes = cur_matched_bytes;
724
725 } else if (cmp) {
726 #ifdef PAGE_CUR_LE_OR_EXTENDS
727 if (mode == PAGE_CUR_LE_OR_EXTENDS
728 && page_cur_rec_field_extends(
729 tuple, mid_rec, offsets,
730 cur_matched_fields)) {
731
732 goto low_rec_match;
733 }
734 #endif /* PAGE_CUR_LE_OR_EXTENDS */
735 up_rec_match:
736 up_rec = mid_rec;
737 up_matched_fields = cur_matched_fields;
738 up_matched_bytes = cur_matched_bytes;
739 } else if (mode == PAGE_CUR_G || mode == PAGE_CUR_LE
740 #ifdef PAGE_CUR_LE_OR_EXTENDS
741 || mode == PAGE_CUR_LE_OR_EXTENDS
742 #endif /* PAGE_CUR_LE_OR_EXTENDS */
743 ) {
744 goto low_rec_match;
745 } else {
746
747 goto up_rec_match;
748 }
749 }
750
751 if (mode <= PAGE_CUR_GE) {
752 page_cur_position(up_rec, block, cursor);
753 } else {
754 page_cur_position(low_rec, block, cursor);
755 }
756
757 *iup_matched_fields = up_matched_fields;
758 *iup_matched_bytes = up_matched_bytes;
759 *ilow_matched_fields = low_matched_fields;
760 *ilow_matched_bytes = low_matched_bytes;
761 if (UNIV_LIKELY_NULL(heap)) {
762 mem_heap_free(heap);
763 }
764 }
765 #endif /* BTR_CUR_HASH_ADAPT */
766
767 /***********************************************************//**
768 Positions a page cursor on a randomly chosen user record on a page. If there
769 are no user records, sets the cursor on the infimum record. */
770 void
page_cur_open_on_rnd_user_rec(buf_block_t * block,page_cur_t * cursor)771 page_cur_open_on_rnd_user_rec(
772 /*==========================*/
773 buf_block_t* block, /*!< in: page */
774 page_cur_t* cursor) /*!< out: page cursor */
775 {
776 const ulint n_recs = page_get_n_recs(block->frame);
777
778 page_cur_set_before_first(block, cursor);
779
780 if (UNIV_UNLIKELY(n_recs == 0)) {
781
782 return;
783 }
784
785 cursor->rec = page_rec_get_nth(block->frame,
786 ut_rnd_interval(n_recs) + 1);
787 }
788
789 /**
790 Set the number of owned records.
791 @param[in,out] rec record in block.frame
792 @param[in] n_owned number of records skipped in the sparse page directory
793 @param[in] comp whether ROW_FORMAT is COMPACT or DYNAMIC */
page_rec_set_n_owned(rec_t * rec,ulint n_owned,bool comp)794 static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
795 {
796 rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
797 *rec= static_cast<byte>((*rec & ~REC_N_OWNED_MASK) |
798 (n_owned << REC_N_OWNED_SHIFT));
799 }
800
801 /**
802 Split a directory slot which owns too many records.
803 @param[in,out] block index page
804 @param[in,out] slot the slot that needs to be split */
page_dir_split_slot(const buf_block_t & block,page_dir_slot_t * slot)805 static void page_dir_split_slot(const buf_block_t &block,
806 page_dir_slot_t *slot)
807 {
808 ut_ad(slot <= &block.frame[srv_page_size - PAGE_EMPTY_DIR_START]);
809 slot= my_assume_aligned<2>(slot);
810
811 const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
812
813 ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
814 static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
815 PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");
816
817 /* Find a record approximately in the middle. */
818 const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
819
820 for (ulint i= n_owned / 2; i--; )
821 rec= page_rec_get_next_const(rec);
822
823 /* Add a directory slot immediately below this one. */
824 constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER;
825 byte *n_slots_p= my_assume_aligned<2>(n_slots_f + block.frame);
826 const uint16_t n_slots= mach_read_from_2(n_slots_p);
827
828 page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
829 (block.frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
830 n_slots * PAGE_DIR_SLOT_SIZE);
831 ut_ad(slot >= last_slot);
832 memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
833 slot - last_slot);
834
835 const ulint half_owned= n_owned / 2;
836
837 mach_write_to_2(n_slots_p, n_slots + 1);
838
839 mach_write_to_2(slot, rec - block.frame);
840 const bool comp= page_is_comp(block.frame) != 0;
841 page_rec_set_n_owned(page_dir_slot_get_rec(slot), half_owned, comp);
842 page_rec_set_n_owned(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
843 n_owned - half_owned, comp);
844 }
845
846 /**
847 Split a directory slot which owns too many records.
848 @param[in,out] block index page (ROW_FORMAT=COMPRESSED)
849 @param[in] s the slot that needs to be split
850 @param[in,out] mtr mini-transaction */
page_zip_dir_split_slot(buf_block_t * block,ulint s,mtr_t * mtr)851 static void page_zip_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
852 {
853 ut_ad(block->page.zip.data);
854 ut_ad(page_is_comp(block->frame));
855 ut_ad(s);
856
857 page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s);
858 const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
859
860 ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
861 static_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 >=
862 PAGE_DIR_SLOT_MIN_N_OWNED, "compatibility");
863
864 /* 1. We loop to find a record approximately in the middle of the
865 records owned by the slot. */
866
867 const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
868
869 for (ulint i= n_owned / 2; i--; )
870 rec= page_rec_get_next_const(rec);
871
872 /* Add a directory slot immediately below this one. */
873 constexpr uint16_t n_slots_f= PAGE_N_DIR_SLOTS + PAGE_HEADER;
874 byte *n_slots_p= my_assume_aligned<2>(n_slots_f + block->frame);
875 const uint16_t n_slots= mach_read_from_2(n_slots_p);
876
877 page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
878 (block->frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
879 n_slots * PAGE_DIR_SLOT_SIZE);
880 memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
881 slot - last_slot);
882
883 const ulint half_owned= n_owned / 2;
884
885 mtr->write<2>(*block, n_slots_p, 1U + n_slots);
886
887 /* Log changes to the compressed page header and the dense page directory. */
888 memcpy_aligned<2>(&block->page.zip.data[n_slots_f], n_slots_p, 2);
889 mach_write_to_2(slot, page_offset(rec));
890 page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
891 true, mtr);
892 page_rec_set_n_owned<true>(block,
893 page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE),
894 n_owned - half_owned, true, mtr);
895 }
896
897 /**
898 Try to balance an underfilled directory slot with an adjacent one,
899 so that there are at least the minimum number of records owned by the slot;
900 this may result in merging the two slots.
901 @param[in,out] block ROW_FORMAT=COMPRESSED page
902 @param[in] s the slot to be balanced
903 @param[in,out] mtr mini-transaction */
page_zip_dir_balance_slot(buf_block_t * block,ulint s,mtr_t * mtr)904 static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
905 {
906 ut_ad(block->page.zip.data);
907 ut_ad(page_is_comp(block->frame));
908 ut_ad(s > 0);
909
910 const ulint n_slots = page_dir_get_n_slots(block->frame);
911
912 if (UNIV_UNLIKELY(s + 1 == n_slots)) {
913 /* The last directory slot cannot be balanced. */
914 return;
915 }
916
917 ut_ad(s < n_slots);
918
919 page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
920 rec_t* const up_rec = const_cast<rec_t*>
921 (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
922 rec_t* const slot_rec = const_cast<rec_t*>
923 (page_dir_slot_get_rec(slot));
924 const ulint up_n_owned = rec_get_n_owned_new(up_rec);
925
926 ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot))
927 == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
928
929 if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
930 compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
931 <= PAGE_DIR_SLOT_MAX_N_OWNED);
932 /* Merge the slots. */
933 page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
934 page_rec_set_n_owned<true>(block, up_rec, up_n_owned
935 + (PAGE_DIR_SLOT_MIN_N_OWNED - 1),
936 true, mtr);
937 /* Shift the slots */
938 page_dir_slot_t* last_slot = page_dir_get_nth_slot(
939 block->frame, n_slots - 1);
940 memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
941 slot - last_slot);
942 constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
943 byte *n_slots_p= my_assume_aligned<2>
944 (n_slots_f + block->frame);
945 mtr->write<2>(*block, n_slots_p, n_slots - 1);
946 memcpy_aligned<2>(n_slots_f + block->page.zip.data,
947 n_slots_p, 2);
948 memset_aligned<2>(last_slot, 0, 2);
949 return;
950 }
951
952 /* Transfer one record to the underfilled slot */
953 page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
954 rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE);
955 page_rec_set_n_owned<true>(block, new_rec,
956 PAGE_DIR_SLOT_MIN_N_OWNED,
957 true, mtr);
958 mach_write_to_2(slot, page_offset(new_rec));
959 page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
960 }
961
962 /**
963 Try to balance an underfilled directory slot with an adjacent one,
964 so that there are at least the minimum number of records owned by the slot;
965 this may result in merging the two slots.
966 @param[in,out] block index page
967 @param[in] s the slot to be balanced */
page_dir_balance_slot(const buf_block_t & block,ulint s)968 static void page_dir_balance_slot(const buf_block_t &block, ulint s)
969 {
970 const bool comp= page_is_comp(block.frame);
971 ut_ad(!block.page.zip.data);
972 ut_ad(s > 0);
973
974 const ulint n_slots = page_dir_get_n_slots(block.frame);
975
976 if (UNIV_UNLIKELY(s + 1 == n_slots)) {
977 /* The last directory slot cannot be balanced. */
978 return;
979 }
980
981 ut_ad(s < n_slots);
982
983 page_dir_slot_t* slot = page_dir_get_nth_slot(block.frame, s);
984 rec_t* const up_rec = const_cast<rec_t*>
985 (page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
986 rec_t* const slot_rec = const_cast<rec_t*>
987 (page_dir_slot_get_rec(slot));
988 const ulint up_n_owned = comp
989 ? rec_get_n_owned_new(up_rec)
990 : rec_get_n_owned_old(up_rec);
991
992 ut_ad(page_dir_slot_get_n_owned(slot)
993 == PAGE_DIR_SLOT_MIN_N_OWNED - 1);
994
995 if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
996 compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
997 <= PAGE_DIR_SLOT_MAX_N_OWNED);
998 /* Merge the slots. */
999 page_rec_set_n_owned(slot_rec, 0, comp);
1000 page_rec_set_n_owned(up_rec, up_n_owned
1001 + (PAGE_DIR_SLOT_MIN_N_OWNED - 1), comp);
1002 /* Shift the slots */
1003 page_dir_slot_t* last_slot = page_dir_get_nth_slot(
1004 block.frame, n_slots - 1);
1005 memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
1006 slot - last_slot);
1007 memset_aligned<2>(last_slot, 0, 2);
1008 constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
1009 byte *n_slots_p= my_assume_aligned<2>
1010 (n_slots_f + block.frame);
1011 mach_write_to_2(n_slots_p, n_slots - 1);
1012 return;
1013 }
1014
1015 /* Transfer one record to the underfilled slot */
1016 rec_t* new_rec;
1017
1018 if (comp) {
1019 page_rec_set_n_owned(slot_rec, 0, true);
1020 new_rec = rec_get_next_ptr(slot_rec, TRUE);
1021 page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true);
1022 page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
1023 } else {
1024 page_rec_set_n_owned(slot_rec, 0, false);
1025 new_rec = rec_get_next_ptr(slot_rec, FALSE);
1026 page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED,
1027 false);
1028 page_rec_set_n_owned(up_rec, up_n_owned - 1, false);
1029 }
1030
1031 mach_write_to_2(slot, page_offset(new_rec));
1032 }
1033
/** Allocate space for inserting an index record from the page heap
(the area between PAGE_HEAP_TOP and the page directory).
@tparam compressed	whether to update the ROW_FORMAT=COMPRESSED
			page header copy as well
@param[in,out]	block	index page
@param[in]	need	number of bytes needed
@param[out]	heap_no	record heap number assigned to the new record
@return pointer to the start of the allocated buffer
@retval NULL if allocation fails */
template<bool compressed=false>
static byte* page_mem_alloc_heap(buf_block_t *block, ulint need,
                                 ulint *heap_no)
{
  ut_ad(!compressed || block->page.zip.data);

  byte *heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
                                       block->frame);

  const uint16_t top= mach_read_from_2(heap_top);

  /* The allocation must fit between the current heap top and the
  page directory. */
  if (need > page_get_max_insert_size(block->frame, 1))
    return NULL;

  byte *n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER + block->frame);

  /* NOTE(review): the low 15 bits of PAGE_N_HEAP are the heap record
  count; the 0x8000 bit appears to be a format flag (asserted set below
  in the compressed case — confirm against page0page.h). */
  const uint16_t h= mach_read_from_2(n_heap);
  if (UNIV_UNLIKELY((h + 1) & 0x6000))
  {
    /* At the minimum record size of 5+2 bytes, we can only reach this
    condition when using innodb_page_size=64k. */
    ut_ad((h & 0x7fff) == 8191);
    ut_ad(srv_page_size == 65536);
    return NULL;
  }

  *heap_no= h & 0x7fff;
  ut_ad(*heap_no < srv_page_size / REC_N_NEW_EXTRA_BYTES);
  compile_time_assert(UNIV_PAGE_SIZE_MAX / REC_N_NEW_EXTRA_BYTES < 0x3fff);

  /* Advance the heap top and bump the heap record count. */
  mach_write_to_2(heap_top, top + need);
  mach_write_to_2(n_heap, h + 1);

  if (compressed)
  {
    ut_ad(h & 0x8000);
    /* Keep the compressed page header in sync: this 4-byte copy covers
    both PAGE_HEAP_TOP and the adjacent PAGE_N_HEAP field. */
    memcpy_aligned<4>(&block->page.zip.data[PAGE_HEAP_TOP + PAGE_HEADER],
                      heap_top, 4);
  }

  return &block->frame[top];
}
1083
/** Write log for inserting a B-tree or R-tree record in
ROW_FORMAT=REDUNDANT. Only the header and data bytes that differ from
the preceding record are copied to the log; the common prefixes are
encoded as byte counts (hdr_c, data_c) so that recovery can reconstruct
the record from its predecessor.
@param block	B-tree or R-tree page
@param reuse	false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
@param prev_rec	byte offset of the predecessor of the record to insert,
		starting from PAGE_OLD_INFIMUM
@param info_bits	info_bits of the record
@param n_fields_s	number of fields << 1 | rec_get_1byte_offs_flag()
@param hdr_c	number of common record header bytes with prev_rec
@param data_c	number of common data bytes with prev_rec
@param hdr	record header bytes to copy to the log
@param hdr_l	number of copied record header bytes
@param data	record payload bytes to copy to the log
@param data_l	number of copied record data bytes */
inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
                               ulint prev_rec, byte info_bits,
                               ulint n_fields_s, size_t hdr_c, size_t data_c,
                               const byte *hdr, size_t hdr_l,
                               const byte *data, size_t data_l)
{
  ut_ad(!block.page.zip.data);
  ut_ad(m_log_mode == MTR_LOG_ALL);
  ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
  ut_ad(n_slots >= 2);
  ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
  ut_ad(&block.frame[prev_rec + PAGE_OLD_INFIMUM] <= page_end);
  ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
        page_end);
  ut_ad(fil_page_index_page_check(block.frame));
  ut_ad(!(~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_bits));
  ut_ad(n_fields_s >= 2);
  ut_ad((n_fields_s >> 1) <= REC_MAX_N_FIELDS);
  ut_ad(data_l + data_c <= REDUNDANT_REC_MAX_DATA_SIZE);

  set_modified(block);

  static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
  static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
  /* Pack the two info bits into the low bits of the field count,
  biased by the 2 system fields, to shorten the varint encoding. */
  n_fields_s= (n_fields_s - 2) << 2 | info_bits >> 4;

  /* Compute the total record length from the varint sizes of each
  component plus the literal header/data bytes. */
  size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
  static_assert((REC_MAX_N_FIELDS << 1 | 1) <= MIN_3BYTE, "compatibility");
  len+= n_fields_s < MIN_2BYTE ? 1 : 2;
  len+= hdr_c < MIN_2BYTE ? 1 : 2;
  static_assert(REDUNDANT_REC_MAX_DATA_SIZE <= MIN_3BYTE, "compatibility");
  len+= data_c < MIN_2BYTE ? 1 : 2;
  len+= hdr_l + data_l;

  /* Decide whether the whole log record fits in one mtr log chunk. */
  const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
  byte *l= log_write<EXTENDED>(block.page.id(), &block.page, len, small);

  if (UNIV_LIKELY(small))
  {
    /* Write the entire record (prefix + payload) in one piece. */
    ut_d(const byte * const end = l + len);
    *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
    l= mlog_encode_varint(l, prev_rec);
    l= mlog_encode_varint(l, n_fields_s);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ::memcpy(l, hdr, hdr_l);
    l+= hdr_l;
    ::memcpy(l, data, data_l);
    l+= data_l;
    ut_ad(end == l);
    m_log.close(l);
  }
  else
  {
    /* Too large for one chunk: write the fixed prefix first, then
    push the header and payload bytes separately. */
    m_log.close(l);
    l= m_log.open(len - hdr_l - data_l);
    ut_d(const byte * const end = l + len - hdr_l - data_l);
    *l++= reuse ? INSERT_REUSE_REDUNDANT : INSERT_HEAP_REDUNDANT;
    l= mlog_encode_varint(l, prev_rec);
    l= mlog_encode_varint(l, n_fields_s);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ut_ad(end == l);
    m_log.close(l);
    m_log.push(hdr, static_cast<uint32_t>(hdr_l));
    m_log.push(data, static_cast<uint32_t>(data_l));
  }

  m_last_offset= FIL_PAGE_TYPE;
}
1168
/** Write log for inserting a B-tree or R-tree record in
ROW_FORMAT=COMPACT or ROW_FORMAT=DYNAMIC. Only the header and data bytes
that differ from the preceding record are copied to the log; the common
prefixes are encoded as byte counts (hdr_c, data_c) so that recovery can
reconstruct the record from its predecessor.
@param block	B-tree or R-tree page
@param reuse	false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
@param prev_rec	byte offset of the predecessor of the record to insert,
		starting from PAGE_NEW_INFIMUM
@param info_status	rec_get_info_and_status_bits()
@param shift	unless !reuse: number of bytes the PAGE_FREE is moving
@param hdr_c	number of common record header bytes with prev_rec
@param data_c	number of common data bytes with prev_rec
@param hdr	record header bytes to copy to the log
@param hdr_l	number of copied record header bytes
@param data	record payload bytes to copy to the log
@param data_l	number of copied record data bytes */
inline void mtr_t::page_insert(const buf_block_t &block, bool reuse,
                               ulint prev_rec, byte info_status,
                               ssize_t shift, size_t hdr_c, size_t data_c,
                               const byte *hdr, size_t hdr_l,
                               const byte *data, size_t data_l)
{
  ut_ad(!block.page.zip.data);
  ut_ad(m_log_mode == MTR_LOG_ALL);
  ut_d(ulint n_slots= page_dir_get_n_slots(block.frame));
  ut_ad(n_slots >= 2);
  ut_d(const byte *page_end= page_dir_get_nth_slot(block.frame, n_slots - 1));
  ut_ad(&block.frame[prev_rec + PAGE_NEW_INFIMUM] <= page_end);
  ut_ad(block.frame + page_header_get_offs(block.frame, PAGE_HEAP_TOP) <=
        page_end);
  ut_ad(fil_page_index_page_check(block.frame));
  ut_ad(hdr_l + hdr_c + data_l + data_c <=
        static_cast<size_t>(page_end - &block.frame[PAGE_NEW_SUPREMUM_END]));
  ut_ad(reuse || shift == 0);
#ifdef UNIV_DEBUG
  /* The record status must be consistent with the page level. */
  switch (~(REC_INFO_MIN_REC_FLAG | REC_INFO_DELETED_FLAG) & info_status) {
  default:
    ut_ad(0);
    break;
  case REC_STATUS_NODE_PTR:
    ut_ad(!page_is_leaf(block.frame));
    break;
  case REC_STATUS_INSTANT:
  case REC_STATUS_ORDINARY:
    ut_ad(page_is_leaf(block.frame));
  }
#endif

  set_modified(block);

  static_assert(REC_INFO_MIN_REC_FLAG == 0x10, "compatibility");
  static_assert(REC_INFO_DELETED_FLAG == 0x20, "compatibility");
  static_assert(REC_STATUS_INSTANT == 4, "compatibility");

  /* Pack the header length together with the status/info bits into one
  varint-encoded value, to shorten the log record. */
  const size_t enc_hdr_l= hdr_l << 3 |
    (info_status & REC_STATUS_INSTANT) | info_status >> 4;
  /* Compute the total record length from the varint sizes of each
  component plus the literal header/data bytes. */
  size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
  static_assert(REC_MAX_N_FIELDS * 2 < MIN_3BYTE, "compatibility");
  if (reuse)
  {
    /* Zig-zag encode the signed shift into an unsigned varint. */
    if (shift < 0)
      shift= -shift << 1 | 1;
    else
      shift<<= 1;
    len+= static_cast<size_t>(shift) < MIN_2BYTE
      ? 1 : static_cast<size_t>(shift) < MIN_3BYTE ? 2 : 3;
  }
  ut_ad(hdr_c + hdr_l <= REC_MAX_N_FIELDS * 2);
  len+= hdr_c < MIN_2BYTE ? 1 : 2;
  len+= enc_hdr_l < MIN_2BYTE ? 1 : enc_hdr_l < MIN_3BYTE ? 2 : 3;
  len+= data_c < MIN_2BYTE ? 1 : data_c < MIN_3BYTE ? 2 : 3;
  len+= hdr_l + data_l;

  /* Decide whether the whole log record fits in one mtr log chunk. */
  const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (1 + 3 + 3 + 5 + 5);
  byte *l= log_write<EXTENDED>(block.page.id(), &block.page, len, small);

  if (UNIV_LIKELY(small))
  {
    /* Write the entire record (prefix + payload) in one piece. */
    ut_d(const byte * const end = l + len);
    *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
    l= mlog_encode_varint(l, prev_rec);
    if (reuse)
      l= mlog_encode_varint(l, shift);
    l= mlog_encode_varint(l, enc_hdr_l);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ::memcpy(l, hdr, hdr_l);
    l+= hdr_l;
    ::memcpy(l, data, data_l);
    l+= data_l;
    ut_ad(end == l);
    m_log.close(l);
  }
  else
  {
    /* Too large for one chunk: write the fixed prefix first, then
    push the header and payload bytes separately. */
    m_log.close(l);
    l= m_log.open(len - hdr_l - data_l);
    ut_d(const byte * const end = l + len - hdr_l - data_l);
    *l++= reuse ? INSERT_REUSE_DYNAMIC : INSERT_HEAP_DYNAMIC;
    l= mlog_encode_varint(l, prev_rec);
    if (reuse)
      l= mlog_encode_varint(l, shift);
    l= mlog_encode_varint(l, enc_hdr_l);
    l= mlog_encode_varint(l, hdr_c);
    l= mlog_encode_varint(l, data_c);
    ut_ad(end == l);
    m_log.close(l);
    m_log.push(hdr, static_cast<uint32_t>(hdr_l));
    m_log.push(data, static_cast<uint32_t>(data_l));
  }

  m_last_offset= FIL_PAGE_TYPE;
}
1280
1281 /***********************************************************//**
1282 Inserts a record next to page cursor on an uncompressed page.
1283 Returns pointer to inserted record if succeed, i.e., enough
1284 space available, NULL otherwise. The cursor stays at the same position.
1285 @return pointer to record if succeed, NULL otherwise */
1286 rec_t*
page_cur_insert_rec_low(const page_cur_t * cur,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1287 page_cur_insert_rec_low(
1288 /*====================*/
1289 const page_cur_t*cur, /*!< in: page cursor */
1290 dict_index_t* index, /*!< in: record descriptor */
1291 const rec_t* rec, /*!< in: record to insert after cur */
1292 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1293 mtr_t* mtr) /*!< in/out: mini-transaction */
1294 {
1295 buf_block_t* block= cur->block;
1296
1297 ut_ad(rec_offs_validate(rec, index, offsets));
1298 ut_ad(rec_offs_n_fields(offsets) > 0);
1299 ut_ad(index->table->not_redundant() == !!page_is_comp(block->frame));
1300 ut_ad(!!page_is_comp(block->frame) == !!rec_offs_comp(offsets));
1301 ut_ad(fil_page_index_page_check(block->frame));
1302 ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame) ==
1303 index->id ||
1304 mtr->is_inside_ibuf());
1305 ut_ad(page_dir_get_n_slots(block->frame) >= 2);
1306
1307 ut_ad(!page_rec_is_supremum(cur->rec));
1308
1309 /* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
1310 ut_ad(mtr->get_log_mode() != MTR_LOG_ALL ||
1311 !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
1312
1313 /* 1. Get the size of the physical record in the page */
1314 const ulint rec_size= rec_offs_size(offsets);
1315
1316 #ifdef HAVE_MEM_CHECK
1317 {
1318 const void *rec_start __attribute__((unused))=
1319 rec - rec_offs_extra_size(offsets);
1320 ulint extra_size __attribute__((unused))=
1321 rec_offs_extra_size(offsets) -
1322 (page_is_comp(block->frame)
1323 ? REC_N_NEW_EXTRA_BYTES
1324 : REC_N_OLD_EXTRA_BYTES);
1325 /* All data bytes of the record must be valid. */
1326 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1327 /* The variable-length header must be valid. */
1328 MEM_CHECK_DEFINED(rec_start, extra_size);
1329 }
1330 #endif /* HAVE_MEM_CHECK */
1331
1332 /* 2. Try to find suitable space from page memory management */
1333 bool reuse= false;
1334 ssize_t free_offset= 0;
1335 ulint heap_no;
1336 byte *insert_buf;
1337
1338 const bool comp= page_is_comp(block->frame);
1339 const ulint extra_size= rec_offs_extra_size(offsets);
1340
1341 if (rec_t* free_rec= page_header_get_ptr(block->frame, PAGE_FREE))
1342 {
1343 /* Try to reuse the head of PAGE_FREE. */
1344 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1345 mem_heap_t *heap= nullptr;
1346
1347 rec_offs_init(foffsets_);
1348
1349 rec_offs *foffsets= rec_get_offsets(free_rec, index, foffsets_,
1350 page_is_leaf(block->frame)
1351 ? index->n_core_fields : 0,
1352 ULINT_UNDEFINED, &heap);
1353 const ulint fextra_size= rec_offs_extra_size(foffsets);
1354 insert_buf= free_rec - fextra_size;
1355 const bool too_small= (fextra_size + rec_offs_data_size(foffsets)) <
1356 rec_size;
1357 if (UNIV_LIKELY_NULL(heap))
1358 mem_heap_free(heap);
1359
1360 if (too_small)
1361 goto use_heap;
1362
1363 byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
1364 block->frame);
1365 if (comp)
1366 {
1367 heap_no= rec_get_heap_no_new(free_rec);
1368 uint16_t next= mach_read_from_2(free_rec - REC_NEXT);
1369 mach_write_to_2(page_free, next
1370 ? static_cast<uint16_t>(free_rec + next - block->frame)
1371 : 0);
1372 }
1373 else
1374 {
1375 heap_no= rec_get_heap_no_old(free_rec);
1376 memcpy(page_free, free_rec - REC_NEXT, 2);
1377 }
1378
1379 static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
1380
1381 byte *page_garbage= my_assume_aligned<2>(page_free + 2);
1382 ut_ad(mach_read_from_2(page_garbage) >= rec_size);
1383 mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) - rec_size);
1384 reuse= true;
1385 free_offset= extra_size - fextra_size;
1386 }
1387 else
1388 {
1389 use_heap:
1390 insert_buf= page_mem_alloc_heap(block, rec_size, &heap_no);
1391
1392 if (UNIV_UNLIKELY(!insert_buf))
1393 return nullptr;
1394 }
1395
1396 ut_ad(cur->rec != insert_buf + extra_size);
1397
1398 rec_t *next_rec= block->frame + rec_get_next_offs(cur->rec, comp);
1399 ut_ad(next_rec != block->frame);
1400
1401 /* Update page header fields */
1402 byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
1403 block->frame);
1404 const uint16_t last_insert= mach_read_from_2(page_last_insert);
1405 ut_ad(!last_insert || !comp ||
1406 rec_get_node_ptr_flag(block->frame + last_insert) ==
1407 rec_get_node_ptr_flag(rec));
1408
1409 /* Write PAGE_LAST_INSERT */
1410 mach_write_to_2(page_last_insert, page_offset(insert_buf + extra_size));
1411
1412 /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
1413 if (block->frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
1414 {
1415 byte *dir= &block->frame[PAGE_DIRECTION_B + PAGE_HEADER];
1416 byte *n= my_assume_aligned<2>
1417 (&block->frame[PAGE_N_DIRECTION + PAGE_HEADER]);
1418 if (UNIV_UNLIKELY(!last_insert))
1419 {
1420 no_direction:
1421 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
1422 memset(n, 0, 2);
1423 }
1424 else if (block->frame + last_insert == cur->rec &&
1425 (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
1426 {
1427 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
1428 inc_dir:
1429 mach_write_to_2(n, mach_read_from_2(n) + 1);
1430 }
1431 else if (next_rec == block->frame + last_insert &&
1432 (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
1433 {
1434 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
1435 goto inc_dir;
1436 }
1437 else
1438 goto no_direction;
1439 }
1440
1441 /* Update PAGE_N_RECS. */
1442 byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
1443 block->frame);
1444
1445 mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
1446
1447 /* Update the preceding record header, the 'owner' record and
1448 prepare the record to insert. */
1449 rec_t *insert_rec= insert_buf + extra_size;
1450 const ulint data_size= rec_offs_data_size(offsets);
1451 memcpy(insert_buf, rec - extra_size, extra_size + data_size);
1452 size_t hdr_common= 0;
1453 ulint n_owned;
1454 const byte info_status= static_cast<byte>
1455 (rec_get_info_and_status_bits(rec, comp));
1456 ut_ad(!(rec_get_info_bits(rec, comp) &
1457 ~(REC_INFO_DELETED_FLAG | REC_INFO_MIN_REC_FLAG)));
1458
1459 if (comp)
1460 {
1461 #ifdef UNIV_DEBUG
1462 switch (rec_get_status(cur->rec)) {
1463 case REC_STATUS_ORDINARY:
1464 case REC_STATUS_NODE_PTR:
1465 case REC_STATUS_INSTANT:
1466 case REC_STATUS_INFIMUM:
1467 break;
1468 case REC_STATUS_SUPREMUM:
1469 ut_ad("wrong status on cur->rec" == 0);
1470 }
1471 switch (rec_get_status(rec)) {
1472 case REC_STATUS_NODE_PTR:
1473 ut_ad(!page_is_leaf(block->frame));
1474 break;
1475 case REC_STATUS_INSTANT:
1476 ut_ad(index->is_instant());
1477 ut_ad(page_is_leaf(block->frame));
1478 if (!rec_is_metadata(rec, true))
1479 break;
1480 ut_ad(cur->rec == &block->frame[PAGE_NEW_INFIMUM]);
1481 break;
1482 case REC_STATUS_ORDINARY:
1483 ut_ad(page_is_leaf(block->frame));
1484 ut_ad(!(rec_get_info_bits(rec, true) & ~REC_INFO_DELETED_FLAG));
1485 break;
1486 case REC_STATUS_INFIMUM:
1487 case REC_STATUS_SUPREMUM:
1488 ut_ad("wrong status on rec" == 0);
1489 }
1490 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1491 #endif
1492
1493 rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
1494 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1495 insert_rec[-REC_NEW_STATUS]= rec[-REC_NEW_STATUS];
1496 rec_set_bit_field_2(insert_rec, heap_no,
1497 REC_NEW_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1498 mach_write_to_2(insert_rec - REC_NEXT,
1499 static_cast<uint16_t>(next_rec - insert_rec));
1500 mach_write_to_2(cur->rec - REC_NEXT,
1501 static_cast<uint16_t>(insert_rec - cur->rec));
1502 while (!(n_owned= rec_get_n_owned_new(next_rec)))
1503 {
1504 next_rec= block->frame + rec_get_next_offs(next_rec, true);
1505 ut_ad(next_rec != block->frame);
1506 }
1507 rec_set_bit_field_1(next_rec, n_owned + 1, REC_NEW_N_OWNED,
1508 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1509 if (mtr->get_log_mode() != MTR_LOG_ALL)
1510 {
1511 mtr->set_modified(*block);
1512 goto copied;
1513 }
1514
1515 const byte * const c_start= cur->rec - extra_size;
1516 if (extra_size > REC_N_NEW_EXTRA_BYTES &&
1517 c_start >=
1518 &block->frame[PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES])
1519 {
1520 /* Find common header bytes with the preceding record. */
1521 const byte *r= rec - (REC_N_NEW_EXTRA_BYTES + 1);
1522 for (const byte *c= cur->rec - (REC_N_NEW_EXTRA_BYTES + 1);
1523 *r == *c && c-- != c_start; r--);
1524 hdr_common= static_cast<size_t>((rec - (REC_N_NEW_EXTRA_BYTES + 1)) - r);
1525 ut_ad(hdr_common <= extra_size - REC_N_NEW_EXTRA_BYTES);
1526 }
1527 }
1528 else
1529 {
1530 #ifdef UNIV_DEBUG
1531 if (!page_is_leaf(block->frame));
1532 else if (rec_is_metadata(rec, false))
1533 {
1534 ut_ad(index->is_instant());
1535 ut_ad(cur->rec == &block->frame[PAGE_OLD_INFIMUM]);
1536 }
1537 #endif
1538 rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
1539 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1540 rec_set_bit_field_2(insert_rec, heap_no,
1541 REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1542 memcpy(insert_rec - REC_NEXT, cur->rec - REC_NEXT, 2);
1543 mach_write_to_2(cur->rec - REC_NEXT, page_offset(insert_rec));
1544 while (!(n_owned= rec_get_n_owned_old(next_rec)))
1545 {
1546 next_rec= block->frame + rec_get_next_offs(next_rec, false);
1547 ut_ad(next_rec != block->frame);
1548 }
1549 rec_set_bit_field_1(next_rec, n_owned + 1, REC_OLD_N_OWNED,
1550 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1551 if (mtr->get_log_mode() != MTR_LOG_ALL)
1552 {
1553 mtr->set_modified(*block);
1554 goto copied;
1555 }
1556
1557 ut_ad(extra_size > REC_N_OLD_EXTRA_BYTES);
1558 const byte * const c_start= cur->rec - extra_size;
1559 if (c_start >=
1560 &block->frame[PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES])
1561 {
1562 /* Find common header bytes with the preceding record. */
1563 const byte *r= rec - (REC_N_OLD_EXTRA_BYTES + 1);
1564 for (const byte *c= cur->rec - (REC_N_OLD_EXTRA_BYTES + 1);
1565 *r == *c && c-- != c_start; r--);
1566 hdr_common= static_cast<size_t>((rec - (REC_N_OLD_EXTRA_BYTES + 1)) - r);
1567 ut_ad(hdr_common <= extra_size - REC_N_OLD_EXTRA_BYTES);
1568 }
1569 }
1570
1571 /* Insert the record, possibly copying from the preceding record. */
1572 ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
1573
1574 {
1575 const byte *r= rec;
1576 const byte *c= cur->rec;
1577 const byte *c_end= cur->rec + data_size;
1578 if (c <= insert_buf && c_end > insert_buf)
1579 c_end= insert_buf;
1580 else
1581 c_end= std::min<const byte*>(c_end, block->frame + srv_page_size -
1582 PAGE_DIR - PAGE_DIR_SLOT_SIZE *
1583 page_dir_get_n_slots(block->frame));
1584 size_t data_common;
1585 /* Copy common data bytes of the preceding record. */
1586 for (; c != c_end && *r == *c; c++, r++);
1587 data_common= static_cast<size_t>(r - rec);
1588
1589 if (comp)
1590 mtr->page_insert(*block, reuse,
1591 cur->rec - block->frame - PAGE_NEW_INFIMUM,
1592 info_status, free_offset, hdr_common, data_common,
1593 insert_buf,
1594 extra_size - hdr_common - REC_N_NEW_EXTRA_BYTES,
1595 r, data_size - data_common);
1596 else
1597 mtr->page_insert(*block, reuse,
1598 cur->rec - block->frame - PAGE_OLD_INFIMUM,
1599 info_status, rec_get_n_fields_old(insert_rec) << 1 |
1600 rec_get_1byte_offs_flag(insert_rec),
1601 hdr_common, data_common,
1602 insert_buf,
1603 extra_size - hdr_common - REC_N_OLD_EXTRA_BYTES,
1604 r, data_size - data_common);
1605 }
1606
1607 copied:
1608 ut_ad(!memcmp(insert_buf, rec - extra_size, extra_size -
1609 (comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES)));
1610 ut_ad(!memcmp(insert_rec, rec, data_size));
1611 /* We have incremented the n_owned field of the owner record.
1612 If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED, we have to split the
1613 corresponding directory slot in two. */
1614
1615 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
1616 {
1617 const auto owner= page_dir_find_owner_slot(next_rec);
1618 page_dir_split_slot(*block, page_dir_get_nth_slot(block->frame, owner));
1619 }
1620
1621 rec_offs_make_valid(insert_buf + extra_size, index,
1622 page_is_leaf(block->frame), offsets);
1623 return insert_buf + extra_size;
1624 }
1625
/** Add a slot to the dense page directory of a ROW_FORMAT=COMPRESSED page.
The dense directory grows downwards from the end of the compressed page,
below any uncompressed columns (node pointers, or DB_TRX_ID/DB_ROLL_PTR
and BLOB pointers on clustered leaf pages), so those areas must be moved
down by one slot size. The moves are logged via mtr->memmove().
@param[in,out]	block	ROW_FORMAT=COMPRESSED page
@param[in]	index	the index that the page belongs to
@param[in,out]	mtr	mini-transaction */
static inline void page_zip_dir_add_slot(buf_block_t *block,
                                         const dict_index_t *index, mtr_t *mtr)
{
  page_zip_des_t *page_zip= &block->page.zip;

  ut_ad(page_is_comp(page_zip->data));
  MEM_CHECK_DEFINED(page_zip->data, page_zip_get_size(page_zip));

  /* Read the old n_dense (n_heap has already been incremented). */
  ulint n_dense= page_dir_get_n_heap(page_zip->data) - (PAGE_HEAP_NO_USER_LOW +
                                                        1U);

  /* dir: start of the dense directory; stored: start of everything
  that must be shifted (directory plus any uncompressed columns). */
  byte *dir= page_zip->data + page_zip_get_size(page_zip) -
    PAGE_ZIP_DIR_SLOT_SIZE * n_dense;
  byte *stored= dir;

  if (!page_is_leaf(page_zip->data))
  {
    /* Non-leaf pages store node pointers below the directory. */
    ut_ad(!page_zip->n_blobs);
    stored-= n_dense * REC_NODE_PTR_SIZE;
  }
  else if (index->is_clust())
  {
    /* Move the BLOB pointer array backwards to make space for the
    columns DB_TRX_ID,DB_ROLL_PTR and the dense directory slot. */

    stored-= n_dense * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
    byte *externs= stored - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
    byte *dst= externs - PAGE_ZIP_CLUST_LEAF_SLOT_SIZE;
    /* The space being claimed must still be zero-filled. */
    ut_ad(!memcmp(dst, field_ref_zero, PAGE_ZIP_CLUST_LEAF_SLOT_SIZE));
    if (const ulint len = ulint(stored - externs))
    {
      memmove(dst, externs, len);
      mtr->memmove(*block, dst - page_zip->data, externs - page_zip->data,
                   len);
    }
  }
  else
  {
    /* Secondary index leaf page: only BLOB pointers are stored. */
    stored-= page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
    ut_ad(!memcmp(stored - PAGE_ZIP_DIR_SLOT_SIZE, field_ref_zero,
                  PAGE_ZIP_DIR_SLOT_SIZE));
  }

  /* Move the uncompressed area backwards to make space
  for one directory slot. */
  if (const ulint len = ulint(dir - stored))
  {
    byte* dst = stored - PAGE_ZIP_DIR_SLOT_SIZE;
    memmove(dst, stored, len);
    mtr->memmove(*block, dst - page_zip->data, stored - page_zip->data, len);
  }
}
1683
1684 /***********************************************************//**
1685 Inserts a record next to page cursor on a compressed and uncompressed
1686 page. Returns pointer to inserted record if succeed, i.e.,
1687 enough space available, NULL otherwise.
1688 The cursor stays at the same position.
1689
1690 IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
1691 if this is a compressed leaf page in a secondary index.
1692 This has to be done either within the same mini-transaction,
1693 or by invoking ibuf_reset_free_bits() before mtr_commit().
1694
1695 @return pointer to record if succeed, NULL otherwise */
1696 rec_t*
page_cur_insert_rec_zip(page_cur_t * cursor,dict_index_t * index,const rec_t * rec,rec_offs * offsets,mtr_t * mtr)1697 page_cur_insert_rec_zip(
1698 /*====================*/
1699 page_cur_t* cursor, /*!< in/out: page cursor */
1700 dict_index_t* index, /*!< in: record descriptor */
1701 const rec_t* rec, /*!< in: pointer to a physical record */
1702 rec_offs* offsets,/*!< in/out: rec_get_offsets(rec, index) */
1703 mtr_t* mtr) /*!< in/out: mini-transaction */
1704 {
1705 page_zip_des_t * const page_zip= page_cur_get_page_zip(cursor);
1706 ut_ad(page_zip);
1707 ut_ad(rec_offs_validate(rec, index, offsets));
1708
1709 ut_ad(index->table->not_redundant());
1710 ut_ad(page_is_comp(cursor->block->frame));
1711 ut_ad(rec_offs_comp(offsets));
1712 ut_ad(fil_page_get_type(cursor->block->frame) == FIL_PAGE_INDEX ||
1713 fil_page_get_type(cursor->block->frame) == FIL_PAGE_RTREE);
1714 ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + cursor->block->frame) ==
1715 index->id || mtr->is_inside_ibuf());
1716 ut_ad(!page_get_instant(cursor->block->frame));
1717 ut_ad(!page_cur_is_after_last(cursor));
1718 #ifdef UNIV_ZIP_DEBUG
1719 ut_a(page_zip_validate(page_zip, cursor->block->frame, index));
1720 #endif /* UNIV_ZIP_DEBUG */
1721
1722 /* 1. Get the size of the physical record in the page */
1723 const ulint rec_size= rec_offs_size(offsets);
1724
1725 #ifdef HAVE_MEM_CHECK
1726 {
1727 const void *rec_start __attribute__((unused))=
1728 rec - rec_offs_extra_size(offsets);
1729 ulint extra_size __attribute__((unused))=
1730 rec_offs_extra_size(offsets) - REC_N_NEW_EXTRA_BYTES;
1731 /* All data bytes of the record must be valid. */
1732 MEM_CHECK_DEFINED(rec, rec_offs_data_size(offsets));
1733 /* The variable-length header must be valid. */
1734 MEM_CHECK_DEFINED(rec_start, extra_size);
1735 }
1736 #endif /* HAVE_MEM_CHECK */
1737 const bool reorg_before_insert= page_has_garbage(cursor->block->frame) &&
1738 rec_size > page_get_max_insert_size(cursor->block->frame, 1) &&
1739 rec_size <= page_get_max_insert_size_after_reorganize(cursor->block->frame,
1740 1);
1741 constexpr uint16_t page_free_f= PAGE_FREE + PAGE_HEADER;
1742 byte* const page_free = my_assume_aligned<4>(page_free_f +
1743 cursor->block->frame);
1744 uint16_t free_rec= 0;
1745
1746 /* 2. Try to find suitable space from page memory management */
1747 ulint heap_no;
1748 byte *insert_buf;
1749
1750 if (reorg_before_insert ||
1751 !page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1752 {
1753 /* SET GLOBAL might be executed concurrently. Sample the value once. */
1754 ulint level= page_zip_level;
1755 #ifdef UNIV_DEBUG
1756 const rec_t * const cursor_rec= page_cur_get_rec(cursor);
1757 #endif /* UNIV_DEBUG */
1758
1759 if (page_is_empty(cursor->block->frame))
1760 {
1761 ut_ad(page_cur_is_before_first(cursor));
1762
1763 /* This is an empty page. Recreate to remove the modification log. */
1764 page_create_zip(cursor->block, index,
1765 page_header_get_field(cursor->block->frame, PAGE_LEVEL),
1766 0, mtr);
1767 ut_ad(!page_header_get_ptr(cursor->block->frame, PAGE_FREE));
1768
1769 if (page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1770 goto use_heap;
1771
1772 /* The cursor should remain on the page infimum. */
1773 return nullptr;
1774 }
1775
1776 if (page_zip->m_nonempty || page_has_garbage(cursor->block->frame))
1777 {
1778 ulint pos= page_rec_get_n_recs_before(cursor->rec);
1779
1780 if (!page_zip_reorganize(cursor->block, index, level, mtr, true))
1781 {
1782 ut_ad(cursor->rec == cursor_rec);
1783 return nullptr;
1784 }
1785
1786 if (pos)
1787 cursor->rec= page_rec_get_nth(cursor->block->frame, pos);
1788 else
1789 ut_ad(cursor->rec == page_get_infimum_rec(cursor->block->frame));
1790
1791 ut_ad(!page_header_get_ptr(cursor->block->frame, PAGE_FREE));
1792
1793 if (page_zip_available(page_zip, index->is_clust(), rec_size, 1))
1794 goto use_heap;
1795 }
1796
1797 /* Try compressing the whole page afterwards. */
1798 const mtr_log_t log_mode= mtr->set_log_mode(MTR_LOG_NONE);
1799 rec_t *insert_rec= page_cur_insert_rec_low(cursor, index, rec, offsets,
1800 mtr);
1801 mtr->set_log_mode(log_mode);
1802
1803 if (insert_rec)
1804 {
1805 ulint pos= page_rec_get_n_recs_before(insert_rec);
1806 ut_ad(pos > 0);
1807
1808 /* We are writing entire page images to the log. Reduce the redo
1809 log volume by reorganizing the page at the same time. */
1810 if (page_zip_reorganize(cursor->block, index, level, mtr))
1811 {
1812 /* The page was reorganized: Seek to pos. */
1813 cursor->rec= pos > 1
1814 ? page_rec_get_nth(cursor->block->frame, pos - 1)
1815 : cursor->block->frame + PAGE_NEW_INFIMUM;
1816 insert_rec= cursor->block->frame + rec_get_next_offs(cursor->rec, 1);
1817 rec_offs_make_valid(insert_rec, index,
1818 page_is_leaf(cursor->block->frame), offsets);
1819 return insert_rec;
1820 }
1821
1822 /* Theoretically, we could try one last resort of
1823 page_zip_reorganize() followed by page_zip_available(), but that
1824 would be very unlikely to succeed. (If the full reorganized page
1825 failed to compress, why would it succeed to compress the page,
1826 plus log the insert of this record?) */
1827
1828 /* Out of space: restore the page */
1829 if (!page_zip_decompress(page_zip, cursor->block->frame, false))
1830 ut_error; /* Memory corrupted? */
1831 ut_ad(page_validate(cursor->block->frame, index));
1832 insert_rec= nullptr;
1833 }
1834 return insert_rec;
1835 }
1836
1837 free_rec= mach_read_from_2(page_free);
1838 if (free_rec)
1839 {
1840 /* Try to allocate from the head of the free list. */
1841 rec_offs foffsets_[REC_OFFS_NORMAL_SIZE];
1842 mem_heap_t *heap= nullptr;
1843
1844 rec_offs_init(foffsets_);
1845
1846 rec_offs *foffsets= rec_get_offsets(cursor->block->frame + free_rec, index,
1847 foffsets_,
1848 page_is_leaf(cursor->block->frame)
1849 ? index->n_core_fields : 0,
1850 ULINT_UNDEFINED, &heap);
1851 insert_buf= cursor->block->frame + free_rec -
1852 rec_offs_extra_size(foffsets);
1853
1854 if (rec_offs_size(foffsets) < rec_size)
1855 {
1856 too_small:
1857 if (UNIV_LIKELY_NULL(heap))
1858 mem_heap_free(heap);
1859 free_rec= 0;
1860 goto use_heap;
1861 }
1862
1863 /* On compressed pages, do not relocate records from
1864 the free list. If extra_size would grow, use the heap. */
1865 const ssize_t extra_size_diff= lint(rec_offs_extra_size(offsets) -
1866 rec_offs_extra_size(foffsets));
1867
1868 if (UNIV_UNLIKELY(extra_size_diff < 0))
1869 {
1870 /* Add an offset to the extra_size. */
1871 if (rec_offs_size(foffsets) < rec_size - ssize_t(extra_size_diff))
1872 goto too_small;
1873
1874 insert_buf-= extra_size_diff;
1875 }
1876 else if (UNIV_UNLIKELY(extra_size_diff))
1877 /* Do not allow extra_size to grow */
1878 goto too_small;
1879
1880 byte *const free_rec_ptr= cursor->block->frame + free_rec;
1881 heap_no= rec_get_heap_no_new(free_rec_ptr);
1882 int16_t next_rec= mach_read_from_2(free_rec_ptr - REC_NEXT);
1883 /* With innodb_page_size=64k, int16_t would be unsafe to use here,
1884 but that cannot be used with ROW_FORMAT=COMPRESSED. */
1885 static_assert(UNIV_ZIP_SIZE_SHIFT_MAX == 14, "compatibility");
1886 if (next_rec)
1887 {
1888 next_rec= static_cast<int16_t>(next_rec + free_rec);
1889 ut_ad(int{PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES} <= next_rec);
1890 ut_ad(static_cast<uint16_t>(next_rec) < srv_page_size);
1891 }
1892
1893 byte *hdr= my_assume_aligned<4>(&page_zip->data[page_free_f]);
1894 mach_write_to_2(hdr, static_cast<uint16_t>(next_rec));
1895 const byte *const garbage= my_assume_aligned<2>(page_free + 2);
1896 ut_ad(mach_read_from_2(garbage) >= rec_size);
1897 mach_write_to_2(my_assume_aligned<2>(hdr + 2),
1898 mach_read_from_2(garbage) - rec_size);
1899 static_assert(PAGE_GARBAGE == PAGE_FREE + 2, "compatibility");
1900 mtr->memcpy(*cursor->block, page_free, hdr, 4);
1901
1902 if (!page_is_leaf(cursor->block->frame))
1903 {
1904 /* Zero out the node pointer of free_rec, in case it will not be
1905 overwritten by insert_rec. */
1906 ut_ad(rec_size > REC_NODE_PTR_SIZE);
1907
1908 if (rec_offs_size(foffsets) > rec_size)
1909 memset(rec_get_end(free_rec_ptr, foffsets) -
1910 REC_NODE_PTR_SIZE, 0, REC_NODE_PTR_SIZE);
1911 }
1912 else if (index->is_clust())
1913 {
1914 /* Zero out DB_TRX_ID,DB_ROLL_PTR in free_rec, in case they will
1915 not be overwritten by insert_rec. */
1916
1917 ulint len;
1918 ulint trx_id_offs= rec_get_nth_field_offs(foffsets, index->db_trx_id(),
1919 &len);
1920 ut_ad(len == DATA_TRX_ID_LEN);
1921
1922 if (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN + trx_id_offs +
1923 rec_offs_extra_size(foffsets) > rec_size)
1924 memset(free_rec_ptr + trx_id_offs, 0,
1925 DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
1926
1927 ut_ad(free_rec_ptr + trx_id_offs + DATA_TRX_ID_LEN ==
1928 rec_get_nth_field(free_rec_ptr, foffsets, index->db_roll_ptr(),
1929 &len));
1930 ut_ad(len == DATA_ROLL_PTR_LEN);
1931 }
1932
1933 if (UNIV_LIKELY_NULL(heap))
1934 mem_heap_free(heap);
1935 }
1936 else
1937 {
1938 use_heap:
1939 ut_ad(!free_rec);
1940 insert_buf= page_mem_alloc_heap<true>(cursor->block, rec_size, &heap_no);
1941
1942 if (UNIV_UNLIKELY(!insert_buf))
1943 return insert_buf;
1944
1945 static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
1946 mtr->memcpy(*cursor->block, PAGE_HEAP_TOP + PAGE_HEADER, 4);
1947 page_zip_dir_add_slot(cursor->block, index, mtr);
1948 }
1949
1950 /* 3. Create the record */
1951 byte *insert_rec= rec_copy(insert_buf, rec, offsets);
1952 rec_offs_make_valid(insert_rec, index, page_is_leaf(cursor->block->frame),
1953 offsets);
1954
1955 /* 4. Insert the record in the linked list of records */
1956 ut_ad(cursor->rec != insert_rec);
1957
1958 /* next record after current before the insertion */
1959 const rec_t* next_rec = page_rec_get_next_low(cursor->rec, TRUE);
1960 ut_ad(rec_get_status(cursor->rec) <= REC_STATUS_INFIMUM);
1961 ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
1962 ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
1963
1964 mach_write_to_2(insert_rec - REC_NEXT, static_cast<uint16_t>
1965 (next_rec - insert_rec));
1966 mach_write_to_2(cursor->rec - REC_NEXT, static_cast<uint16_t>
1967 (insert_rec - cursor->rec));
1968 byte *n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
1969 cursor->block->frame);
1970 mtr->write<2>(*cursor->block, n_recs, 1U + mach_read_from_2(n_recs));
1971 memcpy_aligned<2>(&page_zip->data[PAGE_N_RECS + PAGE_HEADER], n_recs, 2);
1972
1973 /* 5. Set the n_owned field in the inserted record to zero,
1974 and set the heap_no field */
1975 rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
1976 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
1977 rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO,
1978 REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
1979
1980 MEM_CHECK_DEFINED(rec_get_start(insert_rec, offsets),
1981 rec_offs_size(offsets));
1982
1983 /* 6. Update the last insertion info in page header */
1984 byte *last_insert= my_assume_aligned<4>(PAGE_LAST_INSERT + PAGE_HEADER +
1985 page_zip->data);
1986 const uint16_t last_insert_rec= mach_read_from_2(last_insert);
1987 ut_ad(!last_insert_rec ||
1988 rec_get_node_ptr_flag(cursor->block->frame + last_insert_rec) ==
1989 rec_get_node_ptr_flag(insert_rec));
1990 mach_write_to_2(last_insert, page_offset(insert_rec));
1991
1992 if (!index->is_spatial())
1993 {
1994 byte *dir= &page_zip->data[PAGE_HEADER + PAGE_DIRECTION_B];
1995 ut_ad(!(*dir & ~((1U << 3) - 1)));
1996 byte *n= my_assume_aligned<2>
1997 (&page_zip->data[PAGE_HEADER + PAGE_N_DIRECTION]);
1998 if (UNIV_UNLIKELY(!last_insert_rec))
1999 {
2000 no_direction:
2001 *dir= PAGE_NO_DIRECTION;
2002 memset(n, 0, 2);
2003 }
2004 else if (*dir != PAGE_LEFT &&
2005 cursor->block->frame + last_insert_rec == cursor->rec)
2006 {
2007 *dir= PAGE_RIGHT;
2008 inc_dir:
2009 mach_write_to_2(n, mach_read_from_2(n) + 1);
2010 }
2011 else if (*dir != PAGE_RIGHT && page_rec_get_next(insert_rec) ==
2012 cursor->block->frame + last_insert_rec)
2013 {
2014 *dir= PAGE_LEFT;
2015 goto inc_dir;
2016 }
2017 else
2018 goto no_direction;
2019 }
2020
2021 /* Write the header fields in one record. */
2022 mtr->memcpy(*cursor->block,
2023 my_assume_aligned<8>(PAGE_LAST_INSERT + PAGE_HEADER +
2024 cursor->block->frame),
2025 my_assume_aligned<8>(PAGE_LAST_INSERT + PAGE_HEADER +
2026 page_zip->data),
2027 PAGE_N_RECS - PAGE_LAST_INSERT + 2);
2028
2029 /* 7. It remains to update the owner record. */
2030 ulint n_owned;
2031
2032 while (!(n_owned = rec_get_n_owned_new(next_rec)))
2033 next_rec= page_rec_get_next_low(next_rec, true);
2034
2035 rec_set_bit_field_1(const_cast<rec_t*>(next_rec), n_owned + 1,
2036 REC_NEW_N_OWNED, REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2037
2038 page_zip_dir_insert(cursor, free_rec, insert_rec, mtr);
2039
2040 /* 8. Now we have incremented the n_owned field of the owner
2041 record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
2042 we have to split the corresponding directory slot in two. */
2043 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2044 page_zip_dir_split_slot(cursor->block,
2045 page_dir_find_owner_slot(next_rec), mtr);
2046
2047 page_zip_write_rec(cursor->block, insert_rec, index, offsets, 1, mtr);
2048 return insert_rec;
2049 }
2050
2051 /** Prepend a record to the PAGE_FREE list, or shrink PAGE_HEAP_TOP.
2052 @param[in,out] block index page
2053 @param[in,out] rec record being deleted
2054 @param[in] data_size record payload size, in bytes
2055 @param[in] extra_size record header size, in bytes */
page_mem_free(const buf_block_t & block,rec_t * rec,size_t data_size,size_t extra_size)2056 static void page_mem_free(const buf_block_t &block, rec_t *rec,
2057                           size_t data_size, size_t extra_size)
2058 {
2059   ut_ad(page_align(rec) == block.frame);
  /* This helper is only for uncompressed pages; ROW_FORMAT=COMPRESSED
  deletion is handled by page_zip_dir_delete(). */
2060   ut_ad(!block.page.zip.data);
  /* Current head of the PAGE_FREE list (nullptr if the list is empty). */
2061   const rec_t *free= page_header_get_ptr(block.frame, PAGE_FREE);
2062 
  /* PAGE_N_HEAP after removing this record. The most significant bit
  (0x8000) is the ROW_FORMAT=COMPACT/DYNAMIC flag and is carried along
  unchanged by the subtraction. */
2063   const uint16_t n_heap= uint16_t(page_header_get_field(block.frame,
2064                                                         PAGE_N_HEAP) - 1);
2065   ut_ad(page_get_n_recs(block.frame) < (n_heap & 0x7fff));
  /* The record sits at the top of the heap if its heap number is the
  highest heap number in use (compare including the format flag bit). */
2066   const bool deleting_top= n_heap == ((n_heap & 0x8000)
2067                                       ? (rec_get_heap_no_new(rec) | 0x8000)
2068                                       : rec_get_heap_no_old(rec));
2069 
2070   if (deleting_top)
2071   {
2072     byte *page_heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
2073                                               block.frame);
2074     const uint16_t heap_top= mach_read_from_2(page_heap_top);
    /* Bytes between the end of this record's payload and the old
    PAGE_HEAP_TOP; presumably already counted in PAGE_GARBAGE, so they
    must be deducted from it below (see the ut_ad on garbage). */
2075     const size_t extra_savings= heap_top - page_offset(rec + data_size);
2076     ut_ad(extra_savings < heap_top);
2077 
2078     /* When deleting the last record, do not add it to the PAGE_FREE list.
2079     Instead, decrement PAGE_HEAP_TOP and PAGE_N_HEAP. */
2080     mach_write_to_2(page_heap_top, page_offset(rec - extra_size));
2081     mach_write_to_2(my_assume_aligned<2>(page_heap_top + 2), n_heap);
2082     static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
2083     if (extra_savings)
2084     {
2085       byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
2086                                                block.frame);
2087       uint16_t garbage= mach_read_from_2(page_garbage);
2088       ut_ad(garbage >= extra_savings);
2089       mach_write_to_2(page_garbage, garbage - extra_savings);
2090     }
2091   }
2092   else
2093   {
    /* Prepend the record to the PAGE_FREE list and account for all of
    its bytes (header and payload) in PAGE_GARBAGE. */
2094     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2095                                           block.frame);
2096     byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
2097                                              block.frame);
2098     mach_write_to_2(page_free, page_offset(rec));
2099     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) +
2100                     extra_size + data_size);
2101   }
2102 
  /* Any record deletion invalidates the PAGE_LAST_INSERT hint and
  decrements PAGE_N_RECS. */
2103   memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + block.frame, 0, 2);
2104   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2105                                           block.frame);
2106   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) - 1);
2107 
2108   const byte* const end= rec + data_size;
2109 
2110   if (!deleting_top)
2111   {
    /* Link the freed record to the former head of the PAGE_FREE list.
    In ROW_FORMAT=COMPACT/DYNAMIC (0x8000 set) REC_NEXT is stored
    relative to the record itself; in ROW_FORMAT=REDUNDANT it is an
    absolute offset within the page. 0 terminates the list. */
2112     uint16_t next= free
2113       ? ((n_heap & 0x8000)
2114          ? static_cast<uint16_t>(free - rec)
2115          : static_cast<uint16_t>(free - block.frame))
2116       : uint16_t{0};
2117     mach_write_to_2(rec - REC_NEXT, next);
2118   }
2119   else
    /* The record header is also leaving the heap; wipe it as well. */
2120     rec-= extra_size;
2121 
  /* Zero out the freed bytes (payload only when on the free list,
  header included when the heap was shrunk). */
2122   memset(rec, 0, end - rec);
2123 }
2124
2125 /***********************************************************//**
2126 Deletes a record at the page cursor. The cursor is moved to the next
2127 record after the deleted one. */
2128 void
page_cur_delete_rec(page_cur_t * cursor,const dict_index_t * index,const rec_offs * offsets,mtr_t * mtr)2129 page_cur_delete_rec(
2130 /*================*/
2131 	page_cur_t*	cursor,	/*!< in/out: a page cursor */
2132 	const dict_index_t* index,	/*!< in: record descriptor */
2133 	const rec_offs*	offsets,/*!< in: rec_get_offsets(
2134 				cursor->rec, index) */
2135 	mtr_t*		mtr)	/*!< in/out: mini-transaction */
2136 {
2137 	page_dir_slot_t* cur_dir_slot;
2138 	rec_t*		current_rec;
2139 	rec_t*		prev_rec	= NULL;
2140 	rec_t*		next_rec;
2141 	ulint		cur_slot_no;
2142 	ulint		cur_n_owned;
2143 	rec_t*		rec;
2144 
2145 	/* page_zip_validate() will fail here when
2146 	btr_cur_pessimistic_delete() invokes btr_set_min_rec_mark().
2147 	Then, both "page_zip" and "block->frame" would have the min-rec-mark
2148 	set on the smallest user record, but "block->frame" would additionally
2149 	have it set on the smallest-but-one record. Because sloppy
2150 	page_zip_validate_low() only ignores min-rec-flag differences
2151 	in the smallest user record, it cannot be used here either. */
2152 
2153 	current_rec = cursor->rec;
2154 	buf_block_t* const block = cursor->block;
2155 	ut_ad(rec_offs_validate(current_rec, index, offsets));
2156 	ut_ad(!!page_is_comp(block->frame) == index->table->not_redundant());
2157 	ut_ad(fil_page_index_page_check(block->frame));
2158 	ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame)
2159 	      == index->id
2160 	      || mtr->is_inside_ibuf());
2161 	ut_ad(mtr->is_named_space(index->table->space));
2162 
2163 	/* The record must not be the supremum or infimum record. */
2164 	ut_ad(page_rec_is_user_rec(current_rec));
2165 
	/* Deleting the last user record (unless it is the metadata
	record of instant ALTER TABLE) empties the whole page. */
2166 	if (page_get_n_recs(block->frame) == 1
2167 	    && !rec_is_alter_metadata(current_rec, *index)) {
2168 		/* Empty the page. */
2169 		ut_ad(page_is_leaf(block->frame));
2170 		/* Usually, this should be the root page,
2171 		and the whole index tree should become empty.
2172 		However, this could also be a call in
2173 		btr_cur_pessimistic_update() to delete the only
2174 		record in the page and to insert another one. */
2175 		page_cur_move_to_next(cursor);
2176 		ut_ad(page_cur_is_after_last(cursor));
2177 		page_create_empty(page_cur_get_block(cursor),
2178 				  const_cast<dict_index_t*>(index), mtr);
2179 		return;
2180 	}
2181 
2182 	/* Save to local variables some data associated with current_rec */
2183 	cur_slot_no = page_dir_find_owner_slot(current_rec);
2184 	ut_ad(cur_slot_no > 0);
2185 	cur_dir_slot = page_dir_get_nth_slot(block->frame, cur_slot_no);
2186 	cur_n_owned = page_dir_slot_get_n_owned(cur_dir_slot);
2187 
2188 	/* The page gets invalid for btr_pcur_restore_pos().
2189 	We avoid invoking buf_block_modify_clock_inc(block) because its
2190 	consistency checks would fail for the dummy block that is being
2191 	used during IMPORT TABLESPACE. */
2192 	block->modify_clock++;
2193 
2194 	/* Find the next and the previous record. Note that the cursor is
2195 	left at the next record. */
2196 
2197 	rec = const_cast<rec_t*>
2198 		(page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE));
2199 
2200 	/* rec now points to the record of the previous directory slot. Look
2201 	for the immediate predecessor of current_rec in a loop. */
2202 
2203 	while (current_rec != rec) {
2204 		prev_rec = rec;
2205 		rec = page_rec_get_next(rec);
2206 	}
2207 
2208 	page_cur_move_to_next(cursor);
2209 	next_rec = cursor->rec;
2210 
2211 	/* Remove the record from the linked list of records */
2212 	/* If the deleted record is pointed to by a dir slot, update the
2213 	record pointer in slot. In the following if-clause we assume that
2214 	prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
2215 	>= 2. */
2216 	/* Update the number of owned records of the slot */
2217 
2218 	compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
2219 	ut_ad(cur_n_owned > 1);
2220 
2221 	rec_t* slot_rec = const_cast<rec_t*>
2222 		(page_dir_slot_get_rec(cur_dir_slot));
2223 
	/* ROW_FORMAT=COMPRESSED: ownership changes and the sparse page
	directory are maintained through the page_zip functions, which
	receive the mini-transaction; the rest of the deletion is done
	by page_zip_dir_delete(), and we return early. */
2224 	if (UNIV_LIKELY_NULL(block->page.zip.data)) {
2225 		ut_ad(page_is_comp(block->frame));
2226 		if (current_rec == slot_rec) {
			/* The slot owner is being deleted: transfer
			ownership to its predecessor. */
2227 			page_zip_rec_set_owned(block, prev_rec, 1, mtr);
2228 			page_zip_rec_set_owned(block, slot_rec, 0, mtr);
2229 			slot_rec = prev_rec;
2230 			mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
2231 		} else if (cur_n_owned == 1
2232 			   && !page_rec_is_supremum(slot_rec)) {
2233 			page_zip_rec_set_owned(block, slot_rec, 0, mtr);
2234 		}
2235 
		/* Unlink current_rec (COMPACT REC_NEXT is relative). */
2236 		mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
2237 				(next_rec - prev_rec));
2238 		slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
2239 			(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
2240 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2241 
2242 		page_header_reset_last_insert(block, mtr);
2243 		page_zip_dir_delete(block, rec, index, offsets,
2244 				    page_header_get_ptr(block->frame,
2245 							PAGE_FREE),
2246 				    mtr);
2247 		if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2248 			page_zip_dir_balance_slot(block, cur_slot_no, mtr);
2249 		}
2250 		return;
2251 	}
2252 
	/* Uncompressed page: if the deleted record owns its slot, make
	the predecessor the new slot owner. */
2253 	if (current_rec == slot_rec) {
2254 		slot_rec = prev_rec;
2255 		mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
2256 	}
2257 
2258 	const size_t data_size = rec_offs_data_size(offsets);
2259 	const size_t extra_size = rec_offs_extra_size(offsets);
2260 
	/* Write the logical DELETE record to the redo log, then unlink
	current_rec and decrement the owner's n_owned count. The REC_NEXT
	encoding differs: relative in COMPACT/DYNAMIC, absolute in
	REDUNDANT (hence the plain 2-byte memcpy there). */
2261 	if (page_is_comp(block->frame)) {
2262 		mtr->page_delete(*block, page_offset(prev_rec)
2263 				 - PAGE_NEW_INFIMUM,
2264 				 extra_size - REC_N_NEW_EXTRA_BYTES,
2265 				 data_size);
2266 		mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
2267 				(next_rec - prev_rec));
2268 		slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
2269 			(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
2270 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2271 	} else {
2272 		mtr->page_delete(*block, page_offset(prev_rec)
2273 				 - PAGE_OLD_INFIMUM);
2274 		memcpy(prev_rec - REC_NEXT, current_rec - REC_NEXT, 2);
2275 		slot_rec[-REC_OLD_N_OWNED] = static_cast<byte>(
2276 			(slot_rec[-REC_OLD_N_OWNED] & ~REC_N_OWNED_MASK)
2277 			| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
2278 	}
2279 
	/* Return the record's bytes to the PAGE_FREE list or shrink the
	heap, and update PAGE_GARBAGE/PAGE_N_RECS/PAGE_LAST_INSERT. */
2280 	page_mem_free(*block, current_rec, data_size, extra_size);
2281 
2282 	/* Now we have decremented the number of owned records of the slot.
2283 	If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
2284 	slots. */
2285 
2286 	if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
2287 		page_dir_balance_slot(*block, cur_slot_no);
2288 	}
2289 
2290 	ut_ad(page_is_comp(block->frame)
2291 	      ? page_simple_validate_new(block->frame)
2292 	      : page_simple_validate_old(block->frame));
2293 }
2294
2295 /** Apply a INSERT_HEAP_REDUNDANT or INSERT_REUSE_REDUNDANT record that was
2296 written by page_cur_insert_rec_low() for a ROW_FORMAT=REDUNDANT page.
2297 @param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
2298 @param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
2299 @param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
2300 @param enc_hdr encoded fixed-size header bits
2301 @param hdr_c number of common record header bytes with prev
2302 @param data_c number of common data bytes with prev
2303 @param data literal header and data bytes
2304 @param data_len length of the literal data, in bytes
2305 @return whether the operation failed (inconsistency was noticed) */
page_apply_insert_redundant(const buf_block_t & block,bool reuse,ulint prev,ulint enc_hdr,size_t hdr_c,size_t data_c,const void * data,size_t data_len)2306 bool page_apply_insert_redundant(const buf_block_t &block, bool reuse,
2307                                  ulint prev, ulint enc_hdr,
2308                                  size_t hdr_c, size_t data_c,
2309                                  const void *data, size_t data_len)
2310 {
  /* Basic page-header sanity: at least infimum+supremum slots, a heap
  count covering the slots and the two pseudo-records, a plausible
  heap size, the right page type/number, and a zero REC_NEXT on the
  supremum (REDUNDANT stores absolute offsets; supremum terminates
  the list). Any failure means the page does not match what the redo
  log producer saw, so the record must not be applied. */
2311   const uint16_t n_slots= page_dir_get_n_slots(block.frame);
2312   byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
2313                                           block.frame);
2314   const uint16_t h= mach_read_from_2(page_n_heap);
2315   const page_id_t id(block.page.id());
2316   if (UNIV_UNLIKELY(n_slots < 2 || h < n_slots || h < PAGE_HEAP_NO_USER_LOW ||
2317                     h >= srv_page_size / REC_N_OLD_EXTRA_BYTES ||
2318                     !fil_page_index_page_check(block.frame) ||
2319                     page_get_page_no(block.frame) != id.page_no() ||
2320                     mach_read_from_2(my_assume_aligned<2>
2321                                      (PAGE_OLD_SUPREMUM - REC_NEXT +
2322                                       block.frame))))
2323   {
2324   corrupted:
2325     ib::error() << (reuse
2326                     ? "Not applying INSERT_REUSE_REDUNDANT"
2327                     " due to corruption on "
2328                     : "Not applying INSERT_HEAP_REDUNDANT"
2329                     " due to corruption on ")
2330                 << id;
2331     return true;
2332   }
2333 
  /* The record heap must lie between the supremum and the page
  directory, and the first/last directory slots must point to the
  supremum/infimum pseudo-records. */
2334   byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
2335   byte * const page_heap_top= my_assume_aligned<2>
2336     (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
2337   const byte *const heap_bot= &block.frame[PAGE_OLD_SUPREMUM_END];
2338   byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
2339   if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
2340     goto corrupted;
2341   if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_OLD_SUPREMUM))
2342     goto corrupted;
2343   if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
2344                     PAGE_OLD_INFIMUM))
2345     goto corrupted;
  /* Locate and validate the predecessor record (prev is an offset
  relative to the infimum; prev == 0 means insert after infimum). */
2346   rec_t * const prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
2347   if (!prev);
2348   else if (UNIV_UNLIKELY(heap_bot + (REC_N_OLD_EXTRA_BYTES + 1) > prev_rec ||
2349                          prev_rec > heap_top))
2350     goto corrupted;
2351   const ulint pn_fields= rec_get_bit_field_2(prev_rec, REC_OLD_N_FIELDS,
2352                                              REC_OLD_N_FIELDS_MASK,
2353                                              REC_OLD_N_FIELDS_SHIFT);
2354   if (UNIV_UNLIKELY(pn_fields == 0 || pn_fields > REC_MAX_N_FIELDS))
2355     goto corrupted;
  /* Header size of prev_rec: fixed extra bytes plus one 1- or 2-byte
  field-end offset per field. */
2356   const ulint pextra_size= REC_N_OLD_EXTRA_BYTES +
2357     (rec_get_1byte_offs_flag(prev_rec) ? pn_fields : pn_fields * 2);
2358   if (prev_rec == &block.frame[PAGE_OLD_INFIMUM]);
2359   else if (UNIV_UNLIKELY(prev_rec - pextra_size < heap_bot))
2360     goto corrupted;
2361   if (UNIV_UNLIKELY(hdr_c && prev_rec - hdr_c < heap_bot))
2362     goto corrupted;
2363   const ulint pdata_size= rec_get_data_size_old(prev_rec);
2364   if (UNIV_UNLIKELY(prev_rec + pdata_size > heap_top))
2365     goto corrupted;
  /* The successor of prev_rec (absolute offset in REDUNDANT). */
2366   rec_t * const next_rec= block.frame + mach_read_from_2(prev_rec - REC_NEXT);
2367   if (next_rec == block.frame + PAGE_OLD_SUPREMUM);
2368   else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > next_rec ||
2369                          next_rec > heap_top))
2370     goto corrupted;
  /* Decode the new record's header geometry from enc_hdr:
  bits 0..1 = info bits, bit 2 = 1-byte-offsets flag,
  bits 3.. = number of fields minus one. */
2371   const bool is_short= (enc_hdr >> 2) & 1;
2372   const ulint n_fields= (enc_hdr >> 3) + 1;
2373   if (UNIV_UNLIKELY(n_fields > REC_MAX_N_FIELDS))
2374     goto corrupted;
2375   const ulint extra_size= REC_N_OLD_EXTRA_BYTES +
2376     (is_short ? n_fields : n_fields * 2);
2377   hdr_c+= REC_N_OLD_EXTRA_BYTES;
2378   if (UNIV_UNLIKELY(hdr_c > extra_size))
2379     goto corrupted;
2380   if (UNIV_UNLIKELY(extra_size - hdr_c > data_len))
2381     goto corrupted;
2382   /* We buffer all changes to the record header locally, so that
2383   we will avoid modifying the page before all consistency checks
2384   have been fulfilled. */
2385   alignas(2) byte insert_buf[REC_N_OLD_EXTRA_BYTES + REC_MAX_N_FIELDS * 2];
2386 
  /* Walk forward from next_rec to the record that owns this portion
  of the page (nonzero n_owned), bounded to catch cyclic lists. */
2387   ulint n_owned;
2388   rec_t *owner_rec= next_rec;
2389   for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
2390        !(n_owned= rec_get_n_owned_old(owner_rec)); )
2391   {
2392     owner_rec= block.frame + mach_read_from_2(owner_rec - REC_NEXT);
2393     if (owner_rec == &block.frame[PAGE_OLD_SUPREMUM]);
2394     else if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > owner_rec ||
2395                            owner_rec > heap_top))
2396       goto corrupted;
2397     if (!ns--)
2398       goto corrupted; /* Corrupted (cyclic?) next-record list */
2399   }
2400 
  /* Find the directory slot that points to owner_rec, scanning from
  the last slot towards the first; insert_buf temporarily holds the
  2-byte owner offset as the comparison key. */
2401   page_dir_slot_t *owner_slot= last_slot;
2402 
2403   if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
2404     goto corrupted;
2405   else
2406   {
2407     mach_write_to_2(insert_buf, owner_rec - block.frame);
2408     static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
2409     const page_dir_slot_t * const first_slot=
2410       page_dir_get_nth_slot(block.frame, 0);
2411 
2412     while (memcmp_aligned<2>(owner_slot, insert_buf, 2))
2413       if ((owner_slot+= 2) == first_slot)
2414         goto corrupted;
2415   }
2416 
  /* Assemble the full record header in insert_buf: literal bytes from
  the log, then the hdr_c bytes shared with prev_rec, then the decoded
  flag fields. insert_rec points just past the header, where the
  record origin will be on the page. */
2417   memcpy(insert_buf, data, extra_size - hdr_c);
2418   byte *insert_rec= &insert_buf[extra_size];
2419   memcpy(insert_rec - hdr_c, prev_rec - hdr_c, hdr_c);
2420   rec_set_bit_field_1(insert_rec, (enc_hdr & 3) << 4,
2421                       REC_OLD_INFO_BITS, REC_INFO_BITS_MASK,
2422                       REC_INFO_BITS_SHIFT);
2423   rec_set_1byte_offs_flag(insert_rec, is_short);
2424   rec_set_n_fields_old(insert_rec, n_fields);
2425   rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
2426                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2427 
  /* The payload length implied by the header must agree with the
  common-with-prev bytes plus the literal bytes from the log. */
2428   const ulint data_size= rec_get_data_size_old(insert_rec);
2429   if (UNIV_UNLIKELY(data_c > data_size))
2430     goto corrupted;
2431   if (UNIV_UNLIKELY(extra_size - hdr_c + data_size - data_c != data_len))
2432     goto corrupted;
2433 
2434   /* Perform final consistency checks and then apply the change to the page. */
2435   byte *buf;
2436   if (reuse)
2437   {
    /* INSERT_REUSE_REDUNDANT: take the record at the head of the
    PAGE_FREE list; it must be large enough, properly inside the heap,
    and accounted for in PAGE_GARBAGE. The new record inherits the
    freed record's heap number. */
2438     byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2439                                           block.frame);
2440     rec_t *free_rec= block.frame + mach_read_from_2(page_free);
2441     if (UNIV_UNLIKELY(heap_bot + REC_N_OLD_EXTRA_BYTES > free_rec ||
2442                       free_rec > heap_top))
2443       goto corrupted;
2444     const ulint fn_fields= rec_get_n_fields_old(free_rec);
2445     const ulint fextra_size= REC_N_OLD_EXTRA_BYTES +
2446       (rec_get_1byte_offs_flag(free_rec) ? fn_fields : fn_fields * 2);
2447     if (UNIV_UNLIKELY(free_rec - fextra_size < heap_bot))
2448       goto corrupted;
2449     const ulint fdata_size= rec_get_data_size_old(free_rec);
2450     if (UNIV_UNLIKELY(free_rec + fdata_size > heap_top))
2451       goto corrupted;
2452     if (UNIV_UNLIKELY(extra_size + data_size > fextra_size + fdata_size))
2453       goto corrupted;
2454     byte *page_garbage= my_assume_aligned<2>(page_free + 2);
2455     if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) <
2456                       fextra_size + fdata_size))
2457       goto corrupted;
2458     buf= free_rec - fextra_size;
2459     const rec_t *const next_free= block.frame +
2460       mach_read_from_2(free_rec - REC_NEXT);
2461     if (next_free == block.frame);
2462     else if (UNIV_UNLIKELY(next_free < &heap_bot[REC_N_OLD_EXTRA_BYTES + 1] ||
2463                            heap_top < next_free))
2464       goto corrupted;
2465     mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
2466                     extra_size - data_size);
2467     rec_set_bit_field_2(insert_rec, rec_get_heap_no_old(free_rec),
2468                         REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
2469     memcpy(page_free, free_rec - REC_NEXT, 2);
2470   }
2471   else
2472   {
    /* INSERT_HEAP_REDUNDANT: allocate from PAGE_HEAP_TOP, which must
    not run into the page directory; the new record gets heap number
    h and PAGE_N_HEAP/PAGE_HEAP_TOP are advanced. */
2473     if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
2474       goto corrupted;
2475     rec_set_bit_field_2(insert_rec, h,
2476                         REC_OLD_HEAP_NO, REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
2477     mach_write_to_2(page_n_heap, h + 1);
2478     mach_write_to_2(page_heap_top,
2479                     mach_read_from_2(page_heap_top) + extra_size + data_size);
2480     buf= heap_top;
2481   }
2482 
  /* All checks passed: copy header and payload into place, link the
  record after prev_rec (PAGE_LAST_INSERT doubles as the encoded
  absolute offset written into prev_rec's REC_NEXT), and credit the
  owner record with one more owned record. */
2483   ut_ad(data_size - data_c == data_len - (extra_size - hdr_c));
2484   byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
2485                                                block.frame);
2486   const uint16_t last_insert= mach_read_from_2(page_last_insert);
2487   memcpy(buf, insert_buf, extra_size);
2488   buf+= extra_size;
2489   mach_write_to_2(page_last_insert, buf - block.frame);
2490   memcpy(prev_rec - REC_NEXT, page_last_insert, 2);
2491   memcpy(buf, prev_rec, data_c);
2492   memcpy(buf + data_c, static_cast<const byte*>(data) + (extra_size - hdr_c),
2493          data_len - (extra_size - hdr_c));
2494   rec_set_bit_field_1(owner_rec, n_owned + 1, REC_OLD_N_OWNED,
2495                       REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2496 
2497   /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
2498   if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
2499   {
2500     byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
2501     byte *n_dir= my_assume_aligned<2>
2502       (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
2503     if (UNIV_UNLIKELY(!last_insert))
2504     {
2505     no_direction:
2506       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
2507       memset(n_dir, 0, 2);
2508     }
2509     else if (block.frame + last_insert == prev_rec &&
2510              (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
2511     {
2512       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
2513     inc_dir:
2514       mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
2515     }
2516     else if (next_rec == block.frame + last_insert &&
2517              (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
2518     {
2519       *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
2520       goto inc_dir;
2521     }
2522     else
2523       goto no_direction;
2524   }
2525 
2526   /* Update PAGE_N_RECS. */
2527   byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2528                                           block.frame);
2529 
2530   mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
2531 
  /* Split the owner's directory slot if it now owns too many records. */
2532   if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2533     page_dir_split_slot(block, owner_slot);
2534   ut_ad(page_simple_validate_old(block.frame));
2535   return false;
2536 }
2537
2538 /** Apply a INSERT_HEAP_DYNAMIC or INSERT_REUSE_DYNAMIC record that was
2539 written by page_cur_insert_rec_low() for a ROW_FORMAT=COMPACT or DYNAMIC page.
2540 @param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
2541 @param reuse false=allocate from PAGE_HEAP_TOP; true=reuse PAGE_FREE
2542 @param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
2543 @param shift unless !reuse: number of bytes the PAGE_FREE is moving
2544 @param enc_hdr_l number of copied record header bytes, plus record type bits
2545 @param hdr_c number of common record header bytes with prev
2546 @param data_c number of common data bytes with prev
2547 @param data literal header and data bytes
2548 @param data_len length of the literal data, in bytes
2549 @return whether the operation failed (inconsistency was noticed) */
page_apply_insert_dynamic(const buf_block_t & block,bool reuse,ulint prev,ulint shift,ulint enc_hdr_l,size_t hdr_c,size_t data_c,const void * data,size_t data_len)2550 bool page_apply_insert_dynamic(const buf_block_t &block, bool reuse,
2551 ulint prev, ulint shift, ulint enc_hdr_l,
2552 size_t hdr_c, size_t data_c,
2553 const void *data, size_t data_len)
2554 {
2555 const uint16_t n_slots= page_dir_get_n_slots(block.frame);
2556 byte *page_n_heap= my_assume_aligned<2>(PAGE_N_HEAP + PAGE_HEADER +
2557 block.frame);
2558 ulint h= mach_read_from_2(page_n_heap);
2559 const page_id_t id(block.page.id());
2560 if (UNIV_UNLIKELY(n_slots < 2 || h < (PAGE_HEAP_NO_USER_LOW | 0x8000) ||
2561 (h & 0x7fff) >= srv_page_size / REC_N_NEW_EXTRA_BYTES ||
2562 (h & 0x7fff) < n_slots ||
2563 !fil_page_index_page_check(block.frame) ||
2564 page_get_page_no(block.frame) != id.page_no() ||
2565 mach_read_from_2(my_assume_aligned<2>
2566 (PAGE_NEW_SUPREMUM - REC_NEXT +
2567 block.frame)) ||
2568 ((enc_hdr_l & REC_STATUS_INSTANT) &&
2569 !page_is_leaf(block.frame)) ||
2570 (enc_hdr_l >> 3) > data_len))
2571 {
2572 corrupted:
2573 ib::error() << (reuse
2574 ? "Not applying INSERT_REUSE_DYNAMIC"
2575 " due to corruption on "
2576 : "Not applying INSERT_HEAP_DYNAMIC"
2577 " due to corruption on ")
2578 << id;
2579 return true;
2580 }
2581
2582 byte * const last_slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
2583 byte * const page_heap_top= my_assume_aligned<2>
2584 (PAGE_HEAP_TOP + PAGE_HEADER + block.frame);
2585 const byte *const heap_bot= &block.frame[PAGE_NEW_SUPREMUM_END];
2586 byte *heap_top= block.frame + mach_read_from_2(page_heap_top);
2587 if (UNIV_UNLIKELY(heap_bot > heap_top || heap_top > last_slot))
2588 goto corrupted;
2589 if (UNIV_UNLIKELY(mach_read_from_2(last_slot) != PAGE_NEW_SUPREMUM))
2590 goto corrupted;
2591 if (UNIV_UNLIKELY(mach_read_from_2(page_dir_get_nth_slot(block.frame, 0)) !=
2592 PAGE_NEW_INFIMUM))
2593 goto corrupted;
2594
2595 uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
2596 rec_t *prev_rec= block.frame + n;
2597 n= static_cast<uint16_t>(n + mach_read_from_2(prev_rec - REC_NEXT));
2598 if (!prev);
2599 else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > prev_rec ||
2600 prev_rec > heap_top))
2601 goto corrupted;
2602
2603 rec_t * const next_rec= block.frame + n;
2604 if (next_rec == block.frame + PAGE_NEW_SUPREMUM);
2605 else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > next_rec ||
2606 next_rec > heap_top))
2607 goto corrupted;
2608
2609 ulint n_owned;
2610 rec_t *owner_rec= next_rec;
2611 n= static_cast<uint16_t>(next_rec - block.frame);
2612
2613 for (ulint ns= PAGE_DIR_SLOT_MAX_N_OWNED;
2614 !(n_owned= rec_get_n_owned_new(owner_rec)); )
2615 {
2616 n= static_cast<uint16_t>(n + mach_read_from_2(owner_rec - REC_NEXT));
2617 owner_rec= block.frame + n;
2618 if (n == PAGE_NEW_SUPREMUM);
2619 else if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > owner_rec ||
2620 owner_rec > heap_top))
2621 goto corrupted;
2622 if (!ns--)
2623 goto corrupted; /* Corrupted (cyclic?) next-record list */
2624 }
2625
2626 page_dir_slot_t* owner_slot= last_slot;
2627
2628 if (n_owned > PAGE_DIR_SLOT_MAX_N_OWNED)
2629 goto corrupted;
2630 else
2631 {
2632 static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
2633 alignas(2) byte slot_buf[2];
2634 mach_write_to_2(slot_buf, owner_rec - block.frame);
2635 const page_dir_slot_t * const first_slot=
2636 page_dir_get_nth_slot(block.frame, 0);
2637
2638 while (memcmp_aligned<2>(owner_slot, slot_buf, 2))
2639 if ((owner_slot+= 2) == first_slot)
2640 goto corrupted;
2641 }
2642
2643 const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_c + (enc_hdr_l >> 3);
2644 const ulint data_size= data_c + data_len - (enc_hdr_l >> 3);
2645
2646 /* Perform final consistency checks and then apply the change to the page. */
2647 byte *buf;
2648 if (reuse)
2649 {
2650 byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
2651 block.frame);
2652 rec_t *free_rec= block.frame + mach_read_from_2(page_free);
2653 if (UNIV_UNLIKELY(heap_bot + REC_N_NEW_EXTRA_BYTES > free_rec ||
2654 free_rec > heap_top))
2655 goto corrupted;
2656 buf= free_rec - extra_size;
2657 if (shift & 1)
2658 buf-= shift >> 1;
2659 else
2660 buf+= shift >> 1;
2661
2662 if (UNIV_UNLIKELY(heap_bot > buf ||
2663 &buf[extra_size + data_size] > heap_top))
2664 goto corrupted;
2665 byte *page_garbage= my_assume_aligned<2>(page_free + 2);
2666 if (UNIV_UNLIKELY(mach_read_from_2(page_garbage) < extra_size + data_size))
2667 goto corrupted;
2668 if ((n= mach_read_from_2(free_rec - REC_NEXT)) != 0)
2669 {
2670 n= static_cast<uint16_t>(n + free_rec - block.frame);
2671 if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
2672 heap_top < block.frame + n))
2673 goto corrupted;
2674 }
2675 mach_write_to_2(page_free, n);
2676 mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) -
2677 (extra_size + data_size));
2678 h= rec_get_heap_no_new(free_rec);
2679 }
2680 else
2681 {
2682 if (UNIV_UNLIKELY(heap_top + extra_size + data_size > last_slot))
2683 goto corrupted;
2684 mach_write_to_2(page_n_heap, h + 1);
2685 h&= 0x7fff;
2686 mach_write_to_2(page_heap_top,
2687 mach_read_from_2(page_heap_top) + extra_size + data_size);
2688 buf= heap_top;
2689 }
2690
2691 memcpy(buf, data, (enc_hdr_l >> 3));
2692 buf+= enc_hdr_l >> 3;
2693 data_len-= enc_hdr_l >> 3;
2694 data= &static_cast<const byte*>(data)[enc_hdr_l >> 3];
2695
2696 memcpy(buf, prev_rec - REC_N_NEW_EXTRA_BYTES - hdr_c, hdr_c);
2697 buf+= hdr_c;
2698 *buf++= static_cast<byte>((enc_hdr_l & 3) << 4); /* info_bits; n_owned=0 */
2699 *buf++= static_cast<byte>(h >> 5); /* MSB of heap number */
2700 h= (h & ((1U << 5) - 1)) << 3;
2701 static_assert(REC_STATUS_ORDINARY == 0, "compatibility");
2702 static_assert(REC_STATUS_INSTANT == 4, "compatibility");
2703 if (page_is_leaf(block.frame))
2704 h|= enc_hdr_l & REC_STATUS_INSTANT;
2705 else
2706 {
2707 ut_ad(!(enc_hdr_l & REC_STATUS_INSTANT)); /* Checked at the start */
2708 h|= REC_STATUS_NODE_PTR;
2709 }
2710 *buf++= static_cast<byte>(h); /* LSB of heap number, and status */
2711 static_assert(REC_NEXT == 2, "compatibility");
2712 buf+= REC_NEXT;
2713 mach_write_to_2(buf - REC_NEXT, static_cast<uint16_t>(next_rec - buf));
2714 byte *page_last_insert= my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER +
2715 block.frame);
2716 const uint16_t last_insert= mach_read_from_2(page_last_insert);
2717 mach_write_to_2(page_last_insert, buf - block.frame);
2718 mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(buf - prev_rec));
2719 memcpy(buf, prev_rec, data_c);
2720 buf+= data_c;
2721 memcpy(buf, data, data_len);
2722
2723 rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED,
2724 REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
2725
2726 /* Update PAGE_DIRECTION_B, PAGE_N_DIRECTION if needed */
2727 if (block.frame[FIL_PAGE_TYPE + 1] != byte(FIL_PAGE_RTREE))
2728 {
2729 byte *dir= &block.frame[PAGE_DIRECTION_B + PAGE_HEADER];
2730 byte *n_dir= my_assume_aligned<2>
2731 (&block.frame[PAGE_N_DIRECTION + PAGE_HEADER]);
2732 if (UNIV_UNLIKELY(!last_insert))
2733 {
2734 no_direction:
2735 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_NO_DIRECTION);
2736 memset(n_dir, 0, 2);
2737 }
2738 else if (block.frame + last_insert == prev_rec &&
2739 (*dir & ((1U << 3) - 1)) != PAGE_LEFT)
2740 {
2741 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_RIGHT);
2742 inc_dir:
2743 mach_write_to_2(n_dir, mach_read_from_2(n_dir) + 1);
2744 }
2745 else if (next_rec == block.frame + last_insert &&
2746 (*dir & ((1U << 3) - 1)) != PAGE_RIGHT)
2747 {
2748 *dir= static_cast<byte>((*dir & ~((1U << 3) - 1)) | PAGE_LEFT);
2749 goto inc_dir;
2750 }
2751 else
2752 goto no_direction;
2753 }
2754
2755 /* Update PAGE_N_RECS. */
2756 byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
2757 block.frame);
2758
2759 mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) + 1);
2760
2761 if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED))
2762 page_dir_split_slot(block, owner_slot);
2763 ut_ad(page_simple_validate_new(block.frame));
2764 return false;
2765 }
2766
/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM
@return whether the operation failed (inconsistency was noticed) */
bool page_apply_delete_redundant(const buf_block_t &block, ulint prev)
{
  const uint16_t n_slots= page_dir_get_n_slots(block.frame);
  ulint n_recs= page_get_n_recs(block.frame);
  const page_id_t id(block.page.id());

  /* Basic sanity checks before touching anything: the page must contain
  at least one user record and two directory slots, be a valid index page
  in ROW_FORMAT=REDUNDANT with the expected page number, and the supremum
  must be the last record (its next-record pointer must be 0). */
  if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
                    !fil_page_index_page_check(block.frame) ||
                    page_get_page_no(block.frame) != id.page_no() ||
                    mach_read_from_2(my_assume_aligned<2>
                                     (PAGE_OLD_SUPREMUM - REC_NEXT +
                                      block.frame)) ||
                    page_is_comp(block.frame)))
  {
corrupted:
    ib::error() << "Not applying DELETE_ROW_FORMAT_REDUNDANT"
                   " due to corruption on " << id;
    return true;
  }

  /* slot initially points to the last (lowest-address) directory slot;
  every record must reside below it in the page. */
  byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
  rec_t *prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
  if (UNIV_UNLIKELY(prev_rec > slot))
    goto corrupted;
  /* In ROW_FORMAT=REDUNDANT, next-record pointers are absolute offsets. */
  uint16_t n= mach_read_from_2(prev_rec - REC_NEXT);
  /* rec is the record to be deleted: the successor of prev_rec. */
  rec_t *rec= block.frame + n;
  if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                    slot < rec))
    goto corrupted;
  /* The header stores one offset per field, 1 or 2 bytes each depending
  on the "1-byte offsets" flag. */
  const ulint extra_size= REC_N_OLD_EXTRA_BYTES + rec_get_n_fields_old(rec) *
    (rec_get_1byte_offs_flag(rec) ? 1 : 2);
  const ulint data_size= rec_get_data_size_old(rec);
  if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + extra_size ||
                    slot < rec + data_size))
    goto corrupted;

  n= mach_read_from_2(rec - REC_NEXT);
  rec_t *next= block.frame + n;
  if (n == PAGE_OLD_SUPREMUM);
  else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                         slot < next))
    goto corrupted;

  /* Walk the next-record list starting from rec until we reach the record
  that owns a directory slot (n_owned != 0). The counter i bounds the walk
  by the number of records, so a cyclic list is detected. */
  rec_t *s= rec;
  ulint slot_owned;
  for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_old(s)); )
  {
    n= mach_read_from_2(s - REC_NEXT);
    s= block.frame + n;
    if (n == PAGE_OLD_SUPREMUM);
    else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
                           slot < s))
      goto corrupted;
    if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
      goto corrupted;
  }
  /* The owner will own one record less after the deletion. */
  slot_owned--;

  /* The first slot is always pointing to the infimum record.
  Find the directory slot pointing to s. */
  const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
  alignas(2) byte slot_offs[2];
  mach_write_to_2(slot_offs, s - block.frame);
  static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");

  while (memcmp_aligned<2>(slot, slot_offs, 2))
    if ((slot+= 2) == first_slot)
      goto corrupted;

  /* If the deleted record itself was the slot owner, its predecessor
  takes over slot ownership. */
  if (rec == s)
  {
    s= prev_rec;
    mach_write_to_2(slot, s - block.frame);
  }

  /* Unlink rec: copy its next-record pointer into the predecessor. */
  memcpy(prev_rec - REC_NEXT, rec - REC_NEXT, 2);
  /* Store the decremented n_owned in the owning record's header. */
  s-= REC_OLD_N_OWNED;
  *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
                        slot_owned << REC_N_OWNED_SHIFT);
  /* Return the freed space to the page free list. */
  page_mem_free(block, rec, data_size, extra_size);

  if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
    page_dir_balance_slot(block, (first_slot - slot) / 2);

  ut_ad(page_simple_validate_old(block.frame));
  return false;
}
2859
/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size data payload size, in bytes
@return whether the operation failed (inconsistency was noticed) */
bool page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
                               size_t hdr_size, size_t data_size)
{
  const uint16_t n_slots= page_dir_get_n_slots(block.frame);
  ulint n_recs= page_get_n_recs(block.frame);
  const page_id_t id(block.page.id());

  /* Basic sanity checks before touching anything: the page must contain
  at least one user record and two directory slots, be a valid
  ROW_FORMAT=COMPACT or DYNAMIC index page with the expected page number,
  and the supremum must be the last record (next-record pointer 0). */
  if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
                    !fil_page_index_page_check(block.frame) ||
                    page_get_page_no(block.frame) != id.page_no() ||
                    mach_read_from_2(my_assume_aligned<2>
                                     (PAGE_NEW_SUPREMUM - REC_NEXT +
                                      block.frame)) ||
                    !page_is_comp(block.frame)))
  {
corrupted:
    ib::error() << "Not applying DELETE_ROW_FORMAT_DYNAMIC"
                   " due to corruption on " << id;
    return true;
  }

  /* slot initially points to the last (lowest-address) directory slot;
  every record must reside below it in the page. */
  byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
  uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
  rec_t *prev_rec= block.frame + n;
  if (UNIV_UNLIKELY(prev_rec > slot))
    goto corrupted;
  /* In ROW_FORMAT=COMPACT/DYNAMIC, next-record pointers are offsets
  relative to the current record, with 16-bit wrap-around. */
  n= static_cast<uint16_t>(n + mach_read_from_2(prev_rec - REC_NEXT));
  /* rec is the record to be deleted: the successor of prev_rec. */
  rec_t *rec= block.frame + n;
  if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                    slot < rec))
    goto corrupted;
  const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_size;
  if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + extra_size ||
                    slot < rec + data_size))
    goto corrupted;
  n= static_cast<uint16_t>(n + mach_read_from_2(rec - REC_NEXT));
  rec_t *next= block.frame + n;
  if (n == PAGE_NEW_SUPREMUM);
  else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                         slot < next))
    goto corrupted;

  /* Walk the next-record list starting from rec until we reach the record
  that owns a directory slot (n_owned != 0). The counter i bounds the walk
  by the number of records, so a cyclic list is detected. */
  rec_t *s= rec;
  n= static_cast<uint16_t>(rec - block.frame);
  ulint slot_owned;
  for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_new(s)); )
  {
    const uint16_t next= mach_read_from_2(s - REC_NEXT);
    /* A relative next pointer must skip at least the extra bytes of a
    record header, in either direction. */
    if (UNIV_UNLIKELY(next < REC_N_NEW_EXTRA_BYTES ||
                      next > static_cast<uint16_t>(-REC_N_NEW_EXTRA_BYTES)))
      goto corrupted;
    n= static_cast<uint16_t>(n + next);
    s= block.frame + n;
    if (n == PAGE_NEW_SUPREMUM);
    else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
                           slot < s))
      goto corrupted;
    if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
      goto corrupted;
  }
  /* The owner will own one record less after the deletion. */
  slot_owned--;

  /* The first slot is always pointing to the infimum record.
  Find the directory slot pointing to s. */
  const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
  alignas(2) byte slot_offs[2];
  mach_write_to_2(slot_offs, s - block.frame);
  static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");

  while (memcmp_aligned<2>(slot, slot_offs, 2))
    if ((slot+= 2) == first_slot)
      goto corrupted;

  /* If the deleted record itself was the slot owner, its predecessor
  takes over slot ownership. */
  if (rec == s)
  {
    s= prev_rec;
    mach_write_to_2(slot, s - block.frame);
  }

  /* Unlink rec: make the predecessor point (relatively) to the successor. */
  mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(next - prev_rec));
  /* Store the decremented n_owned in the owning record's header. */
  s-= REC_NEW_N_OWNED;
  *s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
                        slot_owned << REC_N_OWNED_SHIFT);
  /* Return the freed space to the page free list. */
  page_mem_free(block, rec, data_size, extra_size);

  if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
    page_dir_balance_slot(block, (first_slot - slot) / 2);

  ut_ad(page_simple_validate_new(block.frame));
  return false;
}
2958
2959 #ifdef UNIV_COMPILE_TEST_FUNCS
2960
2961 /*******************************************************************//**
2962 Print the first n numbers, generated by ut_rnd_gen() to make sure
2963 (visually) that it works properly. */
2964 void
test_ut_rnd_gen(int n)2965 test_ut_rnd_gen(
2966 int n) /*!< in: print first n numbers */
2967 {
2968 int i;
2969 unsigned long long rnd;
2970
2971 for (i = 0; i < n; i++) {
2972 rnd = ut_rnd_gen();
2973 printf("%llu\t%%2=%llu %%3=%llu %%5=%llu %%7=%llu %%11=%llu\n",
2974 rnd,
2975 rnd % 2,
2976 rnd % 3,
2977 rnd % 5,
2978 rnd % 7,
2979 rnd % 11);
2980 }
2981 }
2982
2983 #endif /* UNIV_COMPILE_TEST_FUNCS */
2984